TAJO-1343: Improve the memory usage of physical executors. (jihoon) Closes #634
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4820610f Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4820610f Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4820610f Branch: refs/heads/master Commit: 4820610f4a2a384372aaecf5212bd486c79a65b2 Parents: e5b30e5 Author: Jihoon Son <[email protected]> Authored: Wed Jul 22 18:04:42 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Wed Jul 22 18:04:42 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../org/apache/tajo/storage/RowStoreUtil.java | 1 - .../java/org/apache/tajo/conf/TajoConf.java | 2 - .../java/org/apache/tajo/storage/NullTuple.java | 4 + .../java/org/apache/tajo/storage/Tuple.java | 2 + .../java/org/apache/tajo/storage/VTuple.java | 7 + .../tajo/engine/planner/KeyProjector.java | 44 +++++ .../tajo/engine/planner/PhysicalPlanner.java | 4 +- .../apache/tajo/engine/planner/Projector.java | 29 +-- .../planner/physical/AggregationExec.java | 11 -- .../engine/planner/physical/BNLJoinExec.java | 31 ++-- .../planner/physical/BSTIndexScanExec.java | 19 +- .../planner/physical/CommonHashJoinExec.java | 70 +++----- .../engine/planner/physical/CommonJoinExec.java | 13 +- .../planner/physical/ComparableVector.java | 8 - .../DistinctGroupbyFirstAggregationExec.java | 179 +++++++++---------- .../DistinctGroupbyHashAggregationExec.java | 177 +++++++++--------- .../DistinctGroupbySecondAggregationExec.java | 108 ++++++----- .../DistinctGroupbySortAggregationExec.java | 27 ++- .../DistinctGroupbyThirdAggregationExec.java | 54 +++--- .../planner/physical/ExternalSortExec.java | 34 ++-- .../planner/physical/HashAggregateExec.java | 21 +-- .../planner/physical/HashFullOuterJoinExec.java | 35 ++-- .../engine/planner/physical/HashJoinExec.java | 19 +- .../planner/physical/HashLeftAntiJoinExec.java | 11 +- .../planner/physical/HashLeftOuterJoinExec.java | 11 +- .../planner/physical/HashLeftSemiJoinExec.java | 11 +- .../physical/HashShuffleFileWriteExec.java | 25 ++- .../tajo/engine/planner/physical/KeyTuple.java | 85 +++++++++ .../engine/planner/physical/MemSortExec.java | 11 +- .../physical/MergeFullOuterJoinExec.java | 77 ++++---- .../engine/planner/physical/MergeJoinExec.java | 72 ++++---- .../engine/planner/physical/NLJoinExec.java | 12 +- .../engine/planner/physical/ProjectionExec.java | 6 +- .../physical/RangeShuffleFileWriteExec.java | 42 ++--- .../physical/RightOuterMergeJoinExec.java | 102 +++++------ .../engine/planner/physical/SeqScanExec.java | 3 +- .../planner/physical/SortAggregateExec.java | 40 +++-- .../SortBasedColPartitionStoreExec.java | 3 +- .../tajo/engine/planner/physical/SortExec.java | 3 +- .../engine/planner/physical/StoreTableExec.java | 2 +- .../tajo/engine/planner/physical/TupleList.java | 44 +++++ .../tajo/engine/planner/physical/TupleMap.java | 74 ++++++++ .../tajo/engine/planner/physical/TupleSet.java | 34 ++++ .../engine/planner/physical/TupleSorter.java | 7 +- .../planner/physical/VectorizedSorter.java | 3 +- .../engine/planner/physical/WindowAggExec.java | 63 ++++--- .../apache/tajo/engine/query/QueryContext.java | 7 +- .../apache/tajo/engine/utils/CacheHolder.java | 13 +- .../org/apache/tajo/engine/utils/TupleUtil.java | 21 --- .../NonForwardQueryResultSystemScanner.java | 8 +- .../org/apache/tajo/TajoTestingCluster.java | 1 + .../tajo/engine/planner/TestLogicalPlanner.java | 4 +- .../planner/physical/TestTupleSorter.java | 11 +- .../tajo/engine/query/TestGroupByQuery.java | 28 +-- .../TestGroupByQuery/testGroupbyWithJson.json | 101 +++++++---- .../TestGroupByQuery/testGroupbyWithLimit3.sql | 2 +- .../testNestedFieldAsGroupbyKey1.sql | 2 + .../TestGroupByQuery/testGroupbyWithJson.result | 4 +- .../testGroupbyWithLimit3.result | 2 +- .../testGroupbyWithPythonFunc.result | 2 +- .../testGroupbyWithPythonFunc2.result | 4 +- .../testHavingWithAggFunction.result | 2 +- .../TestGroupByQuery/testPythonUdaf3.result | 2 +- .../testNestedFieldAsGroupbyKey1.result | 4 +- .../testWindowWithAggregation4.result | 4 +- .../testWindowWithAggregation6.result | 4 +- .../org/apache/tajo/jdbc/MetaDataTuple.java | 5 + .../org/apache/tajo/plan/LogicalPlanner.java | 6 +- .../org/apache/tajo/plan/TablePropertyUtil.java | 2 - .../org/apache/tajo/storage/FrameTuple.java | 5 + .../java/org/apache/tajo/storage/LazyTuple.java | 6 + .../org/apache/tajo/storage/RowStoreUtil.java | 2 +- .../apache/tajo/tuple/offheap/HeapTuple.java | 4 + .../apache/tajo/tuple/offheap/UnSafeTuple.java | 4 + .../apache/tajo/storage/hbase/HBaseScanner.java | 9 +- .../java/org/apache/tajo/storage/CSVFile.java | 64 +++---- .../java/org/apache/tajo/storage/RawFile.java | 42 ++--- .../java/org/apache/tajo/storage/RowFile.java | 3 +- .../apache/tajo/storage/avro/AvroScanner.java | 26 +-- .../apache/tajo/storage/index/bst/BSTIndex.java | 20 ++- .../org/apache/tajo/storage/rcfile/RCFile.java | 9 +- .../sequencefile/SequenceFileScanner.java | 50 ++++-- .../tajo/storage/text/ByteBufLineReader.java | 2 +- .../tajo/storage/text/CSVLineSerializer.java | 1 + .../tajo/storage/text/DelimitedTextFile.java | 13 +- .../apache/tajo/storage/text/TextLineSerDe.java | 14 +- .../apache/tajo/storage/TestMergeScanner.java | 5 +- .../org/apache/tajo/storage/TestStorages.java | 4 +- 89 files changed, 1217 insertions(+), 948 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 6001893..e5735f6 100644 --- a/CHANGES +++ b/CHANGES @@ -32,6 +32,8 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1343: Improve the memory usage of physical executors. (jihoon) + TAJO-1696: Resource calculator should consider the requested disk resource at the first stage. (jihoon) @@ -2534,3 +2536,4 @@ Release 0.2.0 TAJO-252: Add DISCLAIMER file. (hyunsik) TAJO-251: Rename the legacy name *.tql to *.sql. (hyunsik) + http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java index 5b4a308..87282a0 100644 --- a/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java +++ b/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java @@ -44,7 +44,6 @@ public class RowStoreUtil { } public static Tuple project(Tuple in, Tuple out, int[] targetIds) { - out.clear(); for (int idx = 0; idx < targetIds.length; idx++) { out.put(idx, in.asDatum(targetIds[idx])); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index b876737..910d6bc 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -266,8 +266,6 @@ public class TajoConf extends Configuration { EXECUTOR_EXTERNAL_SORT_THREAD_NUM("tajo.executor.external-sort.thread-num", 1), EXECUTOR_EXTERNAL_SORT_FANOUT("tajo.executor.external-sort.fanout-num", 8), - EXECUTOR_INNER_JOIN_INMEMORY_HASH_TABLE_SIZE("tajo.executor.join.inner.in-memory-table-num", (long)1000000), - // Metrics ---------------------------------------------------------------- METRICS_PROPERTY_FILENAME("tajo.metrics.property.file", "tajo-metrics.properties"), http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java index a17ef01..967efce 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java @@ -87,6 +87,10 @@ public class NullTuple implements Tuple, Cloneable { } @Override + public void clearOffset() { + } + + @Override public void put(int fieldId, Tuple tuple) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java index 7eb56bd..f4a61e6 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java @@ -46,6 +46,8 @@ public interface Tuple extends Cloneable { int size(int fieldId); + void clearOffset(); + void setOffset(long offset); long getOffset(); http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java index 2c81e54..d7b648d 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java @@ -73,6 +73,7 @@ public class VTuple implements Tuple, Cloneable { @Override public void clear() { + clearOffset(); for (int i=0; i < values.length; i++) { values[i] = null; } @@ -100,7 +101,9 @@ public class VTuple implements Tuple, Cloneable { return values[fieldId].size(); } + @Override public void put(Datum [] values) { + clearOffset(); System.arraycopy(values, 0, this.values, 0, values.length); } @@ -111,6 +114,10 @@ public class VTuple implements Tuple, Cloneable { return this.values[fieldId]; } + public void clearOffset() { + this.offset = -1; + } + public void setOffset(long offset) { this.offset = offset; } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/KeyProjector.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/KeyProjector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/KeyProjector.java new file mode 100644 index 0000000..f6220a8 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/KeyProjector.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.engine.planner.physical.KeyTuple; +import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.Tuple; + +public class KeyProjector { + + private final KeyTuple keyTuple; + private final int projectIds[]; + + public KeyProjector(Schema inSchema, Column[] keyColumns) { + keyTuple = new KeyTuple(keyColumns.length); + projectIds = new int[keyColumns.length]; + for (int i = 0; i < keyColumns.length; i++) { + projectIds[i] = inSchema.getColumnId(keyColumns[i].getQualifiedName()); + } + } + + public KeyTuple project(Tuple tuple) { + RowStoreUtil.project(tuple, keyTuple, projectIds); + return keyTuple; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java index d4c57db..0983dc6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java @@ -30,7 +30,7 @@ import org.apache.tajo.exception.InternalException; * This class generates a physical execution plan. */ public interface PhysicalPlanner { - public PhysicalExec createPlan(TaskAttemptContext context, - LogicalNode logicalPlan) + PhysicalExec createPlan(TaskAttemptContext context, + LogicalNode logicalPlan) throws InternalException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java index a73478f..ba8ec32 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java @@ -20,42 +20,44 @@ package org.apache.tajo.engine.planner; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.Schema; -import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.Target; import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; public class Projector { private final TaskAttemptContext context; private final Schema inSchema; - private final Target[] targets; // for projection - private final int targetNum; private final EvalNode[] evals; + private final Tuple outTuple; + public Projector(TaskAttemptContext context, Schema inSchema, Schema outSchema, Target [] targets) { this.context = context; this.inSchema = inSchema; + Target[] realTargets; if (targets == null) { - this.targets = PlannerUtil.schemaToTargets(outSchema); + realTargets = PlannerUtil.schemaToTargets(outSchema); } else { - this.targets = targets; + realTargets = targets; } - this.targetNum = this.targets.length; - evals = new EvalNode[targetNum]; + outTuple = new VTuple(realTargets.length); + evals = new EvalNode[realTargets.length]; if (context.getQueryContext().getBool(SessionVars.CODEGEN)) { EvalNode eval; - for (int i = 0; i < targetNum; i++) { - eval = this.targets[i].getEvalTree(); + for (int i = 0; i < realTargets.length; i++) { + eval = realTargets[i].getEvalTree(); evals[i] = context.getPrecompiledEval(inSchema, eval); } } else { - for (int i = 0; i < targetNum; i++) { - evals[i] = this.targets[i].getEvalTree(); + for (int i = 0; i < realTargets.length; i++) { + evals[i] = realTargets[i].getEvalTree(); } } init(); @@ -67,9 +69,10 @@ public class Projector { } } - public void eval(Tuple in, Tuple out) { + public Tuple eval(Tuple in) { for (int i = 0; i < evals.length; i++) { - out.put(i, evals[i].eval(in)); + outTuple.put(i, evals[i].eval(in)); } + return outTuple; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java index 4b53b39..fdb8fdd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java @@ -29,7 +29,6 @@ import java.io.IOException; public abstract class AggregationExec extends UnaryPhysicalExec { protected final int groupingKeyNum; - protected final int groupingKeyIds[]; protected final int aggFunctionsNum; protected final AggregationFunctionCallEval aggFunctions[]; @@ -39,16 +38,6 @@ public abstract class AggregationExec extends UnaryPhysicalExec { final Column [] keyColumns = plan.getGroupingColumns(); groupingKeyNum = keyColumns.length; - groupingKeyIds = new int[groupingKeyNum]; - Column col; - for (int idx = 0; idx < plan.getGroupingColumns().length; idx++) { - col = keyColumns[idx]; - if (col.hasQualifier()) { - groupingKeyIds[idx] = inSchema.getColumnId(col.getQualifiedName()); - } else { - groupingKeyIds[idx] = inSchema.getColumnIdByName(col.getSimpleName()); - } - } if (plan.hasAggFunctions()) { aggFunctions = plan.getAggFunctions(); http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java index 6e1a553..d28b7f6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java @@ -18,22 +18,18 @@ package org.apache.tajo.engine.planner.physical; -import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.logical.JoinNode; -import org.apache.tajo.storage.FrameTuple; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; public class BNLJoinExec extends CommonJoinExec { - private List<Tuple> leftTupleSlots; - private List<Tuple> rightTupleSlots; + private TupleList leftTupleSlots; + private TupleList rightTupleSlots; private Iterator<Tuple> leftIterator; private Iterator<Tuple> rightIterator; @@ -41,9 +37,7 @@ public class BNLJoinExec extends CommonJoinExec { private boolean rightEnd; // temporal tuples and states for nested loop join - private FrameTuple frameTuple; private Tuple leftTuple = null; - private Tuple outputTuple = null; private Tuple rightNext = null; private final static int TUPLE_SLOT_SIZE = 10000; @@ -51,8 +45,8 @@ public class BNLJoinExec extends CommonJoinExec { public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan, final PhysicalExec leftExec, PhysicalExec rightExec) { super(context, plan, leftExec, rightExec); - this.leftTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE); - this.rightTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE); + this.leftTupleSlots = new TupleList(TUPLE_SLOT_SIZE); + this.rightTupleSlots = new TupleList(TUPLE_SLOT_SIZE); this.leftIterator = leftTupleSlots.iterator(); this.rightIterator = rightTupleSlots.iterator(); this.rightEnd = false; @@ -62,10 +56,6 @@ public class BNLJoinExec extends CommonJoinExec { if (!plan.hasTargets()) { plan.setTargets(PlannerUtil.schemaToTargets(outSchema)); } - - // for join - frameTuple = new FrameTuple(); - outputTuple = new VTuple(outSchema.size()); } public Tuple next() throws IOException { @@ -108,7 +98,7 @@ public class BNLJoinExec extends CommonJoinExec { if (rightEnd) { rightChild.rescan(); rightEnd = false; - + if (leftEnd) { return null; } @@ -126,12 +116,12 @@ public class BNLJoinExec extends CommonJoinExec { } leftIterator = leftTupleSlots.iterator(); leftTuple = leftIterator.next(); - + } else { leftIterator = leftTupleSlots.iterator(); leftTuple = leftIterator.next(); } - + rightTupleSlots.clear(); if (rightNext != null) { rightTupleSlots.add(rightNext); @@ -153,7 +143,7 @@ public class BNLJoinExec extends CommonJoinExec { rightTupleSlots.add(t); } } - + if ((rightNext = rightChild.next()) == null) { rightEnd = true; } @@ -163,8 +153,7 @@ public class BNLJoinExec extends CommonJoinExec { frameTuple.set(leftTuple, rightIterator.next()); if (!hasJoinQual || joinQual.eval(frameTuple).isTrue()) { - projector.eval(frameTuple, outputTuple); - return outputTuple; + return projector.eval(frameTuple); } } return null; http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java index 54abca8..28622d7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java @@ -41,20 +41,20 @@ public class BSTIndexScanExec extends PhysicalExec { private BSTIndex.BSTIndexReader reader; private Projector projector; - - private Datum[] datum = null; - + private boolean initialize = true; private float progress; + private Tuple indexLookupKey; + public BSTIndexScanExec(TaskAttemptContext context, ScanNode scanNode , FileFragment fragment, Path fileName , Schema keySchema, TupleComparator comparator , Datum[] datum) throws IOException { super(context, scanNode.getInSchema(), scanNode.getOutSchema()); this.scanNode = scanNode; this.qual = scanNode.getQual(); - this.datum = datum; + indexLookupKey = new VTuple(datum); this.fileScanner = OldStorageManager.getSeekableScanner(context.getConf(), scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema); @@ -80,8 +80,7 @@ public class BSTIndexScanExec extends PhysicalExec { public Tuple next() throws IOException { if(initialize) { //TODO : more complicated condition - Tuple key = new VTuple(datum); - long offset = reader.find(key); + long offset = reader.find(indexLookupKey); if (offset == -1) { reader.close(); fileScanner.close(); @@ -104,19 +103,16 @@ public class BSTIndexScanExec extends PhysicalExec { } } Tuple tuple; - Tuple outTuple = new VTuple(this.outSchema.size()); if (!scanNode.hasQual()) { if ((tuple = fileScanner.next()) != null) { - projector.eval(tuple, outTuple); - return outTuple; + return projector.eval(tuple); } else { return null; } } else { while(reader.isCurInMemory() && (tuple = fileScanner.next()) != null) { if (qual.eval(tuple).isTrue()) { - projector.eval(tuple, outTuple); - return outTuple; + return projector.eval(tuple); } else { long offset = reader.next(); if (offset == -1) return null; @@ -140,6 +136,7 @@ public class BSTIndexScanExec extends PhysicalExec { scanNode = null; qual = null; projector = null; + indexLookupKey = null; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java index a018fe1..0d64e65 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java @@ -20,21 +20,18 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.engine.planner.KeyProjector; import org.apache.tajo.engine.utils.CacheHolder; import org.apache.tajo.engine.utils.TableCacheKey; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.ExecutionBlockSharedResource; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; /** * common exec for all hash join execs @@ -47,19 +44,18 @@ public abstract class CommonHashJoinExec<T> extends CommonJoinExec { // temporal tuples and states for nested loop join protected boolean first = true; - protected Map<Tuple, T> tupleSlots; + protected TupleMap<T> tupleSlots; protected Iterator<Tuple> iterator; - protected final Tuple keyTuple; - protected final int rightNumCols; protected final int leftNumCols; - protected final int[] leftKeyList; - protected final int[] rightKeyList; + protected final Column[] leftKeyList; + protected final Column[] rightKeyList; protected boolean finished; + protected final KeyProjector leftKeyExtractor; public CommonHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) { super(context, plan, outer, inner); @@ -68,21 +64,18 @@ public abstract class CommonHashJoinExec<T> extends CommonJoinExec { this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, outer.getSchema(), inner.getSchema(), false); - leftKeyList = new int[joinKeyPairs.size()]; - rightKeyList = new int[joinKeyPairs.size()]; - - for (int i = 0; i < joinKeyPairs.size(); i++) { - leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName()); - } + leftKeyList = new Column[joinKeyPairs.size()]; + rightKeyList = new Column[joinKeyPairs.size()]; for (int i = 0; i < joinKeyPairs.size(); i++) { - rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName()); + leftKeyList[i] = outer.getSchema().getColumn(joinKeyPairs.get(i)[0].getQualifiedName()); + rightKeyList[i] = inner.getSchema().getColumn(joinKeyPairs.get(i)[1].getQualifiedName()); } leftNumCols = outer.getSchema().size(); rightNumCols = inner.getSchema().size(); - keyTuple = new VTuple(leftKeyList.length); + leftKeyExtractor = new KeyProjector(leftSchema, leftKeyList); } protected void loadRightToHashTable() throws IOException { @@ -102,12 +95,12 @@ public abstract class CommonHashJoinExec<T> extends CommonJoinExec { protected void loadRightFromCache(TableCacheKey key) throws IOException { ExecutionBlockSharedResource sharedResource = context.getSharedResource(); - CacheHolder<Map<Tuple, List<Tuple>>> holder; + CacheHolder<TupleMap<TupleList>> holder; synchronized (sharedResource.getLock()) { if (sharedResource.hasBroadcastCache(key)) { holder = sharedResource.getBroadcastCache(key); } else { - Map<Tuple, List<Tuple>> built = buildRightToHashTable(); + TupleMap<TupleList> built = buildRightToHashTable(); holder = new CacheHolder.BroadcastCacheHolder(built, rightChild.getInputStats(), null); sharedResource.addBroadcastCache(key, holder); } @@ -115,50 +108,27 @@ public abstract class CommonHashJoinExec<T> extends CommonJoinExec { this.tupleSlots = convert(holder.getData(), true); } - protected Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException { + protected TupleMap<TupleList> buildRightToHashTable() throws IOException { Tuple tuple; - Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000); + TupleMap<TupleList> map = new TupleMap<TupleList>(100000); + KeyProjector keyProjector = new KeyProjector(rightSchema, rightKeyList); while (!context.isStopped() && (tuple = rightChild.next()) != null) { - Tuple keyTuple = new VTuple(joinKeyPairs.size()); - for (int i = 0; i < rightKeyList.length; i++) { - keyTuple.put(i, tuple.asDatum(rightKeyList[i])); - } - - /* - * TODO - * Currently, some physical executors can return new instances of tuple, but others not. - * This sometimes causes wrong results due to the singleton Tuple instance. - * The below line is a temporal solution to fix this problem. - * This will be improved at https://issues.apache.org/jira/browse/TAJO-1343. - */ - try { - tuple = tuple.clone(); - } catch (CloneNotSupportedException e) { - throw new IOException(e); - } - - List<Tuple> newValue = map.get(keyTuple); + KeyTuple keyTuple = keyProjector.project(tuple); + TupleList newValue = map.get(keyTuple); if (newValue == null) { - map.put(keyTuple, newValue = new ArrayList<Tuple>()); + map.put(keyTuple, newValue = new TupleList()); } // if source is scan or groupby, it needs not to be cloned - newValue.add(new VTuple(tuple)); + newValue.add(tuple); } return map; } // todo: convert loaded data to cache condition - protected abstract Map<Tuple, T> convert(Map<Tuple, List<Tuple>> hashed, boolean fromCache) + protected abstract TupleMap<T> convert(TupleMap<TupleList> hashed, boolean fromCache) throws IOException; - protected Tuple toKey(final Tuple outerTuple) { - for (int i = 0; i < leftKeyList.length; i++) { - keyTuple.put(i, outerTuple.asDatum(leftKeyList[i])); - } - return keyTuple; - } - @Override public void rescan() throws IOException { super.rescan(); http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java index ec29085..4f819ad 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java @@ -33,7 +33,6 @@ import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.NullTuple; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -59,7 +58,6 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec { protected final Schema rightSchema; protected final FrameTuple frameTuple; - protected final Tuple outTuple; // projection protected Projector projector; @@ -83,7 +81,6 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec { // for join this.frameTuple = new FrameTuple(); - this.outTuple = new VTuple(outSchema.size()); } /** @@ -183,13 +180,13 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec { } /** - * Return an tuple iterator, containing a single NullTuple + * Create a list that contains a single null tuple. * - * @param width the width of tuple - * @return an tuple iterator, containing a single NullTuple + * @param width the width of null tuple which will be created. + * @return created list of a null tuple */ - protected Iterator<Tuple> nullIterator(int width) { - return Arrays.asList(NullTuple.create(width)).iterator(); + protected List<Tuple> nullTupleList(int width) { + return Arrays.asList(NullTuple.create(width)); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java index 2d836f4..a298564 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java @@ -302,14 +302,6 @@ public class ComparableVector { return new ComparableTuple(keyTypes, keyIndex); } - public VTuple toVTuple() { - VTuple vtuple = new VTuple(keyIndex.length); - for (int i = 0; i < keyIndex.length; i++) { - vtuple.put(i, toDatum(i)); - } - return vtuple; - } - public Datum toDatum(int i) { if (keys[i] == null) { return NullDatum.get(); http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java index 7784817..7ac3e0b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java @@ -23,13 +23,15 @@ import org.apache.commons.logging.LogFactory; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.Int2Datum; -import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.engine.planner.KeyProjector; import org.apache.tajo.plan.expr.AggregationFunctionCallEval; import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.plan.logical.DistinctGroupbyNode; import org.apache.tajo.plan.logical.GroupbyNode; +import org.apache.tajo.storage.NullTuple; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -94,9 +96,11 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec { private long totalNumRows; private int fetchedRows; - private int[] groupingKeyIndexes; private NonDistinctHashAggregator nonDistinctHashAggregator; - private DistinctHashAggregator[] distinctAggregators; + private Map<Integer, DistinctHashAggregator> nodeSeqToDistinctAggregators = TUtil.newHashMap(); + + private KeyProjector nonDistinctGroupingKeyProjector; + private Map<Integer, KeyProjector> distinctGroupbyKeyProjectors = TUtil.newHashMap(); private int resultTupleLength; @@ -111,37 +115,23 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec { super.init(); // finding grouping column index - Column[] groupingColumns = plan.getGroupingColumns(); - groupingKeyIndexes = new int[groupingColumns.length]; - - int index = 0; - for (Column col: groupingColumns) { - int keyIndex; - if (col.hasQualifier()) { - keyIndex = inSchema.getColumnId(col.getQualifiedName()); - } else { - keyIndex = inSchema.getColumnIdByName(col.getSimpleName()); - } - groupingKeyIndexes[index++] = keyIndex; - } - resultTupleLength = groupingKeyIndexes.length + 1; //1 is Sequence Datum which indicates sequence of DistinctNode. + Column[] groupingKeyColumns = plan.getGroupingColumns(); + nonDistinctGroupingKeyProjector = new KeyProjector(inSchema, plan.getGroupingColumns()); + resultTupleLength = groupingKeyColumns.length + 1; //1 is Sequence Datum which indicates sequence of DistinctNode. List<GroupbyNode> groupbyNodes = plan.getSubPlans(); - List<DistinctHashAggregator> distinctAggrList = new ArrayList<DistinctHashAggregator>(); int distinctSeq = 0; for (GroupbyNode eachGroupby: groupbyNodes) { if (eachGroupby.isDistinct()) { - DistinctHashAggregator aggregator = new DistinctHashAggregator(eachGroupby); - aggregator.setNodeSequence(distinctSeq++); - distinctAggrList.add(aggregator); + DistinctHashAggregator aggregator = new DistinctHashAggregator(eachGroupby, distinctSeq); + nodeSeqToDistinctAggregators.put(distinctSeq++, aggregator); resultTupleLength += aggregator.getTupleLength(); } else { nonDistinctHashAggregator = new NonDistinctHashAggregator(eachGroupby); resultTupleLength += nonDistinctHashAggregator.getTupleLength(); } } - distinctAggregators = distinctAggrList.toArray(new DistinctHashAggregator[]{}); } private int currentAggregatorIndex = 0; @@ -154,13 +144,13 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec { int prevIndex = currentAggregatorIndex; while (!context.isStopped()) { - DistinctHashAggregator aggregator = distinctAggregators[currentAggregatorIndex]; + DistinctHashAggregator aggregator = nodeSeqToDistinctAggregators.get(currentAggregatorIndex); Tuple result = aggregator.next(); if (result != null) { return result; } currentAggregatorIndex++; - currentAggregatorIndex = currentAggregatorIndex % distinctAggregators.length; + currentAggregatorIndex = currentAggregatorIndex % nodeSeqToDistinctAggregators.size(); if (currentAggregatorIndex == prevIndex) { finished = true; return null; @@ -171,29 +161,36 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec { } private void prepareInputData() throws IOException { - Tuple tuple = null; + Tuple tuple; + while(!context.isStopped() && (tuple = child.next()) != null) { - Tuple groupingKey = new VTuple(groupingKeyIndexes.length); - for (int i = 0; i < groupingKeyIndexes.length; i++) { - groupingKey.put(i, tuple.asDatum(groupingKeyIndexes[i])); - } - for (int i = 0; i < distinctAggregators.length; i++) { - distinctAggregators[i].compute(groupingKey, tuple); + + KeyTuple groupingKey = nonDistinctGroupingKeyProjector.project(tuple); + for (int i = 0; i < nodeSeqToDistinctAggregators.size(); i++) { + nodeSeqToDistinctAggregators.get(i).compute(groupingKey, tuple); } if (nonDistinctHashAggregator != null) { nonDistinctHashAggregator.compute(groupingKey, tuple); } } - for (int i = 0; i < distinctAggregators.length; i++) { - distinctAggregators[i].rescan(); + for (int i = 0; i < nodeSeqToDistinctAggregators.size(); i++) { + nodeSeqToDistinctAggregators.get(i).rescan(); } - totalNumRows = distinctAggregators[0].distinctAggrDatas.size(); + totalNumRows = nodeSeqToDistinctAggregators.get(0).distinctAggrDatas.size(); preparedData = true; } @Override public void close() throws IOException { + if (nonDistinctHashAggregator != null) { + nonDistinctHashAggregator.close(); + nonDistinctHashAggregator = null; + } + for (DistinctHashAggregator aggregator : nodeSeqToDistinctAggregators.values()) { + aggregator.close(); + } + nodeSeqToDistinctAggregators.clear(); child.close(); } @@ -223,8 +220,8 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec { public void rescan() { finished = false; currentAggregatorIndex = 0; - for (int i = 0; i < distinctAggregators.length; i++) { - distinctAggregators[i].rescan(); + for (int i = 0; i < nodeSeqToDistinctAggregators.size(); i++) { + nodeSeqToDistinctAggregators.get(i).rescan(); } } @@ -233,13 +230,14 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec { private final AggregationFunctionCallEval aggFunctions[]; // GroupingKey -> FunctionContext[] - private Map<Tuple, FunctionContext[]> nonDistinctAggrDatas; + private TupleMap<FunctionContext[]> nonDistinctAggrDatas; private int tupleLength; - private Tuple dummyTuple; + private final Tuple dummyTuple; + private final Tuple outTuple; private NonDistinctHashAggregator(GroupbyNode groupbyNode) throws IOException { - nonDistinctAggrDatas = new HashMap<Tuple, FunctionContext[]>(); + nonDistinctAggrDatas = new TupleMap<FunctionContext[]>(); if (groupbyNode.hasAggFunctions()) { aggFunctions = groupbyNode.getAggFunctions(); @@ -254,14 +252,12 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec { eachFunction.setFirstPhase(); } - dummyTuple = new VTuple(aggFunctionsNum); - for (int i = 0; i < aggFunctionsNum; i++) { - dummyTuple.put(i, NullDatum.get()); - } + outTuple = new VTuple(aggFunctionsNum); + dummyTuple = NullTuple.create(aggFunctionsNum); tupleLength = aggFunctionsNum; } - public void compute(Tuple groupingKeyTuple, Tuple tuple) { + public void compute(KeyTuple groupingKeyTuple, Tuple tuple) { FunctionContext[] contexts = nonDistinctAggrDatas.get(groupingKeyTuple); if (contexts != null) { for (int i = 0; i < aggFunctions.length; i++) { @@ -282,13 +278,12 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec { if (contexts == null) { return null; } - Tuple tuple = new VTuple(aggFunctionsNum); for (int i = 0; i < aggFunctionsNum; i++) { - tuple.put(i, aggFunctions[i].terminate(contexts[i])); + outTuple.put(i, aggFunctions[i].terminate(contexts[i])); } - return tuple; + return outTuple; } public int getTupleLength() { @@ -298,57 +293,51 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec { public Tuple getDummyTuple() { return dummyTuple; } + + public void close() { + nonDistinctAggrDatas.clear(); + nonDistinctAggrDatas = null; + } } class DistinctHashAggregator { // GroupingKey -> DistinctKey - private Map<Tuple, Set<Tuple>> distinctAggrDatas; - private Iterator<Entry<Tuple, Set<Tuple>>> iterator = null; + private TupleMap<TupleSet> distinctAggrDatas; + private Iterator<Entry<KeyTuple, TupleSet>> iterator = null; private int nodeSequence; private Int2Datum nodeSequenceDatum; - private int[] distinctKeyIndexes; - private int tupleLength; - private Tuple dummyTuple; + private final Tuple dummyTuple; + private Tuple outTuple; private boolean aggregatorFinished = false; - public DistinctHashAggregator(GroupbyNode groupbyNode) throws IOException { + public DistinctHashAggregator(GroupbyNode groupbyNode, int nodeSequence) throws IOException { - Set<Integer> groupingKeyIndexSet = new HashSet<Integer>(); - for (Integer eachIndex: groupingKeyIndexes) { - groupingKeyIndexSet.add(eachIndex); - } + Set<Column> groupingKeySet = TUtil.newHashSet(plan.getGroupingColumns()); - List<Integer> distinctGroupingKeyIndexSet = new ArrayList<Integer>(); + List<Column> distinctGroupingKeyIndexSet = new ArrayList<Column>(); Column[] groupingColumns = groupbyNode.getGroupingColumns(); for (int idx = 0; idx < groupingColumns.length; idx++) { Column col = groupingColumns[idx]; - int keyIndex; - if (col.hasQualifier()) { - keyIndex = inSchema.getColumnId(col.getQualifiedName()); - } else { - keyIndex = inSchema.getColumnIdByName(col.getSimpleName()); - } - if (!groupingKeyIndexSet.contains(keyIndex)) { - distinctGroupingKeyIndexSet.add(keyIndex); + if (!groupingKeySet.contains(col)) { + distinctGroupingKeyIndexSet.add(col); } } - int index = 0; - this.distinctKeyIndexes = new int[distinctGroupingKeyIndexSet.size()]; - this.dummyTuple = new VTuple(distinctGroupingKeyIndexSet.size()); - for (Integer eachId : distinctGroupingKeyIndexSet) { - this.dummyTuple.put(index, NullDatum.get()); - this.distinctKeyIndexes[index++] = eachId; - } + Column[] distinctKeyColumns = new Column[distinctGroupingKeyIndexSet.size()]; + distinctKeyColumns = distinctGroupingKeyIndexSet.toArray(distinctKeyColumns); + this.dummyTuple = NullTuple.create(distinctGroupingKeyIndexSet.size()); + + this.distinctAggrDatas = new TupleMap<TupleSet>(); + distinctGroupbyKeyProjectors.put(nodeSequence, new KeyProjector(inSchema, distinctKeyColumns)); + this.tupleLength = distinctKeyColumns.length; - this.distinctAggrDatas = new HashMap<Tuple, Set<Tuple>>(); - this.tupleLength = distinctKeyIndexes.length; + setNodeSequence(nodeSequence); } - public void setNodeSequence(int nodeSequence) { + private void setNodeSequence(int nodeSequence) { this.nodeSequence = nodeSequence; this.nodeSequenceDatum = new Int2Datum((short)nodeSequence); } @@ -357,15 +346,12 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec { return tupleLength; } - public void compute(Tuple groupingKey, Tuple tuple) throws IOException { - Tuple distinctKeyTuple = new VTuple(distinctKeyIndexes.length); - for (int i = 0; i < distinctKeyIndexes.length; i++) { - distinctKeyTuple.put(i, tuple.asDatum(distinctKeyIndexes[i])); - } + public void compute(KeyTuple groupingKey, Tuple tuple) throws IOException { + KeyTuple distinctKeyTuple = distinctGroupbyKeyProjectors.get(nodeSequence).project(tuple); - Set<Tuple> distinctEntry = distinctAggrDatas.get(groupingKey); + TupleSet distinctEntry = distinctAggrDatas.get(groupingKey); if (distinctEntry == null) { - distinctEntry = new HashSet<Tuple>(); + distinctEntry = new TupleSet(); distinctAggrDatas.put(groupingKey, distinctEntry); } distinctEntry.add(distinctKeyTuple); @@ -379,14 +365,17 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec { } public void close() throws IOException { + for (TupleSet set : distinctAggrDatas.values()) { + set.clear(); + } distinctAggrDatas.clear(); distinctAggrDatas = null; currentGroupingTuples = null; iterator = null; } - Entry<Tuple, Set<Tuple>> currentGroupingTuples; - Iterator<Tuple> distinctKeyIterator; + Entry<KeyTuple, TupleSet> currentGroupingTuples; + Iterator<KeyTuple> distinctKeyIterator; boolean groupingKeyChanged = false; public Tuple next() { @@ -415,30 +404,32 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec { } // node sequence, groupingKeys, 1'st distinctKeys, 2'st distinctKeys, ... // If n'st == this.nodeSequence set with real data, otherwise set with NullDatum - VTuple tuple = new VTuple(resultTupleLength); int tupleIndex = 0; - tuple.put(tupleIndex++, nodeSequenceDatum); + if (outTuple == null) { + outTuple = new VTuple(resultTupleLength); + } + outTuple.put(tupleIndex++, nodeSequenceDatum); // merge grouping key Tuple groupingKeyTuple = currentGroupingTuples.getKey(); int groupingKeyLength = groupingKeyTuple.size(); for (int i = 0; i < groupingKeyLength; i++, tupleIndex++) { - tuple.put(tupleIndex, groupingKeyTuple.asDatum(i)); + outTuple.put(tupleIndex, groupingKeyTuple.asDatum(i)); } // merge distinctKey - for (int i = 0; i < distinctAggregators.length; i++) { + for (int i = 0; i < nodeSeqToDistinctAggregators.size(); i++) { if (i == nodeSequence) { Tuple distinctKeyTuple = distinctKeyIterator.next(); int distinctKeyLength = distinctKeyTuple.size(); for (int j = 0; j < distinctKeyLength; j++, tupleIndex++) { - tuple.put(tupleIndex, distinctKeyTuple.asDatum(j)); + outTuple.put(tupleIndex, distinctKeyTuple.asDatum(j)); } } else { - Tuple dummyTuple = distinctAggregators[i].getDummyTuple(); + Tuple dummyTuple = nodeSeqToDistinctAggregators.get(i).getDummyTuple(); int dummyTupleSize = dummyTuple.size(); for (int j = 0; j < dummyTupleSize; j++, tupleIndex++) { - tuple.put(tupleIndex, dummyTuple.asDatum(j)); + outTuple.put(tupleIndex, dummyTuple.asDatum(j)); } } } @@ -457,10 +448,10 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec { } int tupleSize = nonDistinctTuple.size(); for (int j = 0; j < tupleSize; j++, tupleIndex++) { - tuple.put(tupleIndex, nonDistinctTuple.asDatum(j)); + outTuple.put(tupleIndex, nonDistinctTuple.asDatum(j)); } } - return tuple; + return outTuple; } public Tuple getDummyTuple() { http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java index c8a6588..2d1fa4b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java @@ -21,27 +21,34 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.engine.planner.KeyProjector; import org.apache.tajo.plan.expr.AggregationFunctionCallEval; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.plan.logical.DistinctGroupbyNode; import org.apache.tajo.plan.logical.GroupbyNode; +import org.apache.tajo.storage.NullTuple; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { + private boolean finished = false; - private DistinctGroupbyNode plan; + private final DistinctGroupbyNode plan; private HashAggregator[] hashAggregators; - private int distinctGroupingKeyIds[]; + + private List<Column> distinctGroupingKeyColumnSet; private boolean first = true; private int groupbyNodeNum; private int outputColumnNum; @@ -50,6 +57,10 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { private int[] resultColumnIdIndexes; + private Tuple outTuple; + + private KeyProjector outerKeyProjector; + public DistinctGroupbyHashAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, PhysicalExec subOp) throws IOException { super(context, plan.getInSchema(), plan.getOutSchema(), subOp); @@ -60,23 +71,16 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { public void init() throws IOException { super.init(); - List<Integer> distinctGroupingKeyIdList = new ArrayList<Integer>(); - for (Column col: plan.getGroupingColumns()) { - int keyIndex; - if (col.hasQualifier()) { - keyIndex = inSchema.getColumnId(col.getQualifiedName()); - } else { - keyIndex = inSchema.getColumnIdByName(col.getSimpleName()); - } - if (!distinctGroupingKeyIdList.contains(keyIndex)) { - distinctGroupingKeyIdList.add(keyIndex); + distinctGroupingKeyColumnSet = TUtil.newList(); + for (Column col : plan.getGroupingColumns()) { + if (!distinctGroupingKeyColumnSet.contains(col)) { + distinctGroupingKeyColumnSet.add(col); } } - int idx = 0; - distinctGroupingKeyIds = new int[distinctGroupingKeyIdList.size()]; - for (Integer intVal: distinctGroupingKeyIdList) { - distinctGroupingKeyIds[idx++] = intVal; - } + Column[] distinctGroupingKeyColumns = new Column[distinctGroupingKeyColumnSet.size()]; + distinctGroupingKeyColumns = distinctGroupingKeyColumnSet.toArray(distinctGroupingKeyColumns); + + outerKeyProjector = new KeyProjector(inSchema, distinctGroupingKeyColumns); List<GroupbyNode> groupbyNodes = plan.getSubPlans(); groupbyNodeNum = groupbyNodes.size(); @@ -88,6 +92,7 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { } outputColumnNum = plan.getOutSchema().size(); + outTuple = new VTuple(outputColumnNum); int allGroupbyOutColNum = 0; for (GroupbyNode eachGroupby: plan.getSubPlans()) { @@ -105,7 +110,7 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { } } - List<Tuple> currentAggregatedTuples = null; + TupleList currentAggregatedTuples = null; int currentAggregatedTupleIndex = 0; int currentAggregatedTupleSize = 0; @@ -143,20 +148,21 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { // Groupby_Key2 | Distinct1_Column_V3 | | | //-------------------------------------------------------------------------------------- - List<List<Tuple>> tupleSlots = new ArrayList<List<Tuple>>(); + List<TupleList> tupleSlots = new ArrayList<TupleList>(); // aggregation with single grouping key for (int i = 0; i < hashAggregators.length; i++) { - if (!hashAggregators[i].iterator.hasNext()) { + HashAggregator hashAggregator = hashAggregators[i]; + if (!hashAggregator.iterator.hasNext()) { nullCount++; - tupleSlots.add(new ArrayList<Tuple>()); + tupleSlots.add(new TupleList()); continue; } - Entry<Tuple, Map<Tuple, FunctionContext[]>> entry = hashAggregators[i].iterator.next(); + Entry<KeyTuple, TupleMap<FunctionContext[]>> entry = hashAggregator.iterator.next(); if (distinctGroupingKey == null) { distinctGroupingKey = entry.getKey(); } - List<Tuple> aggregatedTuples = hashAggregators[i].aggregate(entry.getValue()); + TupleList aggregatedTuples = hashAggregator.aggregate(entry.getValue()); tupleSlots.add(aggregatedTuples); } @@ -167,11 +173,7 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { // If DistinctGroupbyHashAggregationExec does not have any rows, // it should return NullDatum. if (totalNumRows == 0 && groupbyNodeNum == 0) { - Tuple tuple = new VTuple(outputColumnNum); - for (int i = 0; i < tuple.size(); i++) { - tuple.put(i, DatumFactory.createNullDatum()); - } - return tuple; + return NullTuple.create(outputColumnNum); } else { return null; } @@ -206,20 +208,23 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { */ // currentAggregatedTuples has tuples which has same group key. - currentAggregatedTuples = new ArrayList<Tuple>(); + if (currentAggregatedTuples == null) { + currentAggregatedTuples = new TupleList(); + } else { + currentAggregatedTuples.clear(); + } int listIndex = 0; while (true) { - // Each item in tuples is VTuple. So the tuples variable is two dimensions(tuple[aggregator][datum]). Tuple[] tuples = new Tuple[hashAggregators.length]; + // Each item in tuples is VTuple. So the tuples variable is two dimensions(tuple[aggregator][datum]). for (int i = 0; i < hashAggregators.length; i++) { - List<Tuple> aggregatedTuples = tupleSlots.get(i); + TupleList aggregatedTuples = tupleSlots.get(i); if (aggregatedTuples.size() > listIndex) { tuples[i] = tupleSlots.get(i).get(listIndex); } } //merge - Tuple mergedTuple = new VTuple(outputColumnNum); int resultColumnIdx = 0; boolean allNull = true; @@ -236,12 +241,12 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { // set group key tuple // Because each hashAggregator has different number of tuples, // sometimes getting group key from each hashAggregator will be null value. - mergedTuple.put(mergeTupleIndex, distinctGroupingKey.asDatum(mergeTupleIndex)); + outTuple.put(mergeTupleIndex, distinctGroupingKey.asDatum(mergeTupleIndex)); } else { if (tuples[i] != null) { - mergedTuple.put(mergeTupleIndex, tuples[i].asDatum(j)); + outTuple.put(mergeTupleIndex, tuples[i].asDatum(j)); } else { - mergedTuple.put(mergeTupleIndex, NullDatum.get()); + outTuple.put(mergeTupleIndex, NullDatum.get()); } } } @@ -253,10 +258,15 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { break; } - currentAggregatedTuples.add(mergedTuple); + currentAggregatedTuples.add(outTuple); listIndex++; } + for (TupleList eachList : tupleSlots) { + eachList.clear(); + } + tupleSlots.clear(); + currentAggregatedTupleIndex = 0; currentAggregatedTupleSize = currentAggregatedTuples.size(); @@ -267,13 +277,11 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { } fetchedRows++; - Tuple tuple = currentAggregatedTuples.get(currentAggregatedTupleIndex++); - - return tuple; + return currentAggregatedTuples.get(currentAggregatedTupleIndex++); } private void loadChildHashTable() throws IOException { - Tuple tuple = null; + Tuple tuple; while(!context.isStopped() && (tuple = child.next()) != null) { for (int i = 0; i < hashAggregators.length; i++) { hashAggregators[i].compute(tuple); @@ -296,6 +304,13 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { if (child != null) { child.close(); } + if (currentAggregatedTuples != null) { + currentAggregatedTuples.clear(); + currentAggregatedTuples = null; + } + if (distinctGroupingKeyColumnSet != null) { + distinctGroupingKeyColumnSet.clear(); + } } public void rescan() throws IOException { @@ -303,6 +318,9 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { for (int i = 0; i < hashAggregators.length; i++) { hashAggregators[i].initFetch(); } + if (currentAggregatedTuples != null) { + currentAggregatedTuples.clear(); + } } public float getProgress() { @@ -327,44 +345,33 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { class HashAggregator { // Outer's GroupBy Key -> Each GroupByNode's Key -> FunctionContext - private Map<Tuple, Map<Tuple, FunctionContext[]>> hashTable; - private Iterator<Entry<Tuple, Map<Tuple, FunctionContext[]>>> iterator = null; + private TupleMap<TupleMap<FunctionContext[]>> hashTable; + private Iterator<Entry<KeyTuple, TupleMap<FunctionContext[]>>> iterator = null; + + private final KeyProjector innerKeyProjector; - private int groupingKeyIds[]; private final int aggFunctionsNum; private final AggregationFunctionCallEval aggFunctions[]; - int tupleSize; + private final Tuple aggregatedTuple; + + private final int tupleSize; public HashAggregator(GroupbyNode groupbyNode, Schema schema) throws IOException { - hashTable = new HashMap<Tuple, Map<Tuple, FunctionContext[]>>(10000); + hashTable = new TupleMap<TupleMap<FunctionContext[]>>(10000); - List<Integer> distinctGroupingKeyIdSet = new ArrayList<Integer>(); - for (int i = 0; i < distinctGroupingKeyIds.length; i++) { - distinctGroupingKeyIdSet.add(distinctGroupingKeyIds[i]); - } + List<Column> groupingKeyColumnList = new ArrayList<Column>(distinctGroupingKeyColumnSet); - List<Integer> groupingKeyIdList = new ArrayList<Integer>(distinctGroupingKeyIdSet); Column[] keyColumns = groupbyNode.getGroupingColumns(); Column col; for (int idx = 0; idx < keyColumns.length; idx++) { col = keyColumns[idx]; - int keyIndex; - if (col.hasQualifier()) { - keyIndex = inSchema.getColumnId(col.getQualifiedName()); - } else { - keyIndex = inSchema.getColumnIdByName(col.getSimpleName()); + if (!distinctGroupingKeyColumnSet.contains(col)) { + groupingKeyColumnList.add(col); } - if (!distinctGroupingKeyIdSet.contains(keyIndex)) { - groupingKeyIdList.add(keyIndex); - } - } - int index = 0; - groupingKeyIds = new int[groupingKeyIdList.size()]; - for (Integer eachId : groupingKeyIdList) { - groupingKeyIds[index++] = eachId; } + Column[] groupingKeyColumns = groupingKeyColumnList.toArray(new Column[groupingKeyColumnList.size()]); if (groupbyNode.hasAggFunctions()) { aggFunctions = groupbyNode.getAggFunctions(); @@ -378,7 +385,9 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { aggFunction.bind(context.getEvalContext(), schema); } - tupleSize = groupingKeyIds.length + aggFunctionsNum; + tupleSize = groupingKeyColumns.length + aggFunctionsNum; + aggregatedTuple = new VTuple(groupingKeyColumns.length + aggFunctionsNum); + innerKeyProjector = new KeyProjector(inSchema, groupingKeyColumns); } public int getTupleSize() { @@ -386,22 +395,16 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { } public void compute(Tuple tuple) throws IOException { - Tuple outerKeyTuple = new VTuple(distinctGroupingKeyIds.length); - for (int i = 0; i < distinctGroupingKeyIds.length; i++) { - outerKeyTuple.put(i, tuple.asDatum(distinctGroupingKeyIds[i])); - } + KeyTuple outerKeyTuple = outerKeyProjector.project(tuple); + TupleMap<FunctionContext[]> distinctEntry = hashTable.get(outerKeyTuple); - Tuple keyTuple = new VTuple(groupingKeyIds.length); - for (int i = 0; i < groupingKeyIds.length; i++) { - keyTuple.put(i, tuple.asDatum(groupingKeyIds[i])); - } - - Map<Tuple, FunctionContext[]> distinctEntry = hashTable.get(outerKeyTuple); if (distinctEntry == null) { - distinctEntry = new HashMap<Tuple, FunctionContext[]>(); + distinctEntry = new TupleMap<FunctionContext[]>(); hashTable.put(outerKeyTuple, distinctEntry); } - FunctionContext[] contexts = distinctEntry.get(keyTuple); + + KeyTuple innerKeyTuple = innerKeyProjector.project(tuple); + FunctionContext[] contexts = distinctEntry.get(innerKeyTuple); if (contexts != null) { for (int i = 0; i < aggFunctions.length; i++) { aggFunctions[i].merge(contexts[i], tuple); @@ -412,7 +415,7 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { contexts[i] = aggFunctions[i].newContext(); aggFunctions[i].merge(contexts[i], tuple); } - distinctEntry.put(keyTuple, contexts); + distinctEntry.put(innerKeyTuple, contexts); } } @@ -420,27 +423,29 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec { iterator = hashTable.entrySet().iterator(); } - public List<Tuple> aggregate(Map<Tuple, FunctionContext[]> groupTuples) { - List<Tuple> aggregatedTuples = new ArrayList<Tuple>(); + public TupleList aggregate(Map<KeyTuple, FunctionContext[]> groupTuples) { + TupleList aggregatedTuples = new TupleList(); - for (Entry<Tuple, FunctionContext[]> entry : groupTuples.entrySet()) { - Tuple tuple = new VTuple(groupingKeyIds.length + aggFunctionsNum); + for (Entry<KeyTuple, FunctionContext[]> entry : groupTuples.entrySet()) { Tuple groupbyKey = entry.getKey(); int index = 0; for (; index < groupbyKey.size(); index++) { - tuple.put(index, groupbyKey.asDatum(index)); + aggregatedTuple.put(index, groupbyKey.asDatum(index)); } FunctionContext[] contexts = entry.getValue(); for (int i = 0; i < aggFunctionsNum; i++, index++) { - tuple.put(index, aggFunctions[i].terminate(contexts[i])); + aggregatedTuple.put(index, aggFunctions[i].terminate(contexts[i])); } - aggregatedTuples.add(tuple); + aggregatedTuples.add(aggregatedTuple); } return aggregatedTuples; } public void close() throws IOException { + for (TupleMap<FunctionContext[]> map : hashTable.values()) { + map.clear(); + } hashTable.clear(); hashTable = null; iterator = null; http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java index 5a262a6..b3edab6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java @@ -1,4 +1,4 @@ - /** +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,22 +18,20 @@ package org.apache.tajo.engine.planner.physical; - import org.apache.commons.logging.Log; - import org.apache.commons.logging.LogFactory; - import org.apache.tajo.catalog.Column; - import org.apache.tajo.plan.expr.AggregationFunctionCallEval; - import org.apache.tajo.plan.function.FunctionContext; - import org.apache.tajo.plan.logical.DistinctGroupbyNode; - import org.apache.tajo.plan.logical.GroupbyNode; - import org.apache.tajo.storage.Tuple; - import org.apache.tajo.storage.VTuple; - import org.apache.tajo.worker.TaskAttemptContext; - - import java.io.IOException; - import java.util.ArrayList; - import java.util.HashSet; - import java.util.List; - import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.plan.expr.AggregationFunctionCallEval; +import org.apache.tajo.plan.function.FunctionContext; +import org.apache.tajo.plan.logical.DistinctGroupbyNode; +import org.apache.tajo.plan.logical.GroupbyNode; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.worker.TaskAttemptContext; + +import java.io.IOException; +import java.util.*; /** * This class adjusts shuffle columns between DistinctGroupbyFirstAggregationExec and @@ -86,10 +84,21 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec { private AggregationFunctionCallEval[] nonDistinctAggrFunctions; private int nonDistinctAggrTupleStartIndex = -1; + // Key tuples may have various lengths. The below two maps are used to cache key tuple instances. + // Each map is a mapping of key length to key tuple. + private Map<Integer, Tuple> keyTupleMap = new HashMap<Integer, Tuple>(); + private Map<Integer, Tuple> prevKeyTupleMap = new HashMap<Integer, Tuple>(); + + private Tuple prevKeyTuple = null; + private Tuple prevTuple = null; + private final Tuple outTuple; + private int prevSeq = -1; + public DistinctGroupbySecondAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, SortExec sortExec) throws IOException { super(context, plan.getInSchema(), plan.getOutSchema(), sortExec); this.plan = plan; + outTuple = new VTuple(outSchema.size()); } @Override @@ -158,20 +167,15 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec { } } - Tuple prevKeyTuple = null; - Tuple prevTuple = null; - int prevSeq = -1; - @Override public Tuple next() throws IOException { if (finished) { return null; } - Tuple result = null; while (!context.isStopped()) { - Tuple childTuple = child.next(); - if (childTuple == null) { + Tuple tuple = child.next(); + if (tuple == null) { finished = true; if (prevTuple == null) { @@ -181,15 +185,8 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec { if (prevSeq == 0 && nonDistinctAggrFunctions != null) { terminatedNonDistinctAggr(prevTuple); } - result = prevTuple; - break; - } - - Tuple tuple = null; - try { - tuple = childTuple.clone(); - } catch (CloneNotSupportedException e) { - throw new IOException(e.getMessage(), e); + outTuple.put(prevTuple.getValues()); + return outTuple; } int distinctSeq = tuple.getInt2(0); @@ -201,8 +198,8 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec { initNonDistinctAggrContext(); mergeNonDistinctAggr(tuple); } - prevKeyTuple = keyTuple; - prevTuple = tuple; + prevKeyTuple = getKeyTuple(prevKeyTupleMap, keyTuple.getValues()); + prevTuple = new VTuple(tuple.getValues()); prevSeq = distinctSeq; continue; } @@ -212,20 +209,20 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec { if (prevSeq == 0 && nonDistinctAggrFunctions != null) { terminatedNonDistinctAggr(prevTuple); } - result = prevTuple; + outTuple.put(prevTuple.getValues()); - prevKeyTuple = keyTuple; - prevTuple = tuple; + prevKeyTuple = getKeyTuple(prevKeyTupleMap, keyTuple.getValues()); + prevTuple.put(tuple.getValues()); prevSeq = distinctSeq; if (distinctSeq == 0 && nonDistinctAggrFunctions != null) { initNonDistinctAggrContext(); mergeNonDistinctAggr(tuple); } - break; + return outTuple; } else { - prevKeyTuple = keyTuple; - prevTuple = tuple; + prevKeyTuple = getKeyTuple(prevKeyTupleMap, keyTuple.getValues()); + prevTuple.put(tuple.getValues()); prevSeq = distinctSeq; if (distinctSeq == 0 && nonDistinctAggrFunctions != null) { mergeNonDistinctAggr(tuple); @@ -233,7 +230,7 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec { } } - return result; + return null; } private void initNonDistinctAggrContext() { @@ -265,8 +262,8 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec { private Tuple getKeyTuple(int distinctSeq, Tuple tuple) { int[] columnIndexes = distinctKeyIndexes[distinctSeq]; - - Tuple keyTuple = new VTuple(numGroupingColumns + columnIndexes.length + 1); + int keyLength = numGroupingColumns + columnIndexes.length + 1; + Tuple keyTuple = getKeyTuple(keyTupleMap, keyLength); keyTuple.put(0, tuple.asDatum(0)); for (int i = 0; i < numGroupingColumns; i++) { keyTuple.put(i + 1, tuple.asDatum(i + 1)); @@ -278,16 +275,39 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec { return keyTuple; } + private static Tuple getKeyTuple(Map<Integer, Tuple> keyTupleMap, Datum[] values) { + Tuple keyTuple = getKeyTuple(keyTupleMap, values.length); + keyTuple.put(values); + return keyTuple; + } + + private static Tuple getKeyTuple(Map<Integer, Tuple> keyTupleMap, int keyLength) { + Tuple keyTuple; + if (keyTupleMap.containsKey(keyLength)) { + keyTuple = keyTupleMap.get(keyLength); + } else { + keyTuple = new VTuple(keyLength); + keyTupleMap.put(keyLength, keyTuple); + } + return keyTuple; + } + @Override public void rescan() throws IOException { super.rescan(); prevKeyTuple = null; prevTuple = null; finished = false; + keyTupleMap.clear(); + prevKeyTupleMap.clear(); } @Override public void close() throws IOException { super.close(); + keyTupleMap.clear(); + prevKeyTupleMap.clear(); + prevKeyTuple = null; + prevTuple = null; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java index c91dcca..58cfca4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java @@ -22,7 +22,6 @@ import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.plan.expr.AggregationFunctionCallEval; import org.apache.tajo.plan.logical.DistinctGroupbyNode; import org.apache.tajo.plan.logical.GroupbyNode; import org.apache.tajo.storage.Tuple; @@ -42,6 +41,8 @@ public class DistinctGroupbySortAggregationExec extends PhysicalExec { private int[] resultColumnIdIndexes; + private final Tuple outTuple; + public DistinctGroupbySortAggregationExec(final TaskAttemptContext context, DistinctGroupbyNode plan, SortAggregateExec[] aggregateExecs) throws IOException { super(context, plan.getInSchema(), plan.getOutSchema()); @@ -50,6 +51,7 @@ public class DistinctGroupbySortAggregationExec extends PhysicalExec { currentTuples = new Tuple[groupbyNodeNum]; outColumnNum = outSchema.size(); + outTuple = new VTuple(outColumnNum); int allGroupbyOutColNum = 0; for (GroupbyNode eachGroupby: plan.getSubPlans()) { @@ -110,23 +112,20 @@ public class DistinctGroupbySortAggregationExec extends PhysicalExec { return null; } - Tuple mergedTuple = new VTuple(outColumnNum); - int mergeTupleIndex = 0; for (int i = 0; i < currentTuples.length; i++) { int tupleSize = currentTuples[i].size(); for (int j = 0; j < tupleSize; j++) { if (resultColumnIdIndexes[mergeTupleIndex] >= 0) { - mergedTuple.put(resultColumnIdIndexes[mergeTupleIndex], currentTuples[i].asDatum(j)); + outTuple.put(resultColumnIdIndexes[mergeTupleIndex], currentTuples[i].asDatum(j)); } mergeTupleIndex++; } } - return mergedTuple; + return outTuple; } private Tuple getEmptyTuple() { - Tuple tuple = new VTuple(outSchema.size()); NullDatum nullDatum = DatumFactory.createNullDatum(); int tupleIndex = 0; @@ -134,23 +133,23 @@ public class DistinctGroupbySortAggregationExec extends PhysicalExec { for (int i = 0; i < aggExec.aggFunctionsNum; i++, tupleIndex++) { String funcName = aggExec.aggFunctions[i].getName(); if ("min".equals(funcName) || "max".equals(funcName) || "avg".equals(funcName) || "sum".equals(funcName)) { - tuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createNullDatum()); + outTuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createNullDatum()); } else { TajoDataTypes.Type type = outSchema.getColumn(resultColumnIdIndexes[tupleIndex]).getDataType().getType(); if (type == TajoDataTypes.Type.INT8) { - tuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createInt8(nullDatum.asInt8())); + outTuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createInt8(nullDatum.asInt8())); } else if (type == TajoDataTypes.Type.INT4) { - tuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createInt4(nullDatum.asInt4())); + outTuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createInt4(nullDatum.asInt4())); } else if (type == TajoDataTypes.Type.INT2) { - tuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createInt2(nullDatum.asInt2())); + outTuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createInt2(nullDatum.asInt2())); } else if (type == TajoDataTypes.Type.FLOAT4) { - tuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createFloat4(nullDatum.asFloat4())); + outTuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createFloat4(nullDatum.asFloat4())); } else if (type == TajoDataTypes.Type.FLOAT8) { - tuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createFloat8(nullDatum.asFloat8())); + outTuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createFloat8(nullDatum.asFloat8())); } else { - tuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createNullDatum()); + outTuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createNullDatum()); } } } @@ -159,7 +158,7 @@ public class DistinctGroupbySortAggregationExec extends PhysicalExec { finished = true; first = false; - return tuple; + return outTuple; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java index 5791230..9e9e9b4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java @@ -51,6 +51,11 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec { private int[] resultTupleIndexes; + private Tuple outTuple; + private Tuple keyTuple; + private Tuple prevKeyTuple = null; + private Tuple prevTuple = null; + public DistinctGroupbyThirdAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, SortExec sortExec) throws IOException { super(context, plan.getInSchema(), plan.getOutSchema(), sortExec); @@ -63,6 +68,7 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec { numGroupingColumns = plan.getGroupingColumns().length; resultTupleLength = numGroupingColumns; + keyTuple = new VTuple(numGroupingColumns); List<GroupbyNode> groupbyNodes = plan.getSubPlans(); @@ -86,6 +92,7 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec { resultTupleLength += eachGroupby.getAggFunctions().length; } aggregators = aggregatorList.toArray(new DistinctFinalAggregator[]{}); + outTuple = new VTuple(resultTupleLength); // make output schema mapping index resultTupleIndexes = new int[outSchema.size()]; @@ -128,21 +135,16 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec { } } - Tuple prevKeyTuple = null; - Tuple prevTuple = null; - @Override public Tuple next() throws IOException { if (finished) { return null; } - Tuple resultTuple = new VTuple(resultTupleLength); - while (!context.isStopped()) { - Tuple childTuple = child.next(); + Tuple tuple = child.next(); // Last tuple - if (childTuple == null) { + if (tuple == null) { finished = true; if (prevTuple == null) { @@ -156,19 +158,13 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec { } for (int i = 0; i < numGroupingColumns; i++) { - resultTuple.put(resultTupleIndexes[i], prevTuple.asDatum(i + 1)); + outTuple.put(resultTupleIndexes[i], prevTuple.asDatum(i + 1)); } for (DistinctFinalAggregator eachAggr: aggregators) { - eachAggr.terminate(resultTuple); + eachAggr.terminate(outTuple); } - break; - } - Tuple tuple = null; - try { - tuple = childTuple.clone(); - } catch (CloneNotSupportedException e) { - throw new IOException(e.getMessage(), e); + return outTuple; } int distinctSeq = tuple.getInt2(0); @@ -176,8 +172,8 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec { // First tuple if (prevKeyTuple == null) { - prevKeyTuple = keyTuple; - prevTuple = tuple; + prevKeyTuple = new VTuple(keyTuple.getValues()); + prevTuple = new VTuple(tuple.getValues()); aggregators[distinctSeq].merge(tuple); continue; @@ -186,38 +182,36 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec { if (!prevKeyTuple.equals(keyTuple)) { // new grouping key for (int i = 0; i < numGroupingColumns; i++) { - resultTuple.put(resultTupleIndexes[i], prevTuple.asDatum(i + 1)); + outTuple.put(resultTupleIndexes[i], prevTuple.asDatum(i + 1)); } for (DistinctFinalAggregator eachAggr: aggregators) { - eachAggr.terminate(resultTuple); + eachAggr.terminate(outTuple); } - prevKeyTuple = keyTuple; - prevTuple = tuple; + prevKeyTuple.put(keyTuple.getValues()); + prevTuple.put(tuple.getValues()); aggregators[distinctSeq].merge(tuple); - break; + return outTuple; } else { - prevKeyTuple = keyTuple; - prevTuple = tuple; + prevKeyTuple.put(keyTuple.getValues()); + prevTuple.put(tuple.getValues()); aggregators[distinctSeq].merge(tuple); } } - return resultTuple; + return null; } private Tuple makeEmptyTuple() { - Tuple resultTuple = new VTuple(resultTupleLength); for (DistinctFinalAggregator eachAggr: aggregators) { - eachAggr.terminateEmpty(resultTuple); + eachAggr.terminateEmpty(outTuple); } - return resultTuple; + return outTuple; } private Tuple getGroupingKeyTuple(Tuple tuple) { - Tuple keyTuple = new VTuple(numGroupingColumns); for (int i = 0; i < numGroupingColumns; i++) { keyTuple.put(i, tuple.asDatum(i + 1)); }
