Repository: tajo Updated Branches: refs/heads/master 9880f06fd -> 1f6b5b387
TAJO-974: Eliminate unexpected case condition in SubQuery. (Hyoungjun Kim via hyunsik) Closes #974 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1f6b5b38 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1f6b5b38 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1f6b5b38 Branch: refs/heads/master Commit: 1f6b5b38752f499ee6d70ea1be399df34442b4f3 Parents: 9880f06 Author: Hyunsik Choi <[email protected]> Authored: Mon Jul 28 11:19:54 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Mon Jul 28 11:19:54 2014 +0900 ---------------------------------------------------------------------- CHANGES | 7 +- .../tajo/master/querymaster/Repartitioner.java | 11 +- .../tajo/master/querymaster/SubQuery.java | 23 +++-- .../tajo/engine/query/TestGroupByQuery.java | 103 ++++++++++++++++++- 4 files changed, 131 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/1f6b5b38/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 2e530af..3ac13a9 100644 --- a/CHANGES +++ b/CHANGES @@ -97,6 +97,9 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-974: Eliminate unexpected case condition in SubQuery. (Hyoungjun Kim + via hyunsik) + TAJO-977: INSERT into a partitioned table as SELECT statement uses a wrong schema. (Hyoungjun Kim via hyunsik) @@ -112,8 +115,8 @@ Release 0.9.0 - unreleased TAJO-972: Broadcast join with left outer join returns duplicated rows. (Hyoungjun Kim via jaehwa) - TAJO-666: java.nio.BufferOverflowException occurs when the query includes an order by - clause on a TEXT column. (Mai Hai Thanh via jihoon) + TAJO-666: java.nio.BufferOverflowException occurs when the query includes + an order by clause on a TEXT column. (Mai Hai Thanh via jihoon) TAJO-939: Refactoring the column resolver in LogicalPlan. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/1f6b5b38/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index 31c520f..6eebbde 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -726,12 +726,21 @@ public class Repartitioner { } } + int groupingColumns = 0; GroupbyNode groupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY); + if (groupby != null) { + groupingColumns = groupby.getGroupingColumns().length; + } else { + DistinctGroupbyNode dGroupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY); + if (dGroupby != null) { + groupingColumns = dGroupby.getGroupingColumns().length; + } + } // get a proper number of tasks int determinedTaskNum = Math.min(maxNum, finalFetches.size()); LOG.info(subQuery.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetches.size()); - if (groupby != null && groupby.getGroupingColumns().length == 0) { + if (groupingColumns == 0) { determinedTaskNum = 1; LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1"); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/1f6b5b38/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 94f8b32..f2e9dd5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -48,10 +48,7 @@ import org.apache.tajo.engine.planner.PlannerUtil; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.engine.planner.logical.GroupbyNode; -import org.apache.tajo.engine.planner.logical.NodeType; -import org.apache.tajo.engine.planner.logical.ScanNode; -import org.apache.tajo.engine.planner.logical.StoreTableNode; +import org.apache.tajo.engine.planner.logical.*; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.*; import org.apache.tajo.master.TaskRunnerGroupEvent.EventType; @@ -716,9 +713,12 @@ public class SubQuery implements EventHandler<SubQueryEvent> { MasterPlan masterPlan = subQuery.getMasterPlan(); ExecutionBlock parent = masterPlan.getParent(subQuery.getBlock()); - GroupbyNode grpNode = null; + LogicalNode grpNode = null; if (parent != null) { grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY); + if (grpNode == null) { + grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.DISTINCT_GROUP_BY); + } } // We assume this execution block the first stage of join if two or more tables are included in this block, @@ -779,8 +779,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> { return taskNum; // Is this subquery the first step of group-by? } else if (grpNode != null) { - - if (grpNode.getGroupingColumns().length == 0) { + boolean hasGroupColumns = true; + if (grpNode.getType() == NodeType.GROUP_BY) { + hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0; + } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) { + hasGroupColumns = ((DistinctGroupbyNode)grpNode).getGroupingColumns().length > 0; + } + if (!hasGroupColumns) { return 1; } else { long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block); @@ -836,10 +841,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> { long volume = getInputVolume(subQuery.getMasterPlan(), subQuery.context, subQuery.getBlock()); int mb = (int) Math.ceil((double)volume / 1048576); - LOG.info("Table's volume is approximately " + mb + " MB"); + LOG.info(subQuery.getId() + ", Table's volume is approximately " + mb + " MB"); // determine the number of task per 64MB int maxTaskNum = Math.max(1, (int) Math.ceil((double)mb / 64)); - LOG.info("The determined number of non-leaf tasks is " + maxTaskNum); + LOG.info(subQuery.getId() + ", The determined number of non-leaf tasks is " + maxTaskNum); return maxTaskNum; } http://git-wip-us.apache.org/repos/asf/tajo/blob/1f6b5b38/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java index 41ffa06..0ffcf11 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java @@ -18,23 +18,34 @@ package org.apache.tajo.engine.query; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.tajo.IntegrationTest; import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput; +import org.apache.tajo.master.querymaster.Query; +import org.apache.tajo.master.querymaster.QueryMasterTask; +import org.apache.tajo.master.querymaster.QueryUnit; +import org.apache.tajo.master.querymaster.SubQuery; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.worker.TajoWorker; import org.junit.Test; import org.junit.experimental.categories.Category; import java.sql.ResultSet; +import java.util.*; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; @Category(IntegrationTest.class) public class TestGroupByQuery extends QueryTestCaseBase { + private static final Log LOG = LogFactory.getLog(TestGroupByQuery.class); public TestGroupByQuery() throws Exception { super(TajoConstants.DEFAULT_DATABASE_NAME); @@ -529,4 +540,94 @@ public class TestGroupByQuery extends QueryTestCaseBase { assertResultSet(res); cleanupQuery(res); } + + @Test + public final void testNumShufflePartition() throws Exception { + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("col1", Type.TEXT); + schema.addColumn("col2", Type.TEXT); + + List<String> data = new ArrayList<String>(); + int totalBytes = 0; + Random rand = new Random(System.currentTimeMillis()); + String col1Prefix = "Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1" + + "Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1" + + "Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1Column-1"; + + Set<Integer> uniqKeys = new HashSet<Integer>(); + while(true) { + int col1RandomValue = rand.nextInt(1000000); + uniqKeys.add(col1RandomValue); + String str = (col1Prefix + "-" + col1RandomValue) + "|col2-" + rand.nextInt(1000000); + data.add(str); + + totalBytes += str.getBytes().length; + + if (totalBytes > 3 * 1024 * 1024) { + break; + } + } + TajoTestingCluster.createTable("testnumshufflepartition", schema, tableOptions, data.toArray(new String[]{}), 3); + + try { + testingCluster.setAllTajoDaemonConfValue(ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME.varname, "2"); + ResultSet res = executeString( + "select col1 \n" + + ",count(distinct col2) as cnt1\n" + + "from testnumshufflepartition \n" + + "group by col1" + ); + + int numRows = 0; + while (res.next()) { + numRows++; + } + assertEquals(uniqKeys.size(), numRows); + + // find last QueryMasterTask + List<QueryMasterTask> qmTasks = new ArrayList<QueryMasterTask>(); + + for(TajoWorker worker: testingCluster.getTajoWorkers()) { + qmTasks.addAll(worker.getWorkerContext().getQueryMaster().getFinishedQueryMasterTasks()); + } + + assertTrue(!qmTasks.isEmpty()); + + Collections.sort(qmTasks, new Comparator<QueryMasterTask>() { + @Override + public int compare(QueryMasterTask o1, QueryMasterTask o2) { + return Long.compare(o1.getQuerySubmitTime(), o2.getQuerySubmitTime()); + } + }); + + // Getting the number of partitions. It should be 2. + Set<Integer> partitionIds = new HashSet<Integer>(); + + Query query = qmTasks.get(qmTasks.size() - 1).getQuery(); + Collection<SubQuery> subQueries = query.getSubQueries(); + assertNotNull(subQueries); + assertTrue(!subQueries.isEmpty()); + for (SubQuery subQuery: subQueries) { + if (subQuery.getId().toStringNoPrefix().endsWith("_000001")) { + QueryUnit[] queryUnits = subQuery.getQueryUnits(); + assertNotNull(queryUnits); + for (QueryUnit eachQueryUnit: queryUnits) { + for (ShuffleFileOutput output: eachQueryUnit.getShuffleFileOutputs()) { + partitionIds.add(output.getPartId()); + } + } + } + } + + assertEquals(2, partitionIds.size()); + executeString("DROP TABLE testnumshufflepartition PURGE").close(); + } finally { + testingCluster.setAllTajoDaemonConfValue(ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME.varname, + ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME.defaultVal); + } + } }
