TAJO-1010: Improve multiple DISTINCT aggregation. (Hyoungjun Kim and jaehwa)
Closes #136 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0dfa3972 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0dfa3972 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0dfa3972 Branch: refs/heads/block_iteration Commit: 0dfa3972c6a52d785b8e55f91d0906456a3926b3 Parents: de28c82 Author: Jaehwa Jung <[email protected]> Authored: Wed Oct 8 11:35:45 2014 +0900 Committer: Jaehwa Jung <[email protected]> Committed: Wed Oct 8 11:35:45 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../main/java/org/apache/tajo/SessionVars.java | 2 + .../java/org/apache/tajo/conf/TajoConf.java | 2 + .../eval/AggregationFunctionCallEval.java | 38 +- .../engine/planner/PhysicalPlannerImpl.java | 56 ++- .../tajo/engine/planner/enforce/Enforcer.java | 13 + .../engine/planner/global/GlobalPlanner.java | 58 +-- .../global/builder/DistinctGroupbyBuilder.java | 329 ++++++++++++- .../planner/logical/DistinctGroupbyNode.java | 52 +- .../DistinctGroupbyFirstAggregationExec.java | 476 +++++++++++++++++++ .../DistinctGroupbySecondAggregationExec.java | 295 ++++++++++++ .../DistinctGroupbyThirdAggregationExec.java | 304 ++++++++++++ .../tajo/master/querymaster/Repartitioner.java | 42 +- .../tajo/master/querymaster/SubQuery.java | 28 +- .../src/main/proto/TajoWorkerProtocol.proto | 8 + .../tajo/engine/query/TestGroupByQuery.java | 60 ++- .../testDistinctAggregation8.sql | 9 + .../testDistinctAggregation_case10.sql | 5 + .../testDistinctAggregation_case9.sql | 11 + .../testDistinctAggregation8.result | 7 + .../testDistinctAggregation_case10.result | 3 + .../testDistinctAggregation_case9.result | 6 + .../TestTajoCli/testHelpSessionVars.result | 1 + .../apache/tajo/storage/TupleComparator.java | 8 +- 24 files changed, 1735 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index bff9583..7aa7a0c 100644 --- a/CHANGES +++ b/CHANGES @@ -31,6 +31,8 @@ Release 0.9.0 - unreleased IMPROVEMENT + TAJO-1010: Improve multiple DISTINCT aggregation. (Hyoungjun Kim and jaehwa) + TAJO-1093: DateTimeFormat.to_char() is slower than SimpleDateFormat.format(). (Jihun Kang via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-common/src/main/java/org/apache/tajo/SessionVars.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java index cc875b2..1229849 100644 --- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java @@ -98,6 +98,8 @@ public enum SessionVars implements ConfigKey { TABLE_PARTITION_PER_SHUFFLE_SIZE(ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME, "shuffle output size for partition table write (mb)", DEFAULT), + GROUPBY_MULTI_LEVEL_ENABLED(ConfVars.$GROUPBY_MULTI_LEVEL_ENABLED, "Multiple level groupby enabled", DEFAULT), + // for physical Executors EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort buffer size for external sort (mb)", DEFAULT), HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash join (mb)", DEFAULT), http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index f9f5e4a..66d3030 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ 
b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -316,6 +316,8 @@ public class TajoConf extends Configuration { $DIST_QUERY_GROUPBY_PARTITION_VOLUME("tajo.dist-query.groupby.partition-volume-mb", 256), $DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb", 256), + $GROUPBY_MULTI_LEVEL_ENABLED("tajo.dist-query.groupby.multi-level-aggr", true), + // for physical Executors $EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L), $EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes", http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java index ab18aa9..3216519 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java @@ -30,7 +30,10 @@ import org.apache.tajo.storage.VTuple; public class AggregationFunctionCallEval extends FunctionEval implements Cloneable { @Expose protected AggFunction instance; - @Expose boolean firstPhase = false; + @Expose boolean intermediatePhase = false; + @Expose boolean finalPhase = true; + @Expose String alias; + private Tuple params; protected AggregationFunctionCallEval(EvalType type, FunctionDesc desc, AggFunction instance, EvalNode[] givenArgs) { @@ -58,7 +61,8 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab } } - if (firstPhase) { + if (!intermediatePhase && !finalPhase) { + // firstPhase instance.eval(context, params); } else { instance.merge(context, params); @@ -71,7 +75,7 @@ public class AggregationFunctionCallEval 
extends FunctionEval implements Cloneab } public Datum terminate(FunctionContext context) { - if (firstPhase) { + if (!finalPhase) { return instance.getPartialResult(context); } else { return instance.terminate(context); @@ -80,18 +84,40 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab @Override public DataType getValueType() { - if (firstPhase) { + if (!finalPhase) { return instance.getPartialResultType(); } else { return funcDesc.getReturnType(); } } + public void setAlias(String alias) { this.alias = alias; } + + public String getAlias() { return this.alias; } + public Object clone() throws CloneNotSupportedException { - return super.clone(); + AggregationFunctionCallEval clone = (AggregationFunctionCallEval)super.clone(); + + clone.finalPhase = finalPhase; + clone.intermediatePhase = intermediatePhase; + clone.alias = alias; + clone.instance = (AggFunction)instance.clone(); + + return clone; } public void setFirstPhase() { - this.firstPhase = true; + this.finalPhase = false; + this.intermediatePhase = false; + } + + public void setFinalPhase() { + this.finalPhase = true; + this.intermediatePhase = false; + } + + public void setIntermediatePhase() { + this.finalPhase = false; + this.intermediatePhase = true; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index 2730202..6b1c65c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -43,6 +43,7 @@ import org.apache.tajo.exception.InternalException; import org.apache.tajo.ipc.TajoWorkerProtocol; import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer; import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm; +import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray; import org.apache.tajo.storage.AbstractStorageManager; import org.apache.tajo.storage.StorageConstants; @@ -56,6 +57,7 @@ import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; import java.util.List; import java.util.Stack; @@ -1047,17 +1049,61 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { Enforcer enforcer = context.getEnforcer(); EnforceProperty property = getAlgorithmEnforceProperty(enforcer, distinctNode); if (property != null) { - DistinctAggregationAlgorithm algorithm = property.getDistinct().getAlgorithm(); - if (algorithm == DistinctAggregationAlgorithm.HASH_AGGREGATION) { - return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp); + if (property.getDistinct().getIsMultipleAggregation()) { + MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage(); + + if (stage == MultipleAggregationStage.FIRST_STAGE) { + return new DistinctGroupbyFirstAggregationExec(context, distinctNode, subOp); + } else if (stage == MultipleAggregationStage.SECOND_STAGE) { + return new DistinctGroupbySecondAggregationExec(context, distinctNode, + createSortExecForDistinctGroupby(context, distinctNode, subOp, 2)); + } else { + return new DistinctGroupbyThirdAggregationExec(context, distinctNode, + createSortExecForDistinctGroupby(context, distinctNode, subOp, 3)); + } } else { - return createSortAggregationDistinctGroupbyExec(context, distinctNode, subOp, property.getDistinct()); + DistinctAggregationAlgorithm algorithm = property.getDistinct().getAlgorithm(); + if (algorithm == 
DistinctAggregationAlgorithm.HASH_AGGREGATION) { + return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp); + } else { + return createSortAggregationDistinctGroupbyExec(context, distinctNode, subOp, property.getDistinct()); + } } } else { return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp); } } + private SortExec createSortExecForDistinctGroupby(TaskAttemptContext context, + DistinctGroupbyNode distinctNode, + PhysicalExec subOp, + int phase) throws IOException { + SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); + //2 phase: seq, groupby columns, distinct1 keys, distinct2 keys, + //3 phase: groupby columns, seq, distinct1 keys, distinct2 keys, + List<SortSpec> sortSpecs = new ArrayList<SortSpec>(); + if (phase == 2) { + sortSpecs.add(new SortSpec(distinctNode.getTargets()[0].getNamedColumn())); + } + for (Column eachColumn: distinctNode.getGroupingColumns()) { + sortSpecs.add(new SortSpec(eachColumn)); + } + if (phase == 3) { + sortSpecs.add(new SortSpec(distinctNode.getTargets()[0].getNamedColumn())); + } + for (GroupbyNode eachGroupbyNode: distinctNode.getGroupByNodes()) { + for (Column eachColumn: eachGroupbyNode.getGroupingColumns()) { + sortSpecs.add(new SortSpec(eachColumn)); + } + } + sortNode.setSortSpecs(sortSpecs.toArray(new SortSpec[]{})); + sortNode.setInSchema(distinctNode.getInSchema()); + sortNode.setOutSchema(distinctNode.getInSchema()); + ExternalSortExec sortExec = new ExternalSortExec(context, sm, sortNode, subOp); + + return sortExec; + } + private PhysicalExec createInMemoryDistinctGroupbyExec(TaskAttemptContext ctx, DistinctGroupbyNode distinctGroupbyNode, PhysicalExec subOp) throws IOException { return new DistinctGroupbyHashAggregationExec(ctx, distinctGroupbyNode, subOp); @@ -1145,7 +1191,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { } - private EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) { + public static EnforceProperty 
getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) { if (enforcer == null) { return null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java index 031569e..e2d7744 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java @@ -24,6 +24,7 @@ import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm; +import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray; import org.apache.tajo.util.TUtil; @@ -135,13 +136,25 @@ public class Enforcer implements ProtoObject<EnforcerProto> { public void enforceDistinctAggregation(int pid, DistinctAggregationAlgorithm algorithm, List<SortSpecArray> sortSpecArrays) { + enforceDistinctAggregation(pid, false, null, algorithm, sortSpecArrays); + } + + public void enforceDistinctAggregation(int pid, + boolean isMultipleAggregation, + MultipleAggregationStage stage, + DistinctAggregationAlgorithm algorithm, + List<SortSpecArray> sortSpecArrays) { EnforceProperty.Builder builder = newProperty(); DistinctGroupbyEnforcer.Builder enforce = DistinctGroupbyEnforcer.newBuilder(); enforce.setPid(pid); + enforce.setIsMultipleAggregation(isMultipleAggregation); enforce.setAlgorithm(algorithm); if (sortSpecArrays != null) { enforce.addAllSortSpecArrays(sortSpecArrays); } + if (stage != null) { + 
enforce.setMultipleAggregationStage(stage); + } builder.setType(EnforceType.DISTINCT_GROUP_BY); builder.setDistinct(enforce.build()); http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index 432589b..01e02d7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -658,47 +658,6 @@ public class GlobalPlanner { return rewritten; } - public ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock, - DistinctGroupbyNode firstPhaseGroupBy, - DistinctGroupbyNode secondPhaseGroupBy) { - DataChannel lastDataChannel = null; - - // It pushes down the first phase group-by operator into all child blocks. - // - // (second phase) G (currentBlock) - // /|\ - // / / | \ - // (first phase) G G G G (child block) - - // They are already connected one another. - // So, we don't need to connect them again. - for (DataChannel dataChannel : masterPlan.getIncomingChannels(lastBlock.getId())) { - if (firstPhaseGroupBy.isEmptyGrouping()) { - dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1); - } else { - dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32); - } - dataChannel.setSchema(firstPhaseGroupBy.getOutSchema()); - ExecutionBlock childBlock = masterPlan.getExecBlock(dataChannel.getSrcId()); - - // Why must firstPhaseGroupby be copied? - // - // A groupby in each execution block can have different child. - // It affects groupby's input schema. 
- DistinctGroupbyNode firstPhaseGroupbyCopy = PlannerUtil.clone(masterPlan.getLogicalPlan(), firstPhaseGroupBy); - firstPhaseGroupbyCopy.setChild(childBlock.getPlan()); - childBlock.setPlan(firstPhaseGroupbyCopy); - - // just keep the last data channel. - lastDataChannel = dataChannel; - } - - ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), lastDataChannel); - secondPhaseGroupBy.setChild(scanNode); - lastBlock.setPlan(secondPhaseGroupBy); - return lastBlock; - } - /** * If there are at least one distinct aggregation function, a query works as if the query is rewritten as follows: * @@ -824,8 +783,20 @@ public class GlobalPlanner { ExecutionBlock currentBlock; if (groupbyNode.isDistinct()) { // if there is at one distinct aggregation function - DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this); - return builder.buildPlan(context, lastBlock, groupbyNode); + boolean multiLevelEnabled = context.getPlan().getContext().getBool(SessionVars.GROUPBY_MULTI_LEVEL_ENABLED); + + if (multiLevelEnabled) { + if (PlannerUtil.findTopNode(groupbyNode, NodeType.UNION) == null) { + DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this); + return builder.buildMultiLevelPlan(context, lastBlock, groupbyNode); + } else { + DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this); + return builder.buildPlan(context, lastBlock, groupbyNode); + } + } else { + DistinctGroupbyBuilder builder = new DistinctGroupbyBuilder(this); + return builder.buildPlan(context, lastBlock, groupbyNode); + } } else { GroupbyNode firstPhaseGroupby = createFirstPhaseGroupBy(masterPlan.getLogicalPlan(), groupbyNode); @@ -968,6 +939,7 @@ public class GlobalPlanner { firstPhaseEvals[i].setFirstPhase(); firstPhaseEvalNames[i] = plan.generateUniqueColumnName(firstPhaseEvals[i]); FieldEval param = new FieldEval(firstPhaseEvalNames[i], firstPhaseEvals[i].getValueType()); + secondPhaseEvals[i].setFinalPhase(); secondPhaseEvals[i].setArgs(new EvalNode[] {param}); } 
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java index 8727b84..cbe2d7e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto; +import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.engine.eval.AggregationFunctionCallEval; import org.apache.tajo.engine.eval.EvalNode; import org.apache.tajo.engine.eval.EvalTreeUtil; @@ -36,11 +37,14 @@ import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.planner.global.GlobalPlanner.GlobalPlanContext; +import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode; import org.apache.tajo.engine.planner.logical.GroupbyNode; import org.apache.tajo.engine.planner.logical.LogicalNode; import org.apache.tajo.engine.planner.logical.ScanNode; +import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule; import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm; +import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray; import org.apache.tajo.util.TUtil; @@ -56,6 +60,255 @@ public class DistinctGroupbyBuilder { this.globalPlanner = globalPlanner; } + public ExecutionBlock buildMultiLevelPlan(GlobalPlanContext context, + ExecutionBlock latestExecBlock, + LogicalNode currentNode) throws PlanningException { + try { + GroupbyNode groupbyNode = (GroupbyNode) currentNode; + + LogicalPlan plan = context.getPlan().getLogicalPlan(); + + DistinctGroupbyNode baseDistinctNode = + buildMultiLevelBaseDistinctGroupByNode(context, latestExecBlock, groupbyNode); + baseDistinctNode.setGroupbyPlan(groupbyNode); + + // Set total Aggregation Functions. + AggregationFunctionCallEval[] aggFunctions = + new AggregationFunctionCallEval[groupbyNode.getAggFunctions().length]; + + for (int i = 0; i < aggFunctions.length; i++) { + aggFunctions[i] = (AggregationFunctionCallEval) groupbyNode.getAggFunctions()[i].clone(); + aggFunctions[i].setFirstPhase(); + // If there is not grouping column, we can't find column alias. + // Thus we should find the alias at Groupbynode output schema. 
+ if (groupbyNode.getGroupingColumns().length == 0 + && aggFunctions.length == groupbyNode.getOutSchema().getColumns().size()) { + aggFunctions[i].setAlias(groupbyNode.getOutSchema().getColumn(i).getQualifiedName()); + } + } + + if (groupbyNode.getGroupingColumns().length == 0 + && aggFunctions.length == groupbyNode.getOutSchema().getColumns().size()) { + groupbyNode.setAggFunctions(aggFunctions); + } + + baseDistinctNode.setAggFunctions(aggFunctions); + + // Create First, SecondStage's Node using baseNode + DistinctGroupbyNode firstStageDistinctNode = PlannerUtil.clone(plan, baseDistinctNode); + DistinctGroupbyNode secondStageDistinctNode = PlannerUtil.clone(plan, baseDistinctNode); + DistinctGroupbyNode thirdStageDistinctNode = PlannerUtil.clone(plan, baseDistinctNode); + + // Set second, third non-distinct aggregation's eval node to field eval + GroupbyNode lastGroupbyNode = secondStageDistinctNode.getGroupByNodes().get(secondStageDistinctNode.getGroupByNodes().size() - 1); + if (!lastGroupbyNode.isDistinct()) { + int index = 0; + for (AggregationFunctionCallEval aggrFunction: lastGroupbyNode.getAggFunctions()) { + aggrFunction.setIntermediatePhase(); + aggrFunction.setArgs(new EvalNode[]{new FieldEval(lastGroupbyNode.getTargets()[index].getNamedColumn())}); + index++; + } + } + lastGroupbyNode = thirdStageDistinctNode.getGroupByNodes().get(thirdStageDistinctNode.getGroupByNodes().size() - 1); + if (!lastGroupbyNode.isDistinct()) { + int index = 0; + for (AggregationFunctionCallEval aggrFunction: lastGroupbyNode.getAggFunctions()) { + aggrFunction.setFirstPhase(); + aggrFunction.setArgs(new EvalNode[]{new FieldEval(lastGroupbyNode.getTargets()[index].getNamedColumn())}); + index++; + } + } + + // Set in & out schema for each DistinctGroupbyNode. 
+ secondStageDistinctNode.setInSchema(firstStageDistinctNode.getOutSchema()); + secondStageDistinctNode.setOutSchema(firstStageDistinctNode.getOutSchema()); + thirdStageDistinctNode.setInSchema(firstStageDistinctNode.getOutSchema()); + thirdStageDistinctNode.setOutSchema(groupbyNode.getOutSchema()); + + // Set latestExecBlock's plan with firstDistinctNode + latestExecBlock.setPlan(firstStageDistinctNode); + + // Make SecondStage ExecutionBlock + ExecutionBlock secondStageBlock = context.getPlan().newExecutionBlock(); + + // Make ThirdStage ExecutionBlock + ExecutionBlock thirdStageBlock = context.getPlan().newExecutionBlock(); + + // Set Enforcer + setMultiStageAggregationEnforcer(latestExecBlock, firstStageDistinctNode, secondStageBlock, + secondStageDistinctNode, thirdStageBlock, thirdStageDistinctNode); + + //Create data channel FirstStage to SecondStage + DataChannel firstChannel = new DataChannel(latestExecBlock, secondStageBlock, HASH_SHUFFLE, 32); + + firstChannel.setShuffleKeys(firstStageDistinctNode.getFirstStageShuffleKeyColumns()); + firstChannel.setSchema(firstStageDistinctNode.getOutSchema()); + firstChannel.setStoreType(globalPlanner.getStoreType()); + + ScanNode scanNode = GlobalPlanner.buildInputExecutor(context.getPlan().getLogicalPlan(), firstChannel); + secondStageDistinctNode.setChild(scanNode); + + secondStageBlock.setPlan(secondStageDistinctNode); + + context.getPlan().addConnect(firstChannel); + + DataChannel secondChannel; + //Create data channel SecondStage to ThirdStage + if (groupbyNode.isEmptyGrouping()) { + secondChannel = new DataChannel(secondStageBlock, thirdStageBlock, HASH_SHUFFLE, 1); + secondChannel.setShuffleKeys(firstStageDistinctNode.getGroupingColumns()); + } else { + secondChannel = new DataChannel(secondStageBlock, thirdStageBlock, HASH_SHUFFLE, 32); + secondChannel.setShuffleKeys(firstStageDistinctNode.getGroupingColumns()); + } + secondChannel.setSchema(secondStageDistinctNode.getOutSchema()); + 
secondChannel.setStoreType(globalPlanner.getStoreType()); + + scanNode = GlobalPlanner.buildInputExecutor(context.getPlan().getLogicalPlan(), secondChannel); + thirdStageDistinctNode.setChild(scanNode); + + thirdStageBlock.setPlan(thirdStageDistinctNode); + + context.getPlan().addConnect(secondChannel); + + if (GlobalPlanner.hasUnionChild(firstStageDistinctNode)) { + buildDistinctGroupbyAndUnionPlan( + context.getPlan(), latestExecBlock, firstStageDistinctNode, firstStageDistinctNode); + } + + return thirdStageBlock; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new PlanningException(e); + } + } + + private DistinctGroupbyNode buildMultiLevelBaseDistinctGroupByNode(GlobalPlanContext context, + ExecutionBlock latestExecBlock, + GroupbyNode groupbyNode) { + LogicalPlan plan = context.getPlan().getLogicalPlan(); + + /* + Making DistinctGroupbyNode from GroupByNode + select col1, count(distinct col2), count(distinct col3), sum(col4) from ... group by col1 + => DistinctGroupbyNode + Distinct Seq + grouping key = col1 + Sub GroupbyNodes + - GroupByNode1: grouping(col2), expr(count distinct col2) + - GroupByNode2: grouping(col3), expr(count distinct col3) + - GroupByNode3: expr(sum col4) + */ + List<Column> originalGroupingColumns = Arrays.asList(groupbyNode.getGroupingColumns()); + + List<GroupbyNode> childGroupbyNodes = new ArrayList<GroupbyNode>(); + + List<AggregationFunctionCallEval> otherAggregationFunctionCallEvals = new ArrayList<AggregationFunctionCallEval>(); + List<Target> otherAggregationFunctionTargets = new ArrayList<Target>(); + + //distinct columns -> GroupbyNode + Map<String, DistinctGroupbyNodeBuildInfo> distinctNodeBuildInfos = new HashMap<String, DistinctGroupbyNodeBuildInfo>(); + AggregationFunctionCallEval[] aggFunctions = groupbyNode.getAggFunctions(); + for (int aggIdx = 0; aggIdx < aggFunctions.length; aggIdx++) { + AggregationFunctionCallEval aggFunction = aggFunctions[aggIdx]; + aggFunction.setFirstPhase(); + Target 
originAggFunctionTarget = groupbyNode.getTargets()[originalGroupingColumns.size() + aggIdx]; + Target aggFunctionTarget = + new Target(new FieldEval(originAggFunctionTarget.getEvalTree().getName(), aggFunction.getValueType())); + + if (aggFunction.isDistinct()) { + // Create or reuse Groupby node for each Distinct expression. + LinkedHashSet<Column> groupbyUniqColumns = EvalTreeUtil.findUniqueColumns(aggFunction); + String groupbyMapKey = EvalTreeUtil.columnsToStr(groupbyUniqColumns); + DistinctGroupbyNodeBuildInfo buildInfo = distinctNodeBuildInfos.get(groupbyMapKey); + if (buildInfo == null) { + GroupbyNode distinctGroupbyNode = new GroupbyNode(context.getPlan().getLogicalPlan().newPID()); + buildInfo = new DistinctGroupbyNodeBuildInfo(distinctGroupbyNode); + distinctNodeBuildInfos.put(groupbyMapKey, buildInfo); + + // Grouping columns are GROUP BY clause's column + Distinct column. + List<Column> groupingColumns = new ArrayList<Column>(); + for (Column eachGroupingColumn: groupbyUniqColumns) { + if (!groupingColumns.contains(eachGroupingColumn)) { + groupingColumns.add(eachGroupingColumn); + } + } + distinctGroupbyNode.setGroupingColumns(groupingColumns.toArray(new Column[]{})); + } + buildInfo.addAggFunction(aggFunction); + buildInfo.addAggFunctionTarget(aggFunctionTarget); + } else { + otherAggregationFunctionCallEvals.add(aggFunction); + otherAggregationFunctionTargets.add(aggFunctionTarget); + } + } + + List<Target> baseGroupByTargets = new ArrayList<Target>(); + baseGroupByTargets.add(new Target(new FieldEval(new Column("?distinctseq", Type.INT2)))); + for (Column column : originalGroupingColumns) { + baseGroupByTargets.add(new Target(new FieldEval(column))); + } + + //Add child groupby node for each Distinct clause + for (String eachKey: distinctNodeBuildInfos.keySet()) { + DistinctGroupbyNodeBuildInfo buildInfo = distinctNodeBuildInfos.get(eachKey); + GroupbyNode eachGroupbyNode = buildInfo.getGroupbyNode(); + List<AggregationFunctionCallEval> 
groupbyAggFunctions = buildInfo.getAggFunctions(); + String [] firstPhaseEvalNames = new String[groupbyAggFunctions.size()]; + int index = 0; + for (AggregationFunctionCallEval eachCallEval: groupbyAggFunctions) { + firstPhaseEvalNames[index++] = eachCallEval.getName(); + } + + Target[] targets = new Target[eachGroupbyNode.getGroupingColumns().length + groupbyAggFunctions.size()]; + int targetIdx = 0; + + for (Column column : eachGroupbyNode.getGroupingColumns()) { + Target target = new Target(new FieldEval(column)); + targets[targetIdx++] = target; + baseGroupByTargets.add(target); + } + for (Target eachAggFunctionTarget: buildInfo.getAggFunctionTargets()) { + targets[targetIdx++] = eachAggFunctionTarget; + } + eachGroupbyNode.setTargets(targets); + eachGroupbyNode.setAggFunctions(groupbyAggFunctions.toArray(new AggregationFunctionCallEval[]{})); + eachGroupbyNode.setDistinct(true); + eachGroupbyNode.setInSchema(groupbyNode.getInSchema()); + + childGroupbyNodes.add(eachGroupbyNode); + } + + // Merge other aggregation function to a GroupBy Node. + if (!otherAggregationFunctionCallEvals.isEmpty()) { + // finally this aggregation output tuple's order is GROUP_BY_COL1, COL2, .... + AGG_VALUE, SUM_VALUE, ... 
+ GroupbyNode otherGroupbyNode = new GroupbyNode(context.getPlan().getLogicalPlan().newPID()); + + Target[] targets = new Target[otherAggregationFunctionTargets.size()]; + int targetIdx = 0; + for (Target eachTarget : otherAggregationFunctionTargets) { + targets[targetIdx++] = eachTarget; + baseGroupByTargets.add(eachTarget); + } + + otherGroupbyNode.setTargets(targets); + otherGroupbyNode.setGroupingColumns(new Column[]{}); + otherGroupbyNode.setAggFunctions(otherAggregationFunctionCallEvals.toArray(new AggregationFunctionCallEval[]{})); + otherGroupbyNode.setInSchema(groupbyNode.getInSchema()); + + childGroupbyNodes.add(otherGroupbyNode); + } + + DistinctGroupbyNode baseDistinctNode = new DistinctGroupbyNode(context.getPlan().getLogicalPlan().newPID()); + baseDistinctNode.setTargets(baseGroupByTargets.toArray(new Target[]{})); + baseDistinctNode.setGroupColumns(groupbyNode.getGroupingColumns()); + baseDistinctNode.setInSchema(groupbyNode.getInSchema()); + baseDistinctNode.setChild(groupbyNode.getChild()); + + baseDistinctNode.setGroupbyNodes(childGroupbyNodes); + + return baseDistinctNode; + } public ExecutionBlock buildPlan(GlobalPlanContext context, ExecutionBlock latestExecBlock, @@ -66,7 +319,7 @@ public class DistinctGroupbyBuilder { DistinctGroupbyNode baseDistinctNode = buildBaseDistinctGroupByNode(context, latestExecBlock, groupbyNode); // Create First, SecondStage's Node using baseNode - DistinctGroupbyNode[] distinctNodes = createMultiPhaseDistinctNode(plan, groupbyNode, baseDistinctNode); + DistinctGroupbyNode[] distinctNodes = createTwoPhaseDistinctNode(plan, groupbyNode, baseDistinctNode); DistinctGroupbyNode firstStageDistinctNode = distinctNodes[0]; DistinctGroupbyNode secondStageDistinctNode = distinctNodes[1]; @@ -100,7 +353,7 @@ public class DistinctGroupbyBuilder { context.getPlan().addConnect(channel); if (GlobalPlanner.hasUnionChild(firstStageDistinctNode)) { - globalPlanner.buildDistinctGroupbyAndUnionPlan( + 
buildDistinctGroupbyAndUnionPlan( context.getPlan(), latestExecBlock, firstStageDistinctNode, firstStageDistinctNode); } @@ -162,6 +415,7 @@ public class DistinctGroupbyBuilder { buildInfo.addAggFunction(aggFunction); buildInfo.addAggFunctionTarget(aggFunctionTarget); } else { + aggFunction.setFinalPhase(); otherAggregationFunctionCallEvals.add(aggFunction); otherAggregationFunctionTargets.add(aggFunctionTarget); } @@ -224,7 +478,7 @@ public class DistinctGroupbyBuilder { return baseDistinctNode; } - public DistinctGroupbyNode[] createMultiPhaseDistinctNode(LogicalPlan plan, + public DistinctGroupbyNode[] createTwoPhaseDistinctNode(LogicalPlan plan, GroupbyNode originGroupbyNode, DistinctGroupbyNode baseDistinctNode) { /* @@ -456,6 +710,75 @@ public class DistinctGroupbyBuilder { } + private void setMultiStageAggregationEnforcer( + ExecutionBlock firstStageBlock, DistinctGroupbyNode firstStageDistinctNode, + ExecutionBlock secondStageBlock, DistinctGroupbyNode secondStageDistinctNode, + ExecutionBlock thirdStageBlock, DistinctGroupbyNode thirdStageDistinctNode) { + firstStageBlock.getEnforcer().enforceDistinctAggregation(firstStageDistinctNode.getPID(), + true, MultipleAggregationStage.FIRST_STAGE, + DistinctAggregationAlgorithm.HASH_AGGREGATION, null); + + secondStageBlock.getEnforcer().enforceDistinctAggregation(secondStageDistinctNode.getPID(), + true, MultipleAggregationStage.SECOND_STAGE, + DistinctAggregationAlgorithm.HASH_AGGREGATION, null); + + List<SortSpecArray> sortSpecArrays = new ArrayList<SortSpecArray>(); + int index = 0; + for (GroupbyNode groupbyNode: firstStageDistinctNode.getGroupByNodes()) { + List<SortSpecProto> sortSpecs = new ArrayList<SortSpecProto>(); + for (Column column: groupbyNode.getGroupingColumns()) { + sortSpecs.add(SortSpecProto.newBuilder().setColumn(column.getProto()).build()); + } + sortSpecArrays.add( SortSpecArray.newBuilder() + .setPid(thirdStageDistinctNode.getGroupByNodes().get(index).getPID()) + 
.addAllSortSpecs(sortSpecs).build()); + } + thirdStageBlock.getEnforcer().enforceDistinctAggregation(thirdStageDistinctNode.getPID(), + true, MultipleAggregationStage.THRID_STAGE, + DistinctAggregationAlgorithm.SORT_AGGREGATION, sortSpecArrays); + } + + private ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock, + DistinctGroupbyNode firstPhaseGroupBy, + DistinctGroupbyNode secondPhaseGroupBy) { + DataChannel lastDataChannel = null; + + // It pushes down the first phase group-by operator into all child blocks. + // + // (second phase) G (currentBlock) + // /|\ + // / / | \ + // (first phase) G G G G (child block) + + // They are already connected one another. + // So, we don't need to connect them again. + for (DataChannel dataChannel : masterPlan.getIncomingChannels(lastBlock.getId())) { + if (firstPhaseGroupBy.isEmptyGrouping()) { + dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1); + } else { + dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32); + } + dataChannel.setSchema(firstPhaseGroupBy.getOutSchema()); + ExecutionBlock childBlock = masterPlan.getExecBlock(dataChannel.getSrcId()); + + // Why must firstPhaseGroupby be copied? + // + // A groupby in each execution block can have different child. + // It affects groupby's input schema. + DistinctGroupbyNode firstPhaseGroupbyCopy = PlannerUtil.clone(masterPlan.getLogicalPlan(), firstPhaseGroupBy); + firstPhaseGroupbyCopy.setChild(childBlock.getPlan()); + childBlock.setPlan(firstPhaseGroupbyCopy); + + // just keep the last data channel. 
+ lastDataChannel = dataChannel; + } + + ScanNode scanNode = GlobalPlanner.buildInputExecutor(masterPlan.getLogicalPlan(), lastDataChannel); + secondPhaseGroupBy.setChild(scanNode); + lastBlock.setPlan(secondPhaseGroupBy); + return lastBlock; + } + static class DistinctGroupbyNodeBuildInfo { private GroupbyNode groupbyNode; private List<AggregationFunctionCallEval> aggFunctions = new ArrayList<AggregationFunctionCallEval>(); http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java index b1e4bc3..47e8933 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java @@ -27,10 +27,15 @@ import org.apache.tajo.engine.planner.Target; import org.apache.tajo.util.TUtil; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; public class DistinctGroupbyNode extends UnaryNode implements Projectable, Cloneable { @Expose + private GroupbyNode groupbyPlan; + + @Expose private List<GroupbyNode> groupByNodes; @Expose @@ -42,6 +47,9 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone @Expose private int[] resultColumnIds; + /** Aggregation Functions */ + @Expose private AggregationFunctionCallEval [] aggrFunctions; + public DistinctGroupbyNode(int pid) { super(pid, NodeType.DISTINCT_GROUP_BY); } @@ -59,7 +67,11 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone @Override public Target[] getTargets() { - return new Target[0]; + if (hasTargets()) { + return targets; + } else { + return new 
Target[0]; + } } public void setGroupbyNodes(List<GroupbyNode> groupByNodes) { @@ -86,6 +98,18 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone this.resultColumnIds = resultColumnIds; } + public AggregationFunctionCallEval [] getAggFunctions() { + return this.aggrFunctions; + } + + public void setAggFunctions(AggregationFunctionCallEval[] evals) { + this.aggrFunctions = evals; + } + + public void setGroupbyPlan(GroupbyNode groupbyPlan) { this.groupbyPlan = groupbyPlan; } + + public GroupbyNode getGroupbyPlan() { return this.groupbyPlan; } + @Override public Object clone() throws CloneNotSupportedException { DistinctGroupbyNode cloneNode = (DistinctGroupbyNode)super.clone(); @@ -113,6 +137,9 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone } } + if (groupbyPlan != null) { + cloneNode.groupbyPlan = (GroupbyNode)groupbyPlan.clone(); + } return cloneNode; } @@ -200,4 +227,27 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone return planStr; } + + public Column[] getFirstStageShuffleKeyColumns() { + List<Column> shuffleKeyColumns = new ArrayList<Column>(); + shuffleKeyColumns.add(getOutSchema().getColumn(0)); //distinctseq column + if (groupingColumns != null) { + for (Column eachColumn: groupingColumns) { + if (!shuffleKeyColumns.contains(eachColumn)) { + shuffleKeyColumns.add(eachColumn); + } + } + } + for (GroupbyNode eachGroupbyNode: groupByNodes) { + if (eachGroupbyNode.getGroupingColumns() != null && eachGroupbyNode.getGroupingColumns().length > 0) { + for (Column eachColumn: eachGroupbyNode.getGroupingColumns()) { + if (!shuffleKeyColumns.contains(eachColumn)) { + shuffleKeyColumns.add(eachColumn); + } + } + } + } + + return shuffleKeyColumns.toArray(new Column[]{}); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java 
---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java new file mode 100644 index 0000000..7201ed4 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java @@ -0,0 +1,476 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.datum.Int2Datum; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.engine.eval.AggregationFunctionCallEval; +import org.apache.tajo.engine.function.FunctionContext; +import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode; +import org.apache.tajo.engine.planner.logical.GroupbyNode; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.worker.TaskAttemptContext; + +import java.io.IOException; +import java.util.*; +import java.util.Map.Entry; + +/** + * This class incremented each row to more rows by grouping columns. In addition, the operator must creates each row + * because of aggregation non-distinct columns. + * + * For example, there is a query as follows: + * select sum(distinct l_orderkey), l_linenumber, l_returnflag, l_linestatus, l_shipdate, + * count(distinct l_partkey), sum(l_orderkey) + * from lineitem + * group by l_linenumber, l_returnflag, l_linestatus, l_shipdate; + * + * If you execute above query on tajo, FileScanner makes tuples after scanning raw data as follows: + * + * ----------------------------------------------------------------------------- + * l_linenumber, l_returnflag, l_linestatus, l_shipdate, l_orderkey, l_partkey + * ----------------------------------------------------------------------------- + * 1, N, O, 1996-03-13, 1, 1 + * 2, N, O, 1996-04-12, 1, 1 + * 1, N, O, 1997-01-28, 2, 2 + * 1, R, F, 1994-02-02, 3, 2 + * 2, R, F, 1993-11-09, 3, 3 + * + * And then the scanner will push it as input data to this class. 
After that, this class produces the following + * output data: + * + * ------------------------------------------------------------------------------------------------------------------- + * NodeSequence, l_linenumber, l_returnflag, l_linestatus, l_shipdate, l_partkey for distinct, + * l_orderkey for distinct, l_orderkey for nondistinct + * ------------------------------------------------------------------------------------------------------------------- + * 0, 2, R, F, 1993-11-09, 3, NULL, 3 + * 0, 2, N, O, 1996-04-12, 1, NULL, 1 + * 0, 1, N, O, 1997-01-28, 2, NULL, 2 + * 0, 1, R, F, 1994-02-02, 2, NULL, 3 + * 0, 1, N, O, 1996-03-13, 1, NULL, 1 + * 1, 2, R, F, 1993-11-09, NULL, 3, NULL + * 1, 2, N, O, 1996-04-12, NULL, 1, NULL + * 1, 1, N, O, 1997-01-28, NULL, 2, NULL + * 1, 1, R, F, 1994-02-02, NULL, 3, NULL + * 1, 1, N, O, 1996-03-13, NULL, 1, NULL + * + * For reference, NodeSequence means the GroupByNode sequence. In this case, there are two GroupByNodes: one + * for lineitem.l_partkey and one for lineitem.l_orderkey. The NodeSequence of lineitem.l_partkey is zero and the sequence of + * lineitem.l_orderkey is one. As the output data above shows, the columns that belong to another DistinctGroupBy + * are filled with NullDatum by the inner aggregator. + * + * In addition, the columns for NonDistinctGroupBy can contain real values only at the first NodeSequence. 
+ * + */ + +public class DistinctGroupbyFirstAggregationExec extends PhysicalExec { + private static Log LOG = LogFactory.getLog(DistinctGroupbyFirstAggregationExec.class); + + private DistinctGroupbyNode plan; + private boolean finished = false; + private boolean preparedData = false; + private PhysicalExec child; + + private long totalNumRows; + private int fetchedRows; + private float progress; + + private int[] groupingKeyIndexes; + private NonDistinctHashAggregator nonDistinctHashAggregator; + private DistinctHashAggregator[] distinctAggregators; + + private int resultTupleLength; + + public DistinctGroupbyFirstAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, PhysicalExec subOp) + throws IOException { + super(context, plan.getInSchema(), plan.getOutSchema()); + this.child = subOp; + this.plan = plan; + } + + @Override + public void init() throws IOException { + super.init(); + child.init(); + + // finding grouping column index + Column[] groupingColumns = plan.getGroupingColumns(); + groupingKeyIndexes = new int[groupingColumns.length]; + + int index = 0; + for (Column col: groupingColumns) { + int keyIndex; + if (col.hasQualifier()) { + keyIndex = inSchema.getColumnId(col.getQualifiedName()); + } else { + keyIndex = inSchema.getColumnIdByName(col.getSimpleName()); + } + groupingKeyIndexes[index++] = keyIndex; + } + resultTupleLength = groupingKeyIndexes.length + 1; //1 is Sequence Datum which indicates sequence of DistinctNode. 
+ + List<GroupbyNode> groupbyNodes = plan.getGroupByNodes(); + + List<DistinctHashAggregator> distinctAggrList = new ArrayList<DistinctHashAggregator>(); + int distinctSeq = 0; + for (GroupbyNode eachGroupby: groupbyNodes) { + if (eachGroupby.isDistinct()) { + DistinctHashAggregator aggregator = new DistinctHashAggregator(eachGroupby); + aggregator.setNodeSequence(distinctSeq++); + distinctAggrList.add(aggregator); + resultTupleLength += aggregator.getTupleLength(); + } else { + nonDistinctHashAggregator = new NonDistinctHashAggregator(eachGroupby); + resultTupleLength += nonDistinctHashAggregator.getTupleLength(); + } + } + distinctAggregators = distinctAggrList.toArray(new DistinctHashAggregator[]{}); + } + + private int currentAggregatorIndex = 0; + + @Override + public Tuple next() throws IOException { + if (!preparedData) { + prepareInputData(); + } + + int prevIndex = currentAggregatorIndex; + while (!context.isStopped()) { + DistinctHashAggregator aggregator = distinctAggregators[currentAggregatorIndex]; + Tuple result = aggregator.next(); + if (result != null) { + return result; + } + currentAggregatorIndex++; + currentAggregatorIndex = currentAggregatorIndex % distinctAggregators.length; + if (currentAggregatorIndex == prevIndex) { + finished = true; + return null; + } + } + + return null; + } + + private void prepareInputData() throws IOException { + Tuple tuple = null; + while(!context.isStopped() && (tuple = child.next()) != null) { + Tuple groupingKey = new VTuple(groupingKeyIndexes.length); + for (int i = 0; i < groupingKeyIndexes.length; i++) { + groupingKey.put(i, tuple.get(groupingKeyIndexes[i])); + } + for (int i = 0; i < distinctAggregators.length; i++) { + distinctAggregators[i].compute(groupingKey, tuple); + } + if (nonDistinctHashAggregator != null) { + nonDistinctHashAggregator.compute(groupingKey, tuple); + } + } + for (int i = 0; i < distinctAggregators.length; i++) { + distinctAggregators[i].rescan(); + } + + totalNumRows = 
distinctAggregators[0].distinctAggrDatas.size(); + preparedData = true; + } + + @Override + public void close() throws IOException { + child.close(); + } + + @Override + public TableStats getInputStats() { + if (child != null) { + return child.getInputStats(); + } else { + return null; + } + } + + @Override + public float getProgress() { + if (finished) { + return progress; + } else { + if (totalNumRows > 0) { + return progress + ((float)fetchedRows / (float)totalNumRows) * 0.5f; + } else { + return progress; + } + } + } + + @Override + public void rescan() { + finished = false; + currentAggregatorIndex = 0; + for (int i = 0; i < distinctAggregators.length; i++) { + distinctAggregators[i].rescan(); + } + } + + class NonDistinctHashAggregator { + private GroupbyNode groupbyNode; + private int aggFunctionsNum; + private final AggregationFunctionCallEval aggFunctions[]; + + // GroupingKey -> FunctionContext[] + private Map<Tuple, FunctionContext[]> nonDistinctAggrDatas; + private int tupleLength; + + private Tuple dummyTuple; + private NonDistinctHashAggregator(GroupbyNode groupbyNode) throws IOException { + this.groupbyNode = groupbyNode; + + nonDistinctAggrDatas = new HashMap<Tuple, FunctionContext[]>(); + + if (groupbyNode.hasAggFunctions()) { + aggFunctions = groupbyNode.getAggFunctions(); + aggFunctionsNum = aggFunctions.length; + for (AggregationFunctionCallEval eachFunction: aggFunctions) { + eachFunction.setFirstPhase(); + } + } else { + aggFunctions = new AggregationFunctionCallEval[0]; + aggFunctionsNum = 0; + } + + dummyTuple = new VTuple(aggFunctionsNum); + for (int i = 0; i < aggFunctionsNum; i++) { + dummyTuple.put(i, NullDatum.get()); + } + tupleLength = aggFunctionsNum; + } + + public void compute(Tuple groupingKeyTuple, Tuple tuple) { + FunctionContext[] contexts = nonDistinctAggrDatas.get(groupingKeyTuple); + if (contexts != null) { + for (int i = 0; i < aggFunctions.length; i++) { + aggFunctions[i].merge(contexts[i], inSchema, tuple); + } + } else { 
// if the key occurs firstly + contexts = new FunctionContext[aggFunctionsNum]; + for (int i = 0; i < aggFunctionsNum; i++) { + contexts[i] = aggFunctions[i].newContext(); + aggFunctions[i].merge(contexts[i], inSchema, tuple); + } + nonDistinctAggrDatas.put(groupingKeyTuple, contexts); + } + } + + public Tuple aggregate(Tuple groupingKey) { + FunctionContext[] contexts = nonDistinctAggrDatas.get(groupingKey); + if (contexts == null) { + return null; + } + Tuple tuple = new VTuple(aggFunctionsNum); + + for (int i = 0; i < aggFunctionsNum; i++) { + tuple.put(i, aggFunctions[i].terminate(contexts[i])); + } + + return tuple; + } + + public int getTupleLength() { + return tupleLength; + } + + public Tuple getDummyTuple() { + return dummyTuple; + } + } + + class DistinctHashAggregator { + private GroupbyNode groupbyNode; + + // GroupingKey -> DistinctKey + private Map<Tuple, Set<Tuple>> distinctAggrDatas; + private Iterator<Entry<Tuple, Set<Tuple>>> iterator = null; + + private int nodeSequence; + private Int2Datum nodeSequenceDatum; + + private int[] distinctKeyIndexes; + + private int tupleLength; + private Tuple dummyTuple; + private boolean aggregatorFinished = false; + + public DistinctHashAggregator(GroupbyNode groupbyNode) throws IOException { + this.groupbyNode = groupbyNode; + + Set<Integer> groupingKeyIndexSet = new HashSet<Integer>(); + for (Integer eachIndex: groupingKeyIndexes) { + groupingKeyIndexSet.add(eachIndex); + } + + List<Integer> distinctGroupingKeyIndexSet = new ArrayList<Integer>(); + Column[] groupingColumns = groupbyNode.getGroupingColumns(); + for (int idx = 0; idx < groupingColumns.length; idx++) { + Column col = groupingColumns[idx]; + int keyIndex; + if (col.hasQualifier()) { + keyIndex = inSchema.getColumnId(col.getQualifiedName()); + } else { + keyIndex = inSchema.getColumnIdByName(col.getSimpleName()); + } + if (!groupingKeyIndexSet.contains(keyIndex)) { + distinctGroupingKeyIndexSet.add(keyIndex); + } + } + int index = 0; + 
this.distinctKeyIndexes = new int[distinctGroupingKeyIndexSet.size()]; + this.dummyTuple = new VTuple(distinctGroupingKeyIndexSet.size()); + for (Integer eachId : distinctGroupingKeyIndexSet) { + this.dummyTuple.put(index, NullDatum.get()); + this.distinctKeyIndexes[index++] = eachId; + } + + this.distinctAggrDatas = new HashMap<Tuple, Set<Tuple>>(); + this.tupleLength = distinctKeyIndexes.length; + } + + public void setNodeSequence(int nodeSequence) { + this.nodeSequence = nodeSequence; + this.nodeSequenceDatum = new Int2Datum((short)nodeSequence); + } + + public int getTupleLength() { + return tupleLength; + } + + public void compute(Tuple groupingKey, Tuple tuple) throws IOException { + Tuple distinctKeyTuple = new VTuple(distinctKeyIndexes.length); + for (int i = 0; i < distinctKeyIndexes.length; i++) { + distinctKeyTuple.put(i, tuple.get(distinctKeyIndexes[i])); + } + + Set<Tuple> distinctEntry = distinctAggrDatas.get(groupingKey); + if (distinctEntry == null) { + distinctEntry = new HashSet<Tuple>(); + distinctAggrDatas.put(groupingKey, distinctEntry); + } + distinctEntry.add(distinctKeyTuple); + } + + public void rescan() { + iterator = distinctAggrDatas.entrySet().iterator(); + currentGroupingTuples = null; + groupingKeyChanged = false; + aggregatorFinished = false; + } + + public void close() throws IOException { + distinctAggrDatas.clear(); + distinctAggrDatas = null; + currentGroupingTuples = null; + iterator = null; + } + + Entry<Tuple, Set<Tuple>> currentGroupingTuples; + Iterator<Tuple> distinctKeyIterator; + boolean groupingKeyChanged = false; + + public Tuple next() { + if (aggregatorFinished) { + return null; + } + if (currentGroupingTuples == null) { + // first + if (!iterator.hasNext()) { + // Empty case + aggregatorFinished = true; + return null; + } + currentGroupingTuples = iterator.next(); + groupingKeyChanged = true; + distinctKeyIterator = currentGroupingTuples.getValue().iterator(); + } + if (!distinctKeyIterator.hasNext()) { + if 
(!iterator.hasNext()) { + aggregatorFinished = true; + return null; + } + currentGroupingTuples = iterator.next(); + groupingKeyChanged = true; + distinctKeyIterator = currentGroupingTuples.getValue().iterator(); + } + // node sequence, groupingKeys, 1'st distinctKeys, 2'st distinctKeys, ... + // If n'st == this.nodeSequence set with real data, otherwise set with NullDatum + Tuple tuple = new VTuple(resultTupleLength); + int tupleIndex = 0; + tuple.put(tupleIndex++, nodeSequenceDatum); + + // merge grouping key + Tuple groupingKeyTuple = currentGroupingTuples.getKey(); + int groupingKeyLength = groupingKeyTuple.size(); + for (int i = 0; i < groupingKeyLength; i++, tupleIndex++) { + tuple.put(tupleIndex, groupingKeyTuple.get(i)); + } + + // merge distinctKey + for (int i = 0; i < distinctAggregators.length; i++) { + if (i == nodeSequence) { + Tuple distinctKeyTuple = distinctKeyIterator.next(); + int distinctKeyLength = distinctKeyTuple.size(); + for (int j = 0; j < distinctKeyLength; j++, tupleIndex++) { + tuple.put(tupleIndex, distinctKeyTuple.get(j)); + } + } else { + Tuple dummyTuple = distinctAggregators[i].getDummyTuple(); + int dummyTupleSize = dummyTuple.size(); + for (int j = 0; j < dummyTupleSize; j++, tupleIndex++) { + tuple.put(tupleIndex, dummyTuple.get(j)); + } + } + } + + // merge non distinct aggregation tuple + if (nonDistinctHashAggregator != null) { + Tuple nonDistinctTuple; + if (nodeSequence == 0 && groupingKeyChanged) { + groupingKeyChanged = false; + nonDistinctTuple = nonDistinctHashAggregator.aggregate(groupingKeyTuple); + if (nonDistinctTuple == null) { + nonDistinctTuple = nonDistinctHashAggregator.getDummyTuple(); + } + } else { + nonDistinctTuple = nonDistinctHashAggregator.getDummyTuple(); + } + int tupleSize = nonDistinctTuple.size(); + for (int j = 0; j < tupleSize; j++, tupleIndex++) { + tuple.put(tupleIndex, nonDistinctTuple.get(j)); + } + } + return tuple; + } + + public Tuple getDummyTuple() { + return dummyTuple; + } + } +} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java new file mode 100644 index 0000000..bc8885f --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java @@ -0,0 +1,295 @@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.engine.eval.AggregationFunctionCallEval; +import org.apache.tajo.engine.function.FunctionContext; +import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode; +import org.apache.tajo.engine.planner.logical.GroupbyNode; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.worker.TaskAttemptContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This class adjusts shuffle columns between DistinctGroupbyFirstAggregationExec and + * DistinctGroupbyThirdAggregationExec. It shuffled by grouping columns and aggregation columns. Because of the + * shuffle, more DistinctGroupbyThirdAggregationExec will execute compare than previous two distinct group by + * algorithm. And then, many DistinctGroupbyThirdAggregationExec improve the performance of count distinct query. 
+ * + * For example, there is a query as follows: + * select sum(distinct l_orderkey), l_linenumber, l_returnflag, l_linestatus, l_shipdate, + * count(distinct l_partkey), sum(l_orderkey) + * from lineitem + * group by l_linenumber, l_returnflag, l_linestatus, l_shipdate; + * + * In this case, execution plan for this operator will set shuffle type as follows: + * Incoming: 1 => 2 (type=HASH_SHUFFLE, key=?distinctseq (INT2), default.lineitem.l_linenumber (INT4), + * default.lineitem.l_returnflag (TEXT), default.lineitem.l_linestatus (TEXT), default.lineitem.l_shipdate (TEXT), + * default.lineitem.l_partkey (INT4), default.lineitem.l_orderkey (INT4), num=32) + * + * Outgoing: 2 => 3 (type=HASH_SHUFFLE, key=default.lineitem.l_linenumber (INT4), + * default.lineitem.l_returnflag (TEXT), default.lineitem.l_linestatus (TEXT), + * default.lineitem.l_shipdate (TEXT), num=32) + * + * For reference, input data and output data results as follows: + * + * ------------------------------------------------------------------------------------------------------------------- + * NodeSequence, l_linenumber, l_returnflag, l_linestatus, l_shipdate, l_partkey for distinct, + * l_orderkey for distinct, l_orderkey for nondistinct + * ------------------------------------------------------------------------------------------------------------------- + * 0, 2, R, F, 1993-11-09, 3, NULL, 3 + * 0, 2, N, O, 1996-04-12, 1, NULL, 1 + * 0, 1, N, O, 1997-01-28, 2, NULL, 2 + * 0, 1, R, F, 1994-02-02, 2, NULL, 3 + * 0, 1, N, O, 1996-03-13, 1, NULL, 1 + * 1, 2, R, F, 1993-11-09, NULL, 3, NULL + * 1, 2, N, O, 1996-04-12, NULL, 1, NULL + * 1, 1, N, O, 1997-01-28, NULL, 2, NULL + * 1, 1, R, F, 1994-02-02, NULL, 3, NULL + * 1, 1, N, O, 1996-03-13, NULL, 1, NULL + * + */ +public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec { + private static Log LOG = LogFactory.getLog(DistinctGroupbySecondAggregationExec.class); + private DistinctGroupbyNode plan; + private PhysicalExec child; + + 
private boolean finished = false; + + private int numGroupingColumns; + private int[][] distinctKeyIndexes; + private FunctionContext[] nonDistinctAggrContexts; + private AggregationFunctionCallEval[] nonDistinctAggrFunctions; + private int nonDistinctAggrTupleStartIndex = -1; + + public DistinctGroupbySecondAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, SortExec sortExec) + throws IOException { + super(context, plan.getInSchema(), plan.getOutSchema(), sortExec); + this.plan = plan; + this.child = sortExec; + } + + @Override + public void init() throws IOException { + this.child.init(); + + numGroupingColumns = plan.getGroupingColumns().length; + + List<GroupbyNode> groupbyNodes = plan.getGroupByNodes(); + + // Finding distinct group by column index. + Set<Integer> groupingKeyIndexSet = new HashSet<Integer>(); + for (Column col: plan.getGroupingColumns()) { + int keyIndex; + if (col.hasQualifier()) { + keyIndex = inSchema.getColumnId(col.getQualifiedName()); + } else { + keyIndex = inSchema.getColumnIdByName(col.getSimpleName()); + } + groupingKeyIndexSet.add(keyIndex); + } + + int numDistinct = 0; + for (GroupbyNode eachGroupby : groupbyNodes) { + if (eachGroupby.isDistinct()) { + numDistinct++; + } else { + nonDistinctAggrFunctions = eachGroupby.getAggFunctions(); + if (nonDistinctAggrFunctions != null) { + for (AggregationFunctionCallEval eachFunction: nonDistinctAggrFunctions) { + eachFunction.setIntermediatePhase(); + } + nonDistinctAggrContexts = new FunctionContext[nonDistinctAggrFunctions.length]; + } + } + } + + int index = 0; + distinctKeyIndexes = new int[numDistinct][]; + for (GroupbyNode eachGroupby : groupbyNodes) { + if (eachGroupby.isDistinct()) { + List<Integer> distinctGroupingKeyIndex = new ArrayList<Integer>(); + Column[] distinctGroupingColumns = eachGroupby.getGroupingColumns(); + for (int idx = 0; idx < distinctGroupingColumns.length; idx++) { + Column col = distinctGroupingColumns[idx]; + int keyIndex; + if 
(col.hasQualifier()) { + keyIndex = inSchema.getColumnId(col.getQualifiedName()); + } else { + keyIndex = inSchema.getColumnIdByName(col.getSimpleName()); + } + if (!groupingKeyIndexSet.contains(keyIndex)) { + distinctGroupingKeyIndex.add(keyIndex); + } + } + int i = 0; + distinctKeyIndexes[index] = new int[distinctGroupingKeyIndex.size()]; + for (int eachIdx : distinctGroupingKeyIndex) { + distinctKeyIndexes[index][i++] = eachIdx; + } + index++; + } + } + if (nonDistinctAggrFunctions != null) { + nonDistinctAggrTupleStartIndex = inSchema.size() - nonDistinctAggrFunctions.length; + } + } + + Tuple prevKeyTuple = null; + Tuple prevTuple = null; + int prevSeq = -1; + + @Override + public Tuple next() throws IOException { + if (finished) { + return null; + } + + Tuple result = null; + while (!context.isStopped()) { + Tuple childTuple = child.next(); + if (childTuple == null) { + finished = true; + + if (prevTuple == null) { + // Empty case + return null; + } + if (prevSeq == 0 && nonDistinctAggrFunctions != null) { + terminatedNonDistinctAggr(prevTuple); + } + result = prevTuple; + break; + } + + Tuple tuple = null; + try { + tuple = childTuple.clone(); + } catch (CloneNotSupportedException e) { + throw new IOException(e.getMessage(), e); + } + + int distinctSeq = tuple.get(0).asInt2(); + Tuple keyTuple = getKeyTuple(distinctSeq, tuple); + + if (prevKeyTuple == null) { + // First + if (distinctSeq == 0 && nonDistinctAggrFunctions != null) { + initNonDistinctAggrContext(); + mergeNonDistinctAggr(tuple); + } + prevKeyTuple = keyTuple; + prevTuple = tuple; + prevSeq = distinctSeq; + continue; + } + + if (!prevKeyTuple.equals(keyTuple)) { + // new grouping key + if (prevSeq == 0 && nonDistinctAggrFunctions != null) { + terminatedNonDistinctAggr(prevTuple); + } + result = prevTuple; + + prevKeyTuple = keyTuple; + prevTuple = tuple; + prevSeq = distinctSeq; + + if (distinctSeq == 0 && nonDistinctAggrFunctions != null) { + initNonDistinctAggrContext(); + 
mergeNonDistinctAggr(tuple); + } + break; + } else { + prevKeyTuple = keyTuple; + prevTuple = tuple; + prevSeq = distinctSeq; + if (distinctSeq == 0 && nonDistinctAggrFunctions != null) { + mergeNonDistinctAggr(tuple); + } + } + } + + return result; + } + + private void initNonDistinctAggrContext() { + if (nonDistinctAggrFunctions != null) { + nonDistinctAggrContexts = new FunctionContext[nonDistinctAggrFunctions.length]; + for (int i = 0; i < nonDistinctAggrFunctions.length; i++) { + nonDistinctAggrContexts[i] = nonDistinctAggrFunctions[i].newContext(); + } + } + } + + private void mergeNonDistinctAggr(Tuple tuple) { + if (nonDistinctAggrFunctions == null) { + return; + } + for (int i = 0; i < nonDistinctAggrFunctions.length; i++) { + nonDistinctAggrFunctions[i].merge(nonDistinctAggrContexts[i], inSchema, tuple); + } + } + + private void terminatedNonDistinctAggr(Tuple tuple) { + if (nonDistinctAggrFunctions == null) { + return; + } + for (int i = 0; i < nonDistinctAggrFunctions.length; i++) { + tuple.put(nonDistinctAggrTupleStartIndex + i, nonDistinctAggrFunctions[i].terminate(nonDistinctAggrContexts[i])); + } + } + + private Tuple getKeyTuple(int distinctSeq, Tuple tuple) { + int[] columnIndexes = distinctKeyIndexes[distinctSeq]; + + Tuple keyTuple = new VTuple(numGroupingColumns + columnIndexes.length + 1); + keyTuple.put(0, tuple.get(0)); + for (int i = 0; i < numGroupingColumns; i++) { + keyTuple.put(i + 1, tuple.get(i + 1)); + } + for (int i = 0; i < columnIndexes.length; i++) { + keyTuple.put(i + 1 + numGroupingColumns, tuple.get(columnIndexes[i])); + } + + return keyTuple; + } + + @Override + public void rescan() throws IOException { + super.rescan(); + prevKeyTuple = null; + prevTuple = null; + finished = false; + } + + @Override + public void close() throws IOException { + super.close(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java 
---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java new file mode 100644 index 0000000..239dabf --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java @@ -0,0 +1,304 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.engine.eval.AggregationFunctionCallEval; +import org.apache.tajo.engine.function.FunctionContext; +import org.apache.tajo.engine.planner.Target; +import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode; +import org.apache.tajo.engine.planner.logical.GroupbyNode; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.worker.TaskAttemptContext; + +import java.io.IOException; +import java.util.*; + +/** + * This class aggregates the output of DistinctGroupbySecondAggregationExec. + * + */ +public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec { + private static Log LOG = LogFactory.getLog(DistinctGroupbyThirdAggregationExec.class); + private DistinctGroupbyNode plan; + private PhysicalExec child; + + private boolean finished = false; + + private DistinctFinalAggregator[] aggregators; + private DistinctFinalAggregator nonDistinctAggr; + + private int resultTupleLength; + private int numGroupingColumns; + + private int[] resultTupleIndexes; + + public DistinctGroupbyThirdAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, SortExec sortExec) + throws IOException { + super(context, plan.getInSchema(), plan.getOutSchema(), sortExec); + this.plan = plan; + this.child = sortExec; + } + + @Override + public void init() throws IOException { + this.child.init(); + + numGroupingColumns = plan.getGroupingColumns().length; + resultTupleLength = numGroupingColumns; + + List<GroupbyNode> groupbyNodes = plan.getGroupByNodes(); + + List<DistinctFinalAggregator> aggregatorList = new ArrayList<DistinctFinalAggregator>(); + int inTupleIndex = 1 + numGroupingColumns; + int outTupleIndex = numGroupingColumns; + int distinctSeq = 0; + + for 
(GroupbyNode eachGroupby : groupbyNodes) { + if (eachGroupby.isDistinct()) { + aggregatorList.add(new DistinctFinalAggregator(distinctSeq, inTupleIndex, outTupleIndex, eachGroupby)); + distinctSeq++; + + Column[] distinctGroupingColumns = eachGroupby.getGroupingColumns(); + inTupleIndex += distinctGroupingColumns.length; + outTupleIndex += eachGroupby.getAggFunctions().length; + } else { + nonDistinctAggr = new DistinctFinalAggregator(-1, inTupleIndex, outTupleIndex, eachGroupby); + outTupleIndex += eachGroupby.getAggFunctions().length; + } + resultTupleLength += eachGroupby.getAggFunctions().length; + } + aggregators = aggregatorList.toArray(new DistinctFinalAggregator[]{}); + + // make output schema mapping index + resultTupleIndexes = new int[outSchema.size()]; + Map<Column, Integer> groupbyResultTupleIndex = new HashMap<Column, Integer>(); + int resultTupleIndex = 0; + for (Column eachColumn: plan.getGroupingColumns()) { + groupbyResultTupleIndex.put(eachColumn, resultTupleIndex); + resultTupleIndex++; + } + for (GroupbyNode eachGroupby : groupbyNodes) { + Set<Column> groupingColumnSet = new HashSet<Column>(); + for (Column column: eachGroupby.getGroupingColumns()) { + groupingColumnSet.add(column); + } + for (Target eachTarget: eachGroupby.getTargets()) { + if (!groupingColumnSet.contains(eachTarget.getNamedColumn())) { + //aggr function + groupbyResultTupleIndex.put(eachTarget.getNamedColumn(), resultTupleIndex); + resultTupleIndex++; + } + } + } + + int index = 0; + for (Column eachOutputColumn: outSchema.getColumns()) { + // If column is avg aggregation function, outschema's column type is float + // but groupbyResultTupleIndex's column type is protobuf + + int matchedIndex = -1; + for (Column eachIndexColumn: groupbyResultTupleIndex.keySet()) { + if (eachIndexColumn.getQualifiedName().equals(eachOutputColumn.getQualifiedName())) { + matchedIndex = groupbyResultTupleIndex.get(eachIndexColumn); + break; + } + } + if (matchedIndex < 0) { + throw new 
IOException("Can't find proper output column mapping: " + eachOutputColumn); + } + resultTupleIndexes[matchedIndex] = index++; + } + } + + Tuple prevKeyTuple = null; + Tuple prevTuple = null; + + @Override + public Tuple next() throws IOException { + if (finished) { + return null; + } + + Tuple resultTuple = new VTuple(resultTupleLength); + + while (!context.isStopped()) { + Tuple childTuple = child.next(); + // Last tuple + if (childTuple == null) { + finished = true; + + if (prevTuple == null) { + // Empty case + if (numGroupingColumns == 0) { + // No grouping column, return null tuple + return makeEmptyTuple(); + } else { + return null; + } + } + + for (int i = 0; i < numGroupingColumns; i++) { + resultTuple.put(resultTupleIndexes[i], prevTuple.get(i + 1)); + } + for (DistinctFinalAggregator eachAggr: aggregators) { + eachAggr.terminate(resultTuple); + } + break; + } + + Tuple tuple = null; + try { + tuple = childTuple.clone(); + } catch (CloneNotSupportedException e) { + throw new IOException(e.getMessage(), e); + } + + int distinctSeq = tuple.get(0).asInt2(); + Tuple keyTuple = getGroupingKeyTuple(tuple); + + // First tuple + if (prevKeyTuple == null) { + prevKeyTuple = keyTuple; + prevTuple = tuple; + + aggregators[distinctSeq].merge(tuple); + continue; + } + + if (!prevKeyTuple.equals(keyTuple)) { + // new grouping key + for (int i = 0; i < numGroupingColumns; i++) { + resultTuple.put(resultTupleIndexes[i], prevTuple.get(i + 1)); + } + for (DistinctFinalAggregator eachAggr: aggregators) { + eachAggr.terminate(resultTuple); + } + + prevKeyTuple = keyTuple; + prevTuple = tuple; + + aggregators[distinctSeq].merge(tuple); + break; + } else { + prevKeyTuple = keyTuple; + prevTuple = tuple; + aggregators[distinctSeq].merge(tuple); + } + } + + return resultTuple; + } + + private Tuple makeEmptyTuple() { + Tuple resultTuple = new VTuple(resultTupleLength); + for (DistinctFinalAggregator eachAggr: aggregators) { + eachAggr.terminateEmpty(resultTuple); + } + + return 
resultTuple; + } + + private Tuple getGroupingKeyTuple(Tuple tuple) { + Tuple keyTuple = new VTuple(numGroupingColumns); + for (int i = 0; i < numGroupingColumns; i++) { + keyTuple.put(i, tuple.get(i + 1)); + } + + return keyTuple; + } + + @Override + public void rescan() throws IOException { + super.rescan(); + prevKeyTuple = null; + prevTuple = null; + finished = false; + } + + @Override + public void close() throws IOException { + super.close(); + } + + class DistinctFinalAggregator { + private FunctionContext[] functionContexts; + private AggregationFunctionCallEval[] aggrFunctions; + private int seq; + private int inTupleIndex; + private int outTupleIndex; + public DistinctFinalAggregator(int seq, int inTupleIndex, int outTupleIndex, GroupbyNode groupbyNode) { + this.seq = seq; + this.inTupleIndex = inTupleIndex; + this.outTupleIndex = outTupleIndex; + + aggrFunctions = groupbyNode.getAggFunctions(); + if (aggrFunctions != null) { + for (AggregationFunctionCallEval eachFunction: aggrFunctions) { + eachFunction.setFinalPhase(); + } + } + newFunctionContext(); + } + + private void newFunctionContext() { + functionContexts = new FunctionContext[aggrFunctions.length]; + for (int i = 0; i < aggrFunctions.length; i++) { + functionContexts[i] = aggrFunctions[i].newContext(); + } + } + + public void merge(Tuple tuple) { + for (int i = 0; i < aggrFunctions.length; i++) { + aggrFunctions[i].merge(functionContexts[i], inSchema, tuple); + } + + if (seq == 0 && nonDistinctAggr != null) { + if (!tuple.get(nonDistinctAggr.inTupleIndex).isNull()) { + nonDistinctAggr.merge(tuple); + } + } + } + + public void terminate(Tuple resultTuple) { + for (int i = 0; i < aggrFunctions.length; i++) { + resultTuple.put(resultTupleIndexes[outTupleIndex + i], aggrFunctions[i].terminate(functionContexts[i])); + } + newFunctionContext(); + + if (seq == 0 && nonDistinctAggr != null) { + nonDistinctAggr.terminate(resultTuple); + } + } + + public void terminateEmpty(Tuple resultTuple) { + 
newFunctionContext(); + for (int i = 0; i < aggrFunctions.length; i++) { + resultTuple.put(resultTupleIndexes[outTupleIndex + i], aggrFunctions[i].terminate(functionContexts[i])); + } + if (seq == 0 && nonDistinctAggr != null) { + nonDistinctAggr.terminateEmpty(resultTuple); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/0dfa3972/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index 4deddee..598054c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -33,10 +33,8 @@ import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.engine.planner.PlannerUtil; -import org.apache.tajo.engine.planner.PlanningException; -import org.apache.tajo.engine.planner.RangePartitionAlgorithm; -import org.apache.tajo.engine.planner.UniformRangePartition; +import org.apache.tajo.engine.planner.*; +import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.GlobalPlanner; @@ -45,6 +43,8 @@ import org.apache.tajo.engine.planner.logical.*; import org.apache.tajo.engine.utils.TupleUtil; import org.apache.tajo.exception.InternalException; import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; +import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty; import 
org.apache.tajo.master.TaskSchedulerContext; import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; import org.apache.tajo.storage.AbstractStorageManager; @@ -799,13 +799,30 @@ public class Repartitioner { } int groupingColumns = 0; - GroupbyNode groupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY); - if (groupby != null) { - groupingColumns = groupby.getGroupingColumns().length; - } else { - DistinctGroupbyNode dGroupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY); - if (dGroupby != null) { - groupingColumns = dGroupby.getGroupingColumns().length; + LogicalNode[] groupbyNodes = PlannerUtil.findAllNodes(subQuery.getBlock().getPlan(), + new NodeType[]{NodeType.GROUP_BY, NodeType.DISTINCT_GROUP_BY}); + if (groupbyNodes != null && groupbyNodes.length > 0) { + LogicalNode bottomNode = groupbyNodes[0]; + if (bottomNode.getType() == NodeType.GROUP_BY) { + groupingColumns = ((GroupbyNode)bottomNode).getGroupingColumns().length; + } else if (bottomNode.getType() == NodeType.DISTINCT_GROUP_BY) { + DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY); + if (distinctNode == null) { + LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode"); + distinctNode = (DistinctGroupbyNode)bottomNode; + } + groupingColumns = distinctNode.getGroupingColumns().length; + + Enforcer enforcer = execBlock.getEnforcer(); + EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode); + if (property != null) { + if (property.getDistinct().getIsMultipleAggregation()) { + MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage(); + if (stage != MultipleAggregationStage.THRID_STAGE) { + groupingColumns = distinctNode.getOutSchema().size(); + } + } + } } } // get a proper number of tasks @@ -1145,7 +1162,8 @@ public class Repartitioner { // set the 
partition number for group by and sort if (channel.getShuffleType() == HASH_SHUFFLE) { - if (execBlock.getPlan().getType() == NodeType.GROUP_BY) { + if (execBlock.getPlan().getType() == NodeType.GROUP_BY || + execBlock.getPlan().getType() == NodeType.DISTINCT_GROUP_BY) { keys = channel.getShuffleKeys(); } } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
