TAJO-1205: Remove possible memory leak in TajoMaster. (jinho) Closes #265
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/80afe993 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/80afe993 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/80afe993 Branch: refs/heads/hbase_storage Commit: 80afe993b82d57582fbeab64d20199f4dfa3d9af Parents: 965cbd9 Author: jhkim <[email protected]> Authored: Fri Nov 21 16:23:32 2014 +0900 Committer: jhkim <[email protected]> Committed: Fri Nov 21 16:23:32 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/master/TajoMasterClientService.java | 32 ++++----- .../master/querymaster/QueryJobManager.java | 70 ++++++++++++++++---- .../master/querymaster/QueryMasterTask.java | 1 + .../tajo/webapp/QueryExecutorServlet.java | 24 +++++-- .../src/main/resources/webapps/admin/index.jsp | 42 ++---------- .../resources/webapps/admin/query_executor.jsp | 4 +- .../org/apache/tajo/worker/TestHistory.java | 6 +- 8 files changed, 107 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/80afe993/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 42ab10a..d5c7f1c 100644 --- a/CHANGES +++ b/CHANGES @@ -63,6 +63,8 @@ Release 0.9.1 - unreleased BUG FIXES + TAJO-1205: Remove possible memory leak in TajoMaster. (jinho) + TAJO-1181: Avro schema URL should support various protocols. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/80afe993/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 2c81cd0..540bd71 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -28,7 +28,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; -import org.apache.tajo.*; +import org.apache.tajo.QueryId; +import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.exception.NoSuchDatabaseException; import org.apache.tajo.catalog.partition.PartitionMethodDesc; @@ -57,8 +60,6 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; -import org.apache.tajo.util.StringUtils; -import org.apache.tajo.util.history.QueryHistory; import java.io.IOException; import java.net.InetSocketAddress; @@ -319,22 +320,23 @@ public class TajoMasterClientService extends AbstractService { // if we cannot get a QueryInProgress instance from QueryJobManager, // the instance can be in the finished query list. + QueryInfo queryInfo = null; if (queryInProgress == null) { - queryInProgress = context.getQueryJobManager().getFinishedQuery(queryId); + queryInfo = context.getQueryJobManager().getFinishedQuery(queryId); + } else { + queryInfo = queryInProgress.getQueryInfo(); } GetQueryResultResponse.Builder builder = GetQueryResultResponse.newBuilder(); - // If we cannot the QueryInProgress instance from the finished list, + // If we cannot the QueryInfo instance from the finished list, // the query result was expired due to timeout. // In this case, we will result in error. - if (queryInProgress == null) { + if (queryInfo == null) { builder.setErrorMessage("No such query: " + queryId.toString()); return builder.build(); } - QueryInfo queryInfo = queryInProgress.getQueryInfo(); - try { //TODO After implementation Tajo's user security feature, Should be modified. builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName()); @@ -404,14 +406,12 @@ public class TajoMasterClientService extends AbstractService { context.getSessionManager().touch(request.getSessionId().getId()); GetQueryListResponse.Builder builder = GetQueryListResponse.newBuilder(); - Collection<QueryInProgress> queries + Collection<QueryInfo> queries = context.getQueryJobManager().getFinishedQueries(); BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder(); - for (QueryInProgress queryInProgress : queries) { - QueryInfo queryInfo = queryInProgress.getQueryInfo(); - + for (QueryInfo queryInfo : queries) { infoBuilder.setQueryId(queryInfo.getQueryId().getProto()); infoBuilder.setState(queryInfo.getQueryState()); infoBuilder.setQuery(queryInfo.getSql()); @@ -452,12 +452,14 @@ public class TajoMasterClientService extends AbstractService { QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId); // It will try to find a query status from a finished query list. + QueryInfo queryInfo = null; if (queryInProgress == null) { - queryInProgress = context.getQueryJobManager().getFinishedQuery(queryId); + queryInfo = context.getQueryJobManager().getFinishedQuery(queryId); + } else { + queryInfo = queryInProgress.getQueryInfo(); } - if (queryInProgress != null) { - QueryInfo queryInfo = queryInProgress.getQueryInfo(); + if (queryInfo != null) { builder.setResultCode(ResultCode.OK); builder.setState(queryInfo.getQueryState()); builder.setProgress(queryInfo.getProgress()); http://git-wip-us.apache.org/repos/asf/tajo/blob/80afe993/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java index bcca039..536f6ac 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java @@ -18,6 +18,7 @@ package org.apache.tajo.master.querymaster; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -27,18 +28,19 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.TajoProtos; -import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.session.Session; +import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.scheduler.SimpleFifoScheduler; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; public class QueryJobManager extends CompositeService { private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName()); @@ -54,7 +56,10 @@ public class QueryJobManager extends CompositeService { private final Map<QueryId, QueryInProgress> runningQueries = new HashMap<QueryId, QueryInProgress>(); - private final Map<QueryId, QueryInProgress> finishedQueries = new HashMap<QueryId, QueryInProgress>(); + private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE); + private AtomicLong maxExecutionTime = new AtomicLong(); + private AtomicLong avgExecutionTime = new AtomicLong(); + private AtomicLong executedQuerySize = new AtomicLong(); public QueryJobManager(final TajoMaster.MasterContext masterContext) { super(QueryJobManager.class.getName()); @@ -110,9 +115,22 @@ public class QueryJobManager extends CompositeService { } } - public Collection<QueryInProgress> getFinishedQueries() { - synchronized (finishedQueries){ - return Collections.unmodifiableCollection(finishedQueries.values()); + public synchronized Collection<QueryInfo> getFinishedQueries() { + try { + return this.masterContext.getHistoryReader().getQueries(null); + } catch (Throwable e) { + LOG.error(e); + return Lists.newArrayList(); + } + } + + + public synchronized QueryInfo getFinishedQuery(QueryId queryId) { + try { + return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString()); + } catch (Throwable e) { + LOG.error(e); + return null; } } @@ -194,12 +212,6 @@ public class QueryJobManager extends CompositeService { return queryInProgress; } - public QueryInProgress getFinishedQuery(QueryId queryId) { - synchronized(finishedQueries) { - return finishedQueries.get(queryId); - } - } - public void stopQuery(QueryId queryId) { LOG.info("Stop QueryInProgress:" + queryId); QueryInProgress queryInProgress = getQueryInProgress(queryId); @@ -213,14 +225,46 @@ public class QueryJobManager extends CompositeService { runningQueries.remove(queryId); } - synchronized(finishedQueries) { - finishedQueries.put(queryId, queryInProgress); + QueryInfo queryInfo = queryInProgress.getQueryInfo(); + long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime(); + if (executionTime < minExecutionTime.get()) { + minExecutionTime.set(executionTime); } + + if (executionTime > maxExecutionTime.get()) { + maxExecutionTime.set(executionTime); + } + + long totalExecutionTime = executedQuerySize.get() * avgExecutionTime.get(); + if (totalExecutionTime > 0) { + avgExecutionTime.set((totalExecutionTime + executionTime) / (executedQuerySize.get() + 1)); + } else { + avgExecutionTime.set(executionTime); + } + executedQuerySize.incrementAndGet(); + removeService(queryInProgress); } else { LOG.warn("No QueryInProgress while query stopping: " + queryId); } } + public long getMinExecutionTime() { + if (getExecutedQuerySize() == 0) return 0; + return minExecutionTime.get(); + } + + public long getMaxExecutionTime() { + return maxExecutionTime.get(); + } + + public long getAvgExecutionTime() { + return avgExecutionTime.get(); + } + + public long getExecutedQuerySize() { + return executedQuerySize.get(); + } + private void catchException(QueryId queryId, Exception e) { LOG.error(e.getMessage(), e); QueryInProgress queryInProgress = runningQueries.get(queryId); http://git-wip-us.apache.org/repos/asf/tajo/blob/80afe993/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 8ba9600..5cf3df5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -354,6 +354,7 @@ public class QueryMasterTask extends CompositeService { LogicalPlanner planner = new LogicalPlanner(catalog); LogicalOptimizer optimizer = new LogicalOptimizer(systemConf); Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class); + jsonExpr = null; // remove the possible OOM LogicalPlan plan = planner.createPlan(queryContext, expr); optimizer.optimize(queryContext, plan); http://git-wip-us.apache.org/repos/asf/tajo/blob/80afe993/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java index 23311ac..0075b04 100644 --- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java +++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java @@ -29,10 +29,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,6 +59,7 @@ public class QueryExecutorServlet extends HttpServlet { ObjectMapper om = new ObjectMapper(); //queryRunnerId -> QueryRunner + //TODO We must handle the session. private final Map<String, QueryRunner> queryRunners = new HashMap<String, QueryRunner>(); private TajoConf tajoConf; @@ -100,11 +98,29 @@ public class QueryExecutorServlet extends HttpServlet { } if("runQuery".equals(action)) { + String prevQueryRunnerId = request.getParameter("prevQueryId"); + if (prevQueryRunnerId != null) { + synchronized (queryRunners) { + QueryRunner runner = queryRunners.remove(prevQueryRunnerId); + if (runner != null) runner.setStop(); + } + } + + float allowedMemoryRatio = 0.5f; // if TajoMaster memory usage is over 50%, the request will be canceled + long maxMemory = Runtime.getRuntime().maxMemory(); + long usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + if(usedMemory > maxMemory * allowedMemoryRatio) { + errorResponse(response, "Allowed memory size of " + + (maxMemory * allowedMemoryRatio) / (1024 * 1024) + " MB exhausted"); + return; + } + String query = request.getParameter("query"); if(query == null || query.trim().isEmpty()) { errorResponse(response, "No query parameter"); return; } + String queryRunnerId = null; while(true) { synchronized(queryRunners) { http://git-wip-us.apache.org/repos/asf/tajo/blob/80afe993/tajo-core/src/main/resources/webapps/admin/index.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp index ce4d7dc..30cbf88 100644 --- a/tajo-core/src/main/resources/webapps/admin/index.jsp +++ b/tajo-core/src/main/resources/webapps/admin/index.jsp @@ -85,38 +85,6 @@ String numDeadWorkersHtml = numDeadWorkers == 0 ? "0" : "<font color='red'>" + numDeadWorkers + "</font>"; String numDeadQueryMastersHtml = numDeadQueryMasters == 0 ? "0" : "<font color='red'>" + numDeadQueryMasters + "</font>"; - Collection<QueryInProgress> runningQueries = master.getContext().getQueryJobManager().getRunningQueries(); - Collection<QueryInProgress> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries(); - - int avgQueryTime = 0; - int minQueryTime = Integer.MAX_VALUE; - int maxQueryTime = 0; - - long totalTime = 0; - for(QueryInProgress eachQuery: finishedQueries) { - int runTime = (int)(eachQuery.getQueryInfo().getFinishTime() == 0 ? -1 : - eachQuery.getQueryInfo().getFinishTime() - eachQuery.getQueryInfo().getStartTime()); - if(runTime > 0) { - totalTime += runTime; - - if(runTime < minQueryTime) { - minQueryTime = runTime; - } - - if(runTime > maxQueryTime) { - maxQueryTime = runTime; - } - } - } - - if(minQueryTime == Integer.MAX_VALUE) { - minQueryTime = 0; - } - if(finishedQueries.size() > 0) { - avgQueryTime = (int)(totalTime / (long)finishedQueries.size()); - } - - HAService haService = master.getContext().getHAService(); List<TajoMasterInfo> masters = TUtil.newList(); @@ -217,11 +185,11 @@ <table width="100%" class="border_table" border="1"> <tr><th>Running Queries</th><th>Finished Queries</th><th>Average Execution Time</th><th>Min. Execution Time</th><th>Max. Execution Time</th></tr> <tr> - <td align='right'><%=runningQueries.size()%></td> - <td align='right'><%=finishedQueries.size()%></td> - <td align='left'><%=avgQueryTime/1000%> sec</td> - <td align='left'><%=minQueryTime/1000%> sec</td> - <td align='left'><%=maxQueryTime/1000%> sec</td> + <td align='right'><%=master.getContext().getQueryJobManager().getRunningQueries().size()%></td> + <td align='right'><%=master.getContext().getQueryJobManager().getExecutedQuerySize() %></td> + <td align='left'><%=master.getContext().getQueryJobManager().getAvgExecutionTime()/1000%> sec</td> + <td align='left'><%=master.getContext().getQueryJobManager().getMinExecutionTime()/1000%> sec</td> + <td align='left'><%=master.getContext().getQueryJobManager().getMaxExecutionTime()/1000%> sec</td> </tr> </table> </div> http://git-wip-us.apache.org/repos/asf/tajo/blob/80afe993/tajo-core/src/main/resources/webapps/admin/query_executor.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp index c46fcb1..bbd1820 100644 --- a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp @@ -96,7 +96,7 @@ function runQuery() { $.ajax({ type: "POST", url: "query_exec", - data: { action: "runQuery", query: query, limitSize:SIZE_LIMIT, database: sbox.options[sbox.selectedIndex].text } + data: { action: "runQuery", query: query, prevQueryId: queryRunnerId, limitSize:SIZE_LIMIT, database: sbox.options[sbox.selectedIndex].text } }) .done(function(msg) { var resultJson = $.parseJSON(msg); @@ -329,7 +329,7 @@ function getPage() { <hr/> <div id="queryResultTools"></div> <hr/> - <div style="dispaly:none;"><form name="dataForm" id="dataForm" method="post" action="getCSV.jsp"><input type="hidden" id="csvData" name="csvData" value="" /></div> + <div style="display:none;"><form name="dataForm" id="dataForm" method="post" action="getCSV.jsp"><input type="hidden" id="csvData" name="csvData" value="" /></div> </div> </body> </html> http://git-wip-us.apache.org/repos/asf/tajo/blob/80afe993/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java index c68d3a4..fa90b61 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java @@ -28,7 +28,7 @@ import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.master.querymaster.QueryInProgress; +import org.apache.tajo.master.querymaster.QueryInfo; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -64,7 +64,7 @@ public class TestHistory { int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size(); client.executeQueryAndGetResult("select sleep(1) from lineitem"); - Collection<QueryInProgress> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries(); + Collection<QueryInfo> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries(); assertTrue(finishedQueries.size() > beforeFinishedQueriesCount); TajoWorker worker = cluster.getTajoWorkers().get(0); @@ -91,7 +91,7 @@ public class TestHistory { int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size(); client.executeQueryAndGetResult("select sleep(1) from lineitem"); - Collection<QueryInProgress> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries(); + Collection<QueryInfo> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries(); assertTrue(finishedQueries.size() > beforeFinishedQueriesCount); TajoWorker worker = cluster.getTajoWorkers().get(0);
