Repository: tajo Updated Branches: refs/heads/master 22ab1cf97 -> 8c50410dc
TAJO-1810: Remove QueryMasterTask cache immediately, if it stored to persistent storage. Closes #721 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8c50410d Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8c50410d Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8c50410d Branch: refs/heads/master Commit: 8c50410dcf370290982719f8add9aad814a5d4e0 Parents: 22ab1cf Author: Jinho Kim <[email protected]> Authored: Thu Sep 3 21:30:17 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Thu Sep 3 21:30:17 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/QueryTestCaseBase.java | 12 +++ .../org/apache/tajo/TajoTestingCluster.java | 18 +++- .../java/org/apache/tajo/conf/TajoConf.java | 5 +- .../tajo/engine/query/TestGroupByQuery.java | 49 +++------- .../tajo/engine/query/TestTablePartitions.java | 51 ++++++---- .../org/apache/tajo/master/TestQueryResult.java | 14 +-- .../apache/tajo/querymaster/TestQueryState.java | 22 +++-- .../tajo/querymaster/TestTaskStatusUpdate.java | 68 +++++--------- .../org/apache/tajo/master/QueryManager.java | 9 +- .../apache/tajo/querymaster/QueryMaster.java | 97 ++++++-------------- .../java/org/apache/tajo/querymaster/Stage.java | 11 ++- .../apache/tajo/util/history/HistoryWriter.java | 11 ++- .../tajo/worker/TajoWorkerClientService.java | 10 +- 14 files changed, 161 insertions(+), 219 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index b47f20d..350ba1e 100644 --- a/CHANGES +++ b/CHANGES @@ -34,6 +34,9 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1810: Remove QueryMasterTask cache immediately, if it stored to + persistent storage. (jinho) + TAJO-993: Cleanup the result data in HDFS after query finished. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java index 0f277f0..bfafd6d 100644 --- a/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -41,6 +41,8 @@ import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.TajoException; import org.apache.tajo.exception.UndefinedTableException; +import org.apache.tajo.jdbc.FetchResultSet; +import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.master.GlobalEngine; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; @@ -1127,4 +1129,14 @@ public class QueryTestCaseBase { } return result; } + + public static QueryId getQueryId(ResultSet resultSet) { + if (resultSet instanceof TajoMemoryResultSet) { + return ((TajoMemoryResultSet) resultSet).getQueryId(); + } else if (resultSet instanceof FetchResultSet) { + return ((FetchResultSet) resultSet).getQueryId(); + } else { + throw new IllegalArgumentException(resultSet.toString()); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java index ab1c156..bb690dd 100644 --- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -31,10 +31,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.log4j.Level; import org.apache.log4j.Logger; -import org.apache.tajo.annotation.NotNull; -import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.*; -import org.apache.tajo.catalog.store.*; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.client.TajoClientUtil; @@ -53,8 +50,8 @@ import org.apache.tajo.storage.FileTablespace; import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; -import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.Pair; +import org.apache.tajo.util.history.QueryHistory; import org.apache.tajo.worker.TajoWorker; import java.io.File; @@ -156,7 +153,7 @@ public class TajoTestingCluster { conf.setIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2); // Memory cache termination - conf.setIntVar(ConfVars.WORKER_HISTORY_EXPIRE_PERIOD, 1); + conf.setIntVar(ConfVars.HISTORY_QUERY_CACHE_SIZE, 10); // Python function path conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, getClass().getResource("/python").toString()); @@ -772,4 +769,15 @@ public class TajoTestingCluster { } return qmt; } + + public QueryHistory getQueryHistory(QueryId queryId) throws IOException { + QueryHistory queryHistory = null; + for (TajoWorker worker : getTajoWorkers()) { + queryHistory = worker.getWorkerContext().getQueryMaster().getQueryHistory(queryId); + if (queryHistory != null) { + break; + } + } + return queryHistory; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index b50ce81..2a30995 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -184,10 +184,6 @@ public class TajoConf extends Configuration { WORKER_RESOURCE_DFS_DIR_AWARE("tajo.worker.resource.dfs-dir-aware", false, Validators.bool()), - // Tajo History - WORKER_HISTORY_EXPIRE_PERIOD("tajo.worker.history.expire-interval-minutes", 60), // 1 hours - QUERYMASTER_CACHE_EXPIRE_PERIOD("tajo.qm.history.expire-interval-minutes", 10), // 10 mins - WORKER_HEARTBEAT_QUEUE_THRESHOLD_RATE("tajo.worker.heartbeat.queue.threshold-rate", 0.3f, Validators.min("0")),//30% WORKER_HEARTBEAT_IDLE_INTERVAL("tajo.worker.heartbeat.idle.interval", 10 * 1000), // 10 sec WORKER_HEARTBEAT_ACTIVE_INTERVAL("tajo.worker.heartbeat.active.interval", 1000), // 1 sec @@ -276,6 +272,7 @@ public class TajoConf extends Configuration { HISTORY_QUERY_DIR("tajo.history.query.dir", STAGING_ROOT_DIR.defaultVal + "/history"), HISTORY_TASK_DIR("tajo.history.task.dir", "file:///tmp/tajo-${user.name}/history"), HISTORY_EXPIRY_TIME_DAY("tajo.history.expiry-time-day", 7), + HISTORY_QUERY_CACHE_SIZE("tajo.history.cache.size", 100, Validators.min("0")), // Misc ------------------------------------------------------------------- // Fragment http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java index a5caf38..0fe5e0e 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java @@ -24,14 +24,11 @@ import org.apache.tajo.*; import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.querymaster.Query; -import org.apache.tajo.querymaster.QueryMasterTask; -import org.apache.tajo.querymaster.Stage; -import org.apache.tajo.querymaster.Task; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.TUtil; -import org.apache.tajo.worker.TajoWorker; +import org.apache.tajo.util.history.QueryHistory; +import org.apache.tajo.util.history.StageHistory; import org.junit.AfterClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -42,7 +39,7 @@ import org.junit.runners.Parameterized.Parameters; import java.sql.ResultSet; import java.util.*; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; @Category(IntegrationTest.class) @RunWith(Parameterized.class) @@ -693,8 +690,6 @@ public class TestGroupByQuery extends QueryTestCaseBase { @Test public final void testNumShufflePartition() throws Exception { - - Thread.sleep(5000); KeyValueSet tableOptions = new KeyValueSet(); tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N"); @@ -740,40 +735,18 @@ public class TestGroupByQuery extends QueryTestCaseBase { } assertEquals(uniqKeys.size(), numRows); - // find last QueryMasterTask - List<QueryMasterTask> qmTasks = new ArrayList<QueryMasterTask>(); - - for(TajoWorker worker: testingCluster.getTajoWorkers()) { - qmTasks.addAll(worker.getWorkerContext().getQueryMaster().getFinishedQueryMasterTasks()); - } + QueryId queryId = getQueryId(res); + QueryHistory queryHistory = testingCluster.getQueryHistory(queryId); - assertTrue(!qmTasks.isEmpty()); - - Collections.sort(qmTasks, new Comparator<QueryMasterTask>() { - @Override - public int compare(QueryMasterTask o1, QueryMasterTask o2) { - long l1 = o1.getQuerySubmitTime(); - long l2 = o2.getQuerySubmitTime(); - return l1 < l2 ? - 1 : (l1 > l2 ? 1 : 0); - } - }); - - // Getting the number of partitions. It should be 2. - Set<Integer> partitionIds = new HashSet<Integer>(); - - Query query = qmTasks.get(qmTasks.size() - 1).getQuery(); - Collection<Stage> stages = query.getStages(); - assertNotNull(stages); - assertTrue(!stages.isEmpty()); - for (Stage stage : stages) { - if (stage.getId().toStringNoPrefix().endsWith("_000001")) { - for (Task.IntermediateEntry eachInterm: stage.getHashShuffleIntermediateEntries()) { - partitionIds.add(eachInterm.getPartId()); - } + int shuffles = 0; + for (StageHistory stage : queryHistory.getStageHistories()) { + if (stage.getExecutionBlockId().endsWith("_000001")) { + // Getting the number of partitions. It should be 2. + shuffles = stage.getNumShuffles(); } } - assertEquals(2, partitionIds.size()); + assertEquals(2, shuffles); executeString("DROP TABLE testnumshufflepartition PURGE").close(); } finally { testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_GROUPBY_PARTITION_VOLUME.varname, http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index 952e26a..52e7b54 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -28,6 +28,7 @@ import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.client.TajoClientUtil; import org.apache.tajo.exception.ReturnStateUtil; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.common.TajoDataTypes; @@ -77,9 +78,9 @@ public class TestTablePartitions extends QueryTestCaseBase { @Test public final void testCreateColumnPartitionedTable() throws Exception { - ResultSet res = null; + ResultSet res; String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTable"); - + ClientProtos.SubmitQueryResponse response; if (nodeType == NodeType.INSERT) { res = executeString( "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) "); @@ -89,16 +90,22 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); - res = testBase.execute( - "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " + - "l_quantity from lineitem"); + response = client.executeQuery( + "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " + + "l_quantity from lineitem"); } else { - res = testBase.execute( - "create table " + tableName + "(col1 int4, col2 int4) partition by column(key float8) " - + " as select l_orderkey, l_partkey, l_quantity from lineitem"); + response = client.executeQuery( + "create table " + tableName + "(col1 int4, col2 int4) partition by column(key float8) " + + " as select l_orderkey, l_partkey, l_quantity from lineitem"); } - MasterPlan plan = getQueryPlan(res); + QueryId queryId = new QueryId(response.getQueryId()); + testingCluster.waitForQuerySubmitted(queryId, 10); + QueryMasterTask queryMasterTask = testingCluster.getQueryMasterTask(queryId); + assertNotNull(queryMasterTask); + TajoClientUtil.waitCompletion(client, queryId); + + MasterPlan plan = queryMasterTask.getQuery().getPlan(); ExecutionBlock rootEB = plan.getRoot(); assertEquals(1, plan.getChildCount(rootEB.getId())); @@ -120,15 +127,15 @@ public class TestTablePartitions extends QueryTestCaseBase { TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"}, - tableDesc.getStats().getNumRows()); + tableDesc.getStats().getNumRows()); executeString("DROP TABLE " + tableName + " PURGE").close(); - res.close(); } @Test public final void testCreateColumnPartitionedTableWithJoin() throws Exception { - ResultSet res = null; + ResultSet res; + ClientProtos.SubmitQueryResponse response; String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTableWithJoin"); if (nodeType == NodeType.INSERT) { @@ -140,18 +147,23 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); - res = testBase.execute( - "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " + - "l_quantity from lineitem join orders on l_orderkey = o_orderkey"); + response = client.executeQuery( + "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " + + "l_quantity from lineitem join orders on l_orderkey = o_orderkey"); } else { - res = testBase.execute("create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) " - + " AS select l_orderkey, l_partkey, l_quantity from lineitem join orders on l_orderkey = o_orderkey"); + response = client.executeQuery("create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) " + + " AS select l_orderkey, l_partkey, l_quantity from lineitem join orders on l_orderkey = o_orderkey"); } - MasterPlan plan = getQueryPlan(res); - ExecutionBlock rootEB = plan.getRoot(); + QueryId queryId = new QueryId(response.getQueryId()); + testingCluster.waitForQuerySubmitted(queryId, 10); + QueryMasterTask queryMasterTask = testingCluster.getQueryMasterTask(queryId); + assertNotNull(queryMasterTask); + TajoClientUtil.waitCompletion(client, queryId); + MasterPlan plan = queryMasterTask.getQuery().getPlan(); + ExecutionBlock rootEB = plan.getRoot(); assertEquals(1, plan.getChildCount(rootEB.getId())); ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0); @@ -173,7 +185,6 @@ public class TestTablePartitions extends QueryTestCaseBase { tableDesc.getStats().getNumRows()); executeString("DROP TABLE " + tableName + " PURGE").close(); - res.close(); } @Test http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core-tests/src/test/java/org/apache/tajo/master/TestQueryResult.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestQueryResult.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestQueryResult.java index 6775c84..ef25e86 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestQueryResult.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestQueryResult.java @@ -18,13 +18,11 @@ package org.apache.tajo.master; +import org.apache.hadoop.fs.Path; import org.apache.tajo.QueryId; import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.TajoConstants; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.jdbc.FetchResultSet; -import org.apache.tajo.jdbc.TajoMemoryResultSet; -import org.apache.hadoop.fs.Path; import org.junit.Test; import java.sql.ResultSet; @@ -61,14 +59,4 @@ public class TestQueryResult extends QueryTestCaseBase { assertResultSet(res); cleanupQuery(res); } - - private QueryId getQueryId(ResultSet resultSet) { - if (resultSet instanceof TajoMemoryResultSet) { - return ((TajoMemoryResultSet) resultSet).getQueryId(); - } else if (resultSet instanceof FetchResultSet) { - return ((FetchResultSet) resultSet).getQueryId(); - } else { - throw new IllegalArgumentException(resultSet.toString()); - } - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java index 978d709..a43491b 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java @@ -24,11 +24,16 @@ import org.apache.tajo.client.QueryStatus; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientUtil; import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.master.QueryInfo; +import org.apache.tajo.util.history.QueryHistory; +import org.apache.tajo.util.history.StageHistory; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.List; + import static org.junit.Assert.*; @Category(IntegrationTest.class) @@ -80,17 +85,16 @@ public class TestQueryState { queryState = client.getQueryStatus(queryId); } - QueryMasterTask qmt = cluster.getQueryMasterTask(queryId); - Query query = qmt.getQuery(); + QueryInfo queryInfo = cluster.getMaster().getContext().getQueryJobManager().getFinishedQuery(queryId); + assertEquals(queryId, queryInfo.getQueryId()); + assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, queryInfo.getQueryState()); - assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, qmt.getState()); - assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, query.getSynchronizedState()); - assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, query.getState()); + QueryHistory history = cluster.getQueryHistory(queryId); + List<StageHistory> stages = history.getStageHistories(); - assertFalse(query.getStages().isEmpty()); - for (Stage stage : query.getStages()) { - assertEquals(StageState.SUCCEEDED, stage.getSynchronizedState()); - assertEquals(StageState.SUCCEEDED, stage.getState()); + assertFalse(stages.isEmpty()); + for (StageHistory stage : stages) { + assertEquals(StageState.SUCCEEDED.toString(), stage.getState()); } /* get status from TajoMaster */ http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java index b468e37..4654d38 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java @@ -19,18 +19,21 @@ package org.apache.tajo.querymaster; import org.apache.tajo.IntegrationTest; +import org.apache.tajo.QueryId; import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.worker.TajoWorker; +import org.apache.tajo.util.history.QueryHistory; +import org.apache.tajo.util.history.StageHistory; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import java.sql.ResultSet; -import java.util.*; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.junit.Assert.*; @@ -58,8 +61,9 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase { long[] expectedNumRows = new long[]{5, 2, 2, 2}; long[] expectedNumBytes = new long[]{604, 18, 18, 8}; long[] expectedReadBytes = new long[]{604, 604, 18, 0}; + QueryId queryId = getQueryId(res); - assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes); + assertStatus(queryId, 2, expectedNumRows, expectedNumBytes, expectedReadBytes); } finally { cleanupQuery(res); } @@ -77,7 +81,8 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase { long[] expectedNumBytes = new long[]{604, 162, 162, 138, 138, 194}; long[] expectedReadBytes = new long[]{604, 604, 162, 0, 138, 0}; - assertStatus(3, expectedNumRows, expectedNumBytes, expectedReadBytes); + QueryId queryId = getQueryId(res); + assertStatus(queryId, 3, expectedNumRows, expectedNumBytes, expectedReadBytes); } finally { cleanupQuery(res); } @@ -105,7 +110,8 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase { long[] expectedNumBytes = new long[]{20, 75, 8, 34, 109, 34, 34, 18}; long[] expectedReadBytes = new long[]{20, 20, 8, 8, 109, 0, 34, 0}; - assertStatus(4, expectedNumRows, expectedNumBytes, expectedReadBytes); + QueryId queryId = getQueryId(res); + assertStatus(queryId, 4, expectedNumRows, expectedNumBytes, expectedReadBytes); } finally { cleanupQuery(res); } @@ -128,61 +134,37 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase { res.close(); } - private void assertStatus(int numStages, + private void assertStatus(QueryId queryId, int numStages, long[] expectedNumRows, long[] expectedNumBytes, long[] expectedReadBytes) throws Exception { - List<TajoWorker> tajoWorkers = testingCluster.getTajoWorkers(); - Collection<QueryMasterTask> finishedTasks = null; - for (TajoWorker eachWorker: tajoWorkers) { - finishedTasks = eachWorker.getWorkerContext().getQueryMaster().getFinishedQueryMasterTasks(); - if (finishedTasks != null && !finishedTasks.isEmpty()) { - break; - } - } - - assertNotNull(finishedTasks); - assertTrue(!finishedTasks.isEmpty()); - - List<QueryMasterTask> finishedTaskList = new ArrayList<QueryMasterTask>(finishedTasks); - Collections.sort(finishedTaskList, new Comparator<QueryMasterTask>() { - @Override - public int compare(QueryMasterTask o1, QueryMasterTask o2) { - return o2.getQueryId().compareTo(o1.getQueryId()); - } - }); - Query query = finishedTaskList.get(0).getQuery(); + QueryHistory queryHistory = testingCluster.getQueryHistory(queryId); - assertNotNull(query); + assertNotNull(queryHistory); - List<Stage> stages = new ArrayList<Stage>(query.getStages()); + List<StageHistory> stages = queryHistory.getStageHistories(); assertEquals(numStages, stages.size()); - Collections.sort(stages, new Comparator<Stage>() { + Collections.sort(stages, new Comparator<StageHistory>() { @Override - public int compare(Stage o1, Stage o2) { - return o1.getId().compareTo(o2.getId()); + public int compare(StageHistory o1, StageHistory o2) { + return o1.getExecutionBlockId().compareTo(o2.getExecutionBlockId()); } }); int index = 0; - for (Stage eachStage : stages) { - TableStats inputStats = eachStage.getInputStats(); - TableStats resultStats = eachStage.getResultStats(); + for (StageHistory eachStage : stages) { - assertNotNull(inputStats); - assertEquals(expectedNumRows[index], inputStats.getNumRows().longValue()); - assertEquals(expectedNumBytes[index], inputStats.getNumBytes().longValue()); - assertEquals(expectedReadBytes[index], inputStats.getReadBytes().longValue()); + assertEquals(expectedNumRows[index], eachStage.getTotalReadRows()); + assertEquals(expectedNumBytes[index], eachStage.getTotalInputBytes()); + assertEquals(expectedReadBytes[index], eachStage.getTotalReadBytes()); index++; - assertNotNull(resultStats); - assertEquals(expectedNumRows[index], resultStats.getNumRows().longValue()); - assertEquals(expectedNumBytes[index], resultStats.getNumBytes().longValue()); - assertEquals(expectedReadBytes[index], resultStats.getReadBytes().longValue()); + assertEquals(expectedNumRows[index], eachStage.getTotalWriteRows()); + assertEquals(expectedNumBytes[index],eachStage.getTotalWriteBytes()); index++; } http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java index 95562ed..ba421bd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java @@ -35,13 +35,14 @@ import org.apache.tajo.ResourceProtos.TajoHeartbeatRequest; import org.apache.tajo.ResourceProtos.TajoHeartbeatResponse; import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.scheduler.QuerySchedulingInfo; import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.querymaster.QueryJobEvent; import org.apache.tajo.session.Session; -import org.apache.tajo.util.history.HistoryReader; +import org.apache.tajo.util.TUtil; import java.io.IOException; import java.util.*; @@ -61,9 +62,9 @@ public class QueryManager extends CompositeService { private AsyncDispatcher dispatcher; private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap(); - private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap(); - private final LRUMap historyCache = new LRUMap(HistoryReader.DEFAULT_PAGE_SIZE); + + private LRUMap historyCache; private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE); private AtomicLong maxExecutionTime = new AtomicLong(); @@ -83,6 +84,8 @@ public class QueryManager extends CompositeService { this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler()); + TajoConf tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class); + this.historyCache = new LRUMap(tajoConf.getIntVar(TajoConf.ConfVars.HISTORY_QUERY_CACHE_SIZE)); } catch (Exception e) { LOG.error("Failed to init service " + getName() + " by exception " + e, e); } http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index a029802..cce9482 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -30,29 +30,28 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tajo.QueryId; - +import org.apache.tajo.ResourceProtos.TajoHeartbeatRequest; +import org.apache.tajo.ResourceProtos.TajoHeartbeatResponse; +import org.apache.tajo.ResourceProtos.WorkerConnectionsResponse; import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.error.Errors.ResultCode; import org.apache.tajo.exception.ReturnStateUtil; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService; -import org.apache.tajo.ResourceProtos.TajoHeartbeatRequest; -import org.apache.tajo.ResourceProtos.TajoHeartbeatResponse; -import org.apache.tajo.ResourceProtos.WorkerConnectionsResponse; import org.apache.tajo.master.event.QueryStartEvent; import org.apache.tajo.master.event.QueryStopEvent; import org.apache.tajo.rpc.*; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; -import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.TUtil; -import org.apache.tajo.util.history.HistoryReader; +import org.apache.tajo.util.history.HistoryWriter.WriterFuture; +import org.apache.tajo.util.history.HistoryWriter.WriterHolder; import org.apache.tajo.util.history.QueryHistory; import org.apache.tajo.worker.TajoWorker; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -76,8 +75,7 @@ public class QueryMaster extends CompositeService implements EventHandler { private Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap(); - private final LRUMap - finishedQueryMasterTasksCache = new LRUMap(HistoryReader.DEFAULT_PAGE_SIZE); + private LRUMap finishedQueryMasterTasksCache; private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread; @@ -89,8 +87,6 @@ public class QueryMaster extends CompositeService implements EventHandler { private QueryHeartbeatThread queryHeartbeatThread; - private FinishedQueryMasterTaskCleanThread finishedQueryMasterTaskCleanThread; - private TajoWorker.WorkerContext workerContext; private RpcClientManager manager; @@ -114,6 +110,7 @@ public class QueryMaster extends CompositeService implements EventHandler { queryMasterContext = new QueryMasterContext(systemConf); clock = new SystemClock(); + finishedQueryMasterTasksCache = new LRUMap(systemConf.getIntVar(TajoConf.ConfVars.HISTORY_QUERY_CACHE_SIZE)); this.dispatcher = new AsyncDispatcher(); addIfService(dispatcher); @@ -134,9 +131,6 @@ public class QueryMaster extends CompositeService implements EventHandler { clientSessionTimeoutCheckThread = new ClientSessionTimeoutCheckThread(); clientSessionTimeoutCheckThread.start(); - finishedQueryMasterTaskCleanThread = new FinishedQueryMasterTaskCleanThread(); - finishedQueryMasterTaskCleanThread.start(); - eventExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); singleEventExecutor = Executors.newSingleThreadExecutor(); super.serviceStart(); @@ -155,10 +149,6 @@ public class QueryMaster extends CompositeService implements EventHandler { clientSessionTimeoutCheckThread.interrupt(); } - if(finishedQueryMasterTaskCleanThread != null) { - finishedQueryMasterTaskCleanThread.interrupt(); - } - if(eventExecutor != null){ eventExecutor.shutdown(); } @@ -210,7 +200,6 @@ public class QueryMaster extends CompositeService implements EventHandler { return queryMasterTasks.get(queryId); } - @Deprecated public QueryMasterTask getQueryMasterTask(QueryId queryId, boolean includeFinished) { QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId); if (queryMasterTask != null) { @@ -234,10 +223,12 @@ public class QueryMaster extends CompositeService implements EventHandler { return queryMasterTasks.values(); } - @Deprecated - public Collection<QueryMasterTask> getFinishedQueryMasterTasks() { - synchronized (finishedQueryMasterTasksCache) { - return new ArrayList<QueryMasterTask>(finishedQueryMasterTasksCache.values()); + public QueryHistory getQueryHistory(QueryId queryId) throws IOException { + QueryMasterTask queryMasterTask = getQueryMasterTask(queryId, true); + if(queryMasterTask != null) { + return queryMasterTask.getQuery().getQueryHistory(); + } else { + return workerContext.getHistoryReader().getQueryHistory(queryId.toString()); } } @@ -284,7 +275,7 @@ public class QueryMaster extends CompositeService implements EventHandler { return dispatcher.getEventHandler(); } - public void stopQuery(QueryId queryId) { + public void stopQuery(final QueryId queryId) { QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId); if(queryMasterTask == null) { LOG.warn("No query info:" + queryId); @@ -325,8 +316,19 @@ public class QueryMaster extends CompositeService implements EventHandler { QueryHistory queryHisory = query.getQueryHistory(); if (queryHisory != null) { try { + WriterFuture<WriterHolder> writerFuture = new WriterFuture<WriterHolder>(queryHisory) { + @Override + public void done(WriterHolder writerHolder) { + super.done(writerHolder); + + //remove memory cache, if history file writer is done + synchronized (finishedQueryMasterTasksCache) { + finishedQueryMasterTasksCache.remove(queryId); + } + } + }; query.context.getQueryMasterContext().getWorkerContext(). - getTaskHistoryWriter().appendAndFlush(queryHisory); + getTaskHistoryWriter().appendHistory(writerFuture); } catch (Throwable e) { LOG.warn(e, e); } @@ -455,49 +457,4 @@ public class QueryMaster extends CompositeService implements EventHandler { } } } - - class FinishedQueryMasterTaskCleanThread extends Thread { - public void run() { - int expireIntervalTime = systemConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_CACHE_EXPIRE_PERIOD); - LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime); - while(!isStopped) { - try { - synchronized (this) { - this.wait(60 * 1000); // minimum interval minutes - } - } catch (InterruptedException e) { - break; - } - try { - long expireTime = System.currentTimeMillis() - expireIntervalTime * 60 * 1000l; - cleanExpiredFinishedQueryMasterTask(expireTime); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } - } - - private void cleanExpiredFinishedQueryMasterTask(long expireTime) { - List<Object> finishedList; - synchronized (finishedQueryMasterTasksCache) { - finishedList = new ArrayList<Object>(finishedQueryMasterTasksCache.values()); - } - - for(Object finishedTask: finishedList) { - QueryMasterTask queryMasterTask = (QueryMasterTask) finishedTask; - /* If a query are abnormal termination, the finished time will be zero. */ - long finishedTime = queryMasterTask.getStartTime(); - Query query = queryMasterTask.getQuery(); - if (query != null && query.getFinishTime() > 0) { - finishedTime = query.getFinishTime(); - } - - if(finishedTime < expireTime) { - synchronized (finishedQueryMasterTasksCache) { - finishedQueryMasterTasksCache.remove(queryMasterTask.getQueryId()); - } - } - } - } - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 12e4366..2ded786 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -21,6 +21,7 @@ package org.apache.tajo.querymaster; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.event.Event; @@ -458,9 +459,8 @@ public class Stage implements EventHandler<StageEvent> { long totalReadRows = 0; long totalWriteBytes = 0; long totalWriteRows = 0; - int numShuffles = 0; + for(Task eachTask : getTasks()) { - numShuffles = eachTask.getShuffleOutpuNum(); if (eachTask.getLastAttempt() != null) { TableStats inputStats = eachTask.getLastAttempt().getInputStats(); if (inputStats != null) { @@ -476,12 +476,17 @@ public class Stage implements EventHandler<StageEvent> { } } + Set<Integer> partitions = Sets.newHashSet(); + for (IntermediateEntry entry : getHashShuffleIntermediateEntries()) { + partitions.add(entry.getPartId()); + } + stageHistory.setTotalInputBytes(totalInputBytes); stageHistory.setTotalReadBytes(totalReadBytes); stageHistory.setTotalReadRows(totalReadRows); stageHistory.setTotalWriteBytes(totalWriteBytes); stageHistory.setTotalWriteRows(totalWriteRows); - stageHistory.setNumShuffles(numShuffles); + stageHistory.setNumShuffles(partitions.size()); stageHistory.setProgress(getProgress()); return stageHistory; } http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java index c51b4e6..5fca7a7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java @@ -136,6 +136,13 @@ public class HistoryWriter extends AbstractService { return future; } + public void appendHistory(WriterFuture<WriterHolder> future) { + historyQueue.add(future); + synchronized (writerThread) { + writerThread.notifyAll(); + } + } + /* asynchronously flush to history file */ public WriterFuture<WriterHolder> appendAndFlush(History history) { WriterFuture<WriterHolder> future = new WriterFuture<WriterHolder>(history) { @@ -524,7 +531,7 @@ public class HistoryWriter extends AbstractService { return new Path(fileParent, processName + "_" + hour + "_" + maxSeq + HISTORY_FILE_POSTFIX); } - static class WriterHolder implements Closeable { + public static class WriterHolder implements Closeable { long lastWritingTime; Path path; FSDataOutputStream out; @@ -542,7 +549,7 @@ public class HistoryWriter extends AbstractService { } } - static class WriterFuture<T> implements Future<T> { + public static class WriterFuture<T> implements Future<T> { private boolean done = false; private T result; private History history; http://git-wip-us.apache.org/repos/asf/tajo/blob/8c50410d/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index 281e23e..26b0d08 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@ -31,7 +31,6 @@ import org.apache.tajo.exception.ReturnStateUtil; import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse; import org.apache.tajo.ipc.ClientProtos.QueryIdRequest; import org.apache.tajo.ipc.QueryMasterClientProtocol; -import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.rpc.BlockingRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.util.NetUtils; @@ -117,14 +116,7 @@ public class TajoWorkerClientService extends AbstractService { try { QueryId queryId = new QueryId(request.getQueryId()); - - QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true); - QueryHistory queryHistory = null; - if (queryMasterTask == null) { - queryHistory = workerContext.getHistoryReader().getQueryHistory(queryId.toString()); - } else { - queryHistory = queryMasterTask.getQuery().getQueryHistory(); - } + QueryHistory queryHistory = workerContext.getQueryMaster().getQueryHistory(queryId); if (queryHistory != null) { builder.setQueryHistory(queryHistory.getProto());
