Repository: tajo Updated Branches: refs/heads/master 5e2d6c3c7 -> 48dbfd92c
TAJO-673: Assign proper number of tasks when inserting into partitioned table. (jaehwa) Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/48dbfd92 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/48dbfd92 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/48dbfd92 Branch: refs/heads/master Commit: 48dbfd92c4f424ffef6c9aa03e905882bd42bbec Parents: 5e2d6c3 Author: blrunner <[email protected]> Authored: Tue Jul 15 12:35:00 2014 +0900 Committer: blrunner <[email protected]> Committed: Tue Jul 15 12:35:00 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/conf/TajoConf.java | 3 + .../engine/planner/PhysicalPlannerImpl.java | 1 + .../engine/planner/global/GlobalPlanner.java | 3 +- .../physical/HashShuffleFileWriteExec.java | 1 + .../tajo/master/querymaster/QueryUnit.java | 13 +++ .../master/querymaster/QueryUnitAttempt.java | 2 +- .../tajo/master/querymaster/Repartitioner.java | 113 ++++++++++++++++++- .../main/java/org/apache/tajo/worker/Task.java | 11 ++ .../apache/tajo/worker/TaskAttemptContext.java | 18 +++ .../src/main/proto/TajoWorkerProtocol.proto | 2 + .../tajo/engine/query/TestTablePartitions.java | 108 ++++++++++++++++-- .../tajo/pullserver/PullServerAuxService.java | 4 +- .../tajo/pullserver/TajoPullServerService.java | 4 +- 14 files changed, 264 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index c0dcbe3..3803ded 100644 --- a/CHANGES +++ b/CHANGES @@ -84,6 +84,8 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-673: Assign proper number of tasks when inserting into partitioned table. (jaehwa) + TAJO-916: SubQuery::computeStatFromTasks occasionally fail. 
(Hyoungjun Kim via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index dd5327d..83ff9ed 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -249,6 +249,9 @@ public class TajoConf extends Configuration { DIST_QUERY_SORT_PARTITION_VOLUME("tajo.dist-query.sort.partition-volume-mb", 256), DIST_QUERY_GROUPBY_PARTITION_VOLUME("tajo.dist-query.groupby.partition-volume-mb", 256), + DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb", + 256 * 1024 * 1024), + ////////////////////////////////// // Physical Executors ////////////////////////////////// http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index cf02ecd..6678e46 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -732,6 +732,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { ShuffleFileWriteNode plan, PhysicalExec subOp) throws IOException { switch (plan.getShuffleType()) { case HASH_SHUFFLE: + case SCATTERED_HASH_SHUFFLE: return new HashShuffleFileWriteExec(ctx, sm, plan, subOp); case RANGE_SHUFFLE: http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java 
---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index 4e27574..69ecd02 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -1149,10 +1149,11 @@ public class GlobalPlanner { shuffleKeys[i++] = insertNode.getProjectedSchema().getColumn(id); } channel.setShuffleKeys(shuffleKeys); + channel.setShuffleType(SCATTERED_HASH_SHUFFLE); } else { channel.setShuffleKeys(partitionMethod.getExpressionSchema().toArray()); + channel.setShuffleType(HASH_SHUFFLE); } - channel.setShuffleType(HASH_SHUFFLE); channel.setShuffleOutputNum(32); } http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java index 678b745..44e8646 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java @@ -129,6 +129,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { statSet.add(app.getStats()); if (app.getStats().getNumRows() > 0) { context.addShuffleFileOutput(partNum, getDataFile(partNum).getName()); + context.addPartitionOutputVolume(partNum, app.getStats().getNumBytes()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java 
---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java index 6cada07..806c0f1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java @@ -667,6 +667,7 @@ public class QueryUnit implements EventHandler<TaskEvent> { int attemptId; int partId; PullHost host; + long volume; public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) { this.taskId = taskId; @@ -675,6 +676,14 @@ public class QueryUnit implements EventHandler<TaskEvent> { this.host = host; } + public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host, long volume) { + this.taskId = taskId; + this.attemptId = attemptId; + this.partId = partId; + this.host = host; + this.volume = volume; + } + public ExecutionBlockId getEbId() { return ebId; } @@ -699,6 +708,10 @@ public class QueryUnit implements EventHandler<TaskEvent> { return this.host; } + public long getVolume() { + return this.volume; + } + @Override public int hashCode() { return Objects.hashCode(ebId, taskId, partId, attemptId, host); http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java index 361f88f..a4fa12f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java @@ -280,7 +280,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> { PullHost host = new 
PullHost(getHost(), getPullServerPort()); for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) { IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(), - getId().getId(), p.getPartId(), host); + getId().getId(), p.getPartId(), host, p.getVolume()); partitions.add(entry); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index ce2194e..1cc5b78 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -19,6 +19,7 @@ package org.apache.tajo.master.querymaster; import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -60,6 +61,7 @@ import java.util.Map.Entry; import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE; import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE; +import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.SCATTERED_HASH_SHUFFLE; /** * Repartitioner creates non-leaf tasks and shuffles intermediate data. 
@@ -529,7 +531,8 @@ public class Repartitioner { MasterPlan masterPlan, SubQuery subQuery, int maxNum) throws IOException { DataChannel channel = masterPlan.getIncomingChannels(subQuery.getBlock().getId()).get(0); - if (channel.getShuffleType() == HASH_SHUFFLE) { + if (channel.getShuffleType() == HASH_SHUFFLE + || channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) { scheduleHashShuffledFetches(schedulerContext, masterPlan, subQuery, channel, maxNum); } else if (channel.getShuffleType() == RANGE_SHUFFLE) { scheduleRangeShuffledFetches(schedulerContext, masterPlan, subQuery, channel, maxNum); @@ -668,12 +671,23 @@ public class Repartitioner { Map<QueryUnit.PullHost, List<IntermediateEntry>> hashedByHost; Map<Integer, Collection<FetchImpl>> finalFetches = new HashMap<Integer, Collection<FetchImpl>>(); + Map<ExecutionBlockId, List<IntermediateEntry>> intermediates = new HashMap<ExecutionBlockId, + List<IntermediateEntry>>(); for (ExecutionBlock block : masterPlan.getChilds(execBlock)) { List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>(); for (QueryUnit tasks : subQuery.getContext().getSubQuery(block.getId()).getQueryUnits()) { if (tasks.getIntermediateData() != null) { partitions.addAll(tasks.getIntermediateData()); + + // In scattered hash shuffle, Collecting each IntermediateEntry + if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) { + if (intermediates.containsKey(block.getId())) { + intermediates.get(block.getId()).addAll(tasks.getIntermediateData()); + } else { + intermediates.put(block.getId(), tasks.getIntermediateData()); + } + } } } Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions); @@ -709,12 +723,99 @@ public class Repartitioner { } // set the proper number of tasks to the estimated task num - schedulerContext.setEstimatedTaskNum(determinedTaskNum); - // divide fetch uris into the the proper number of tasks in a round robin manner. 
- scheduleFetchesByRoundRobin(subQuery, finalFetches, scan.getTableName(), determinedTaskNum); - LOG.info(subQuery.getId() + ", DeterminedTaskNum : " + determinedTaskNum); + if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) { + scheduleScatteredHashShuffleFetches(schedulerContext, subQuery, intermediates, + scan.getTableName()); + } else { + schedulerContext.setEstimatedTaskNum(determinedTaskNum); + // divide fetch uris into the proper number of tasks in a round robin manner. + scheduleFetchesByRoundRobin(subQuery, finalFetches, scan.getTableName(), determinedTaskNum); + LOG.info(subQuery.getId() + ", DeterminedTaskNum : " + determinedTaskNum); + } } + // Scattered hash shuffle hashes the key columns and groups the intermediate entries that + // share the same partition id. Then, if the volume of a group is larger + // than DIST_QUERY_TABLE_PARTITION_VOLUME, it divides the group into two or more subgroups + // according to DIST_QUERY_TABLE_PARTITION_VOLUME (default size = 256MB). + // As a result, each subgroup stays below DIST_QUERY_TABLE_PARTITION_VOLUME, except when a + // single intermediate entry alone exceeds it. Finally, each subgroup is assigned to a query unit. + // It is usually used for writing partitioned tables. + public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext schedulerContext, + SubQuery subQuery, Map<ExecutionBlockId, List<IntermediateEntry>> intermediates, + String tableName) { + int i = 0; + int splitVolume = subQuery.getContext().getConf(). + getIntVar(ConfVars.DIST_QUERY_TABLE_PARTITION_VOLUME); + + long sumNumBytes = 0L; + Map<Integer, List<FetchImpl>> fetches = new HashMap<Integer, List<FetchImpl>>(); + + // Step 1 : divide fetch uris into the proper number of tasks by + // DIST_QUERY_TABLE_PARTITION_VOLUME + for (Entry<ExecutionBlockId, List<IntermediateEntry>> listEntry : intermediates.entrySet()) { + + // Step 2: Sort IntermediateEntry by partition id.
After first sort, + // we need to sort again by PullHost address because of data locality. + Collections.sort(listEntry.getValue(), new IntermediateEntryComparator()); + for (IntermediateEntry interm : listEntry.getValue()) { + FetchImpl fetch = new FetchImpl(interm.getPullHost(), SCATTERED_HASH_SHUFFLE, + listEntry.getKey(), interm.getPartId(), TUtil.newList(interm)); + if (fetches.size() == 0) { + fetches.put(i, TUtil.newList(fetch)); + } else { + + // Step 3: Compare current partition id with previous partition id because one task can + // include only one partitionId. + if (fetches.get(i).get(0).getPartitionId() != interm.getPartId()) { + i++; + fetches.put(i, TUtil.newList(fetch)); + sumNumBytes = 0L; + } else { + if ((sumNumBytes + interm.getVolume()) < splitVolume) { + fetches.get(i).add(fetch); + } else { + i++; + fetches.put(i, TUtil.newList(fetch)); + sumNumBytes = 0L; + } + } + } + sumNumBytes += interm.getVolume(); + } + } + + // Step 4 : Set the proper number of tasks to the estimated task num + schedulerContext.setEstimatedTaskNum(fetches.size()); + + // Step 5 : Apply divided fetches + i = 0; + Map<String, List<FetchImpl>>[] fetchesArray = new Map[fetches.size()]; + for(Entry<Integer, List<FetchImpl>> entry : fetches.entrySet()) { + fetchesArray[i] = new HashMap<String, List<FetchImpl>>(); + fetchesArray[i].put(tableName, entry.getValue()); + + SubQuery.scheduleFetches(subQuery, fetchesArray[i]); + i++; + } + + LOG.info(subQuery.getId() + + ", ShuffleType:" + SCATTERED_HASH_SHUFFLE.name() + + ", DeterminedTaskNum : " + fetches.size()); + } + + static class IntermediateEntryComparator implements Comparator<IntermediateEntry> { + + @Override + public int compare(IntermediateEntry o1, IntermediateEntry o2) { + int cmp = Ints.compare(o1.getPartId(), o2.getPartId()); + if (cmp != 0) { + return cmp; + } + + return o1.getPullHost().getHost().compareTo(o2.getPullHost().getHost()); + } + } public static List<URI> createFetchURL(FetchImpl fetch, boolean
includeParts) { String scheme = "http://"; @@ -729,6 +830,8 @@ public class Repartitioner { urlPrefix.append("h"); } else if (fetch.getType() == RANGE_SHUFFLE) { urlPrefix.append("r").append("&").append(fetch.getRangeParams()); + } else if (fetch.getType() == SCATTERED_HASH_SHUFFLE) { + urlPrefix.append("s"); } List<URI> fetchURLs = new ArrayList<URI>(); http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index ee3c40d..991dc4b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -348,6 +348,17 @@ public class Task { Entry<Integer,String> entry = it.next(); ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder(); part.setPartId(entry.getKey()); + + // Set output volume + if (context.getPartitionOutputVolume() != null) { + for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) { + if (entry.getKey().equals(e.getKey())) { + part.setVolume(e.getValue().longValue()); + break; + } + } + } + builder.addShuffleFileOutputs(part.build()); } while (it.hasNext()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index e073652..1f0c410 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -71,6 +71,9 @@ public class TaskAttemptContext { private Enforcer enforcer; private QueryContext 
queryContext; + /** a output volume for each partition */ + private Map<Integer, Long> partitionOutputVolume; + public TaskAttemptContext(TajoConf conf, QueryContext queryContext, final QueryUnitAttemptId queryId, final FragmentProto[] fragments, final Path workDir) { @@ -94,6 +97,8 @@ public class TaskAttemptContext { this.shuffleFileOutputs = Maps.newHashMap(); state = TaskAttemptState.TA_PENDING; + + this.partitionOutputVolume = Maps.newHashMap(); } @VisibleForTesting @@ -193,6 +198,19 @@ public class TaskAttemptContext { return shuffleFileOutputs.entrySet().iterator(); } + public void addPartitionOutputVolume(int partId, long volume) { + if (partitionOutputVolume.containsKey(partId)) { + long sum = partitionOutputVolume.get(partId); + partitionOutputVolume.put(partId, sum + volume); + } else { + partitionOutputVolume.put(partId, volume); + } + } + + public Map<Integer, Long> getPartitionOutputVolume() { + return partitionOutputVolume; + } + public void updateAssignedFragments(String tableId, Fragment[] fragments) { fragmentMap.remove(tableId); for(Fragment t : fragments) { http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/main/proto/TajoWorkerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index 3bf6e13..ce8ce86 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -126,6 +126,7 @@ enum CommandType { message ShuffleFileOutput { required int32 partId = 1; optional string fileName = 2; + optional int64 volume = 3; } message QueryExecutionRequestProto { @@ -145,6 +146,7 @@ enum ShuffleType { NONE_SHUFFLE = 0; HASH_SHUFFLE = 1; RANGE_SHUFFLE = 2; + SCATTERED_HASH_SHUFFLE = 3; } enum TransmitType { 
http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index c34c3f4..9db8e41 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -25,21 +25,33 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.DeflateCodec; +import org.apache.tajo.QueryId; import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.engine.planner.global.DataChannel; +import org.apache.tajo.engine.planner.global.ExecutionBlock; +import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.engine.planner.logical.NodeType; import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.jdbc.TajoResultSet; +import org.apache.tajo.master.querymaster.QueryMasterTask; +import org.apache.tajo.worker.TajoWorker; import org.junit.Test; import java.io.IOException; import java.sql.ResultSet; +import java.util.List; import java.util.Map; +import static junit.framework.TestCase.*; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; -import static org.junit.Assert.*; +import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.SCATTERED_HASH_SHUFFLE; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; public class TestTablePartitions 
extends QueryTestCaseBase { @@ -62,6 +74,77 @@ public class TestTablePartitions extends QueryTestCaseBase { res = testBase.execute( "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " + "l_quantity from lineitem"); + + MasterPlan plan = getQueryPlan(res); + ExecutionBlock rootEB = plan.getRoot(); + + /* + ------------------------------------------------------------------------------- + |-eb_1405354886454_0001_000003 + |-eb_1405354886454_0001_000002 + |-eb_1405354886454_0001_000001 + */ + assertEquals(1, plan.getChildCount(rootEB.getId())); + + ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0); + assertNotNull(insertEB); + assertEquals(NodeType.INSERT, insertEB.getPlan().getType()); + assertEquals(1, plan.getChildCount(insertEB.getId())); + + ExecutionBlock scanEB = plan.getChild(insertEB.getId(), 0); + + List<DataChannel> list = plan.getOutgoingChannels(scanEB.getId()); + assertEquals(1, list.size()); + DataChannel channel = list.get(0); + assertNotNull(channel); + assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType()); + assertEquals(1, channel.getShuffleKeys().length); + + res.close(); + } + + @Test + public final void testCreateColumnPartitionedTableWithJoin() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTableWithJoin"); + ResultSet res = executeString( + "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) "); + res.close(); + + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); + assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); + + res = testBase.execute( + "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " + + "l_quantity from lineitem join orders on l_orderkey = o_orderkey"); + + MasterPlan plan = getQueryPlan(res); + ExecutionBlock rootEB = 
plan.getRoot(); + + /* + ------------------------------------------------------------------------------- + |-eb_1405356074433_0001_000005 + |-eb_1405356074433_0001_000004 + |-eb_1405356074433_0001_000003 + |-eb_1405356074433_0001_000002 + |-eb_1405356074433_0001_000001 + */ + assertEquals(1, plan.getChildCount(rootEB.getId())); + + ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0); + assertNotNull(insertEB); + assertEquals(NodeType.INSERT, insertEB.getPlan().getType()); + assertEquals(1, plan.getChildCount(insertEB.getId())); + + ExecutionBlock scanEB = plan.getChild(insertEB.getId(), 0); + + List<DataChannel> list = plan.getOutgoingChannels(scanEB.getId()); + assertEquals(1, list.size()); + DataChannel channel = list.get(0); + assertNotNull(channel); + assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType()); + assertEquals(1, channel.getShuffleKeys().length); + res.close(); } @@ -240,21 +323,13 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(5, desc.getStats().getNumRows().intValue()); } - String expected = "N\n" + - "N\n" + - "N\n" + - "R\n" + - "R\n"; - - String tableData = getTableFileContents(desc.getPath()); - assertEquals(expected, tableData); - res = executeString("select * from " + tableName + " where col2 = 2"); Map<Double, int []> resultRows1 = Maps.newHashMap(); resultRows1.put(45.0d, new int[]{3, 2}); resultRows1.put(38.0d, new int[]{2, 2}); + for (int i = 0; i < 2; i++) { assertTrue(res.next()); assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2)); @@ -575,4 +650,17 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(5, desc.getStats().getNumRows().intValue()); } } + + private MasterPlan getQueryPlan(ResultSet res) { + QueryId queryId = ((TajoResultSet)res).getQueryId(); + for (TajoWorker eachWorker: testingCluster.getTajoWorkers()) { + QueryMasterTask queryMasterTask = eachWorker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId, true); + if 
(queryMasterTask != null) { + return queryMasterTask.getQuery().getPlan(); + } + } + + fail("Can't find query from workers" + queryId); + return null; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java index b8fda29..dd3bee3 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java @@ -424,8 +424,8 @@ public class PullServerAuxService extends AuxiliaryService { chunks.add(chunk); } - // if a subquery requires a hash repartition - } else if (repartitionType.equals("h")) { + // if a subquery requires a hash repartition or a scattered hash repartition + } else if (repartitionType.equals("h") || repartitionType.equals("s")) { for (String ta : taskIds) { Path path = localFS.makeQualified( lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + http://git-wip-us.apache.org/repos/asf/tajo/blob/48dbfd92/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 373642b..5b76da5 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -430,8 +430,8 @@ public class TajoPullServerService extends AbstractService { chunks.add(chunk); } - // if a 
subquery requires a hash shuffle - } else if (shuffleType.equals("h")) { + // if a subquery requires a hash shuffle or a scattered hash shuffle + } else if (shuffleType.equals("h") || shuffleType.equals("s")) { for (String ta : taskIds) { if (!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + "/output/" + partId, conf)) { LOG.warn(e);
