http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index 13b5a3a..adbafd9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -19,7 +19,6 @@ package org.apache.tajo.engine.planner.physical; import com.google.common.annotations.VisibleForTesting; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.LocalDirAllocator; @@ -31,13 +30,11 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.PhysicalPlanningException; import org.apache.tajo.plan.logical.SortNode; import org.apache.tajo.storage.*; -import org.apache.tajo.storage.Scanner; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.unit.StorageUnit; @@ -47,7 +44,9 @@ import org.apache.tajo.worker.TaskAttemptContext; import java.io.File; import java.io.IOException; -import java.util.*; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; import java.util.concurrent.*; import static org.apache.tajo.storage.RawFile.RawFileAppender; @@ -81,7 +80,7 @@ public class ExternalSortExec extends SortExec { /** If there are available multiple cores, 
it tries parallel merge. */ private ExecutorService executorService; /** used for in-memory sort of each chunk. */ - private List<Tuple> inMemoryTable; + private TupleList inMemoryTable; /** temporal dir */ private final Path sortTmpDir; /** It enables round-robin disks allocation */ @@ -120,7 +119,7 @@ public class ExternalSortExec extends SortExec { this.sortBufferBytesNum = context.getQueryContext().getLong(SessionVars.EXTSORT_BUFFER_SIZE) * StorageUnit.MB; this.allocatedCoreNum = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_THREAD_NUM); this.executorService = Executors.newFixedThreadPool(this.allocatedCoreNum); - this.inMemoryTable = new ArrayList<Tuple>(100000); + this.inMemoryTable = new TupleList(100000); this.sortTmpDir = getExecutorTmpDir(); localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); @@ -161,7 +160,7 @@ public class ExternalSortExec extends SortExec { /** * Sort a tuple block and store them into a chunk file */ - private Path sortAndStoreChunk(int chunkId, List<Tuple> tupleBlock) + private Path sortAndStoreChunk(int chunkId, TupleList tupleBlock) throws IOException { TableMeta meta = CatalogUtil.newTableMeta("RAW"); int rowNum = tupleBlock.size(); @@ -203,9 +202,8 @@ public class ExternalSortExec extends SortExec { int chunkId = 0; long runStartTime = System.currentTimeMillis(); while (!context.isStopped() && (tuple = child.next()) != null) { // partition sort start - Tuple vtuple = new VTuple(tuple); - inMemoryTable.add(vtuple); - memoryConsumption += MemoryUtil.calculateMemorySize(vtuple); + inMemoryTable.add(tuple); + memoryConsumption += MemoryUtil.calculateMemorySize(tuple); if (memoryConsumption > sortBufferBytesNum) { long runEndTime = System.currentTimeMillis(); @@ -645,6 +643,8 @@ public class ExternalSortExec extends SortExec { private Tuple leftTuple; private Tuple rightTuple; + private final Tuple outTuple; + private float mergerProgress; private TableStats mergerInputStats; @@ -656,6 +656,7 
@@ public class ExternalSortExec extends SortExec { this.leftScan = leftScanner; this.rightScan = rightScanner; this.comparator = comparator; + this.outTuple = new VTuple(schema.size()); } private void setState(State state) { @@ -685,25 +686,26 @@ public class ExternalSortExec extends SortExec { } protected Tuple prepare(int index, Tuple tuple) { - return tuple == null ? null : new VTuple(tuple); + return tuple; } protected int compare() { return comparator.compare(leftTuple, rightTuple); } + @Override public Tuple next() throws IOException { if (leftTuple == null && rightTuple == null) { return null; } if (rightTuple == null || (leftTuple != null && compare() < 0)) { - Tuple tuple = leftTuple; + outTuple.put(leftTuple.getValues()); leftTuple = prepare(0, leftScan.next()); - return tuple; + return outTuple; } - Tuple tuple = rightTuple; + outTuple.put(rightTuple.getValues()); rightTuple = prepare(1, rightScan.next()); - return tuple; + return outTuple; } @Override @@ -726,6 +728,8 @@ public class ExternalSortExec extends SortExec { IOUtils.cleanup(LOG, leftScan, rightScan); getInputStats(); mergerProgress = 1.0f; + leftTuple = null; + rightTuple = null; setState(State.CLOSED); }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java index e6d1a96..b657622 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java @@ -18,6 +18,7 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.tajo.engine.planner.KeyProjector; import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.plan.logical.GroupbyNode; import org.apache.tajo.storage.Tuple; @@ -25,9 +26,7 @@ import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.HashMap; import java.util.Iterator; -import java.util.Map; import java.util.Map.Entry; /** @@ -35,25 +34,23 @@ import java.util.Map.Entry; */ public class HashAggregateExec extends AggregationExec { private Tuple tuple = null; - private Map<Tuple, FunctionContext[]> hashTable; + private TupleMap<FunctionContext[]> hashTable; + private KeyProjector hashKeyProjector; private boolean computed = false; - private Iterator<Entry<Tuple, FunctionContext []>> iterator = null; + private Iterator<Entry<KeyTuple, FunctionContext []>> iterator = null; public HashAggregateExec(TaskAttemptContext ctx, GroupbyNode plan, PhysicalExec subOp) throws IOException { super(ctx, plan, subOp); - hashTable = new HashMap<Tuple, FunctionContext []>(100000); + hashKeyProjector = new KeyProjector(inSchema, plan.getGroupingColumns()); + hashTable = new TupleMap<FunctionContext []>(10000); this.tuple = new VTuple(plan.getOutSchema().size()); } private void compute() throws 
IOException { Tuple tuple; - Tuple keyTuple; + KeyTuple keyTuple; while(!context.isStopped() && (tuple = child.next()) != null) { - keyTuple = new VTuple(groupingKeyIds.length); - // build one key tuple - for(int i = 0; i < groupingKeyIds.length; i++) { - keyTuple.put(i, tuple.asDatum(groupingKeyIds[i])); - } + keyTuple = hashKeyProjector.project(tuple); FunctionContext [] contexts = hashTable.get(keyTuple); if(contexts != null) { @@ -92,7 +89,7 @@ public class HashAggregateExec extends AggregationExec { FunctionContext [] contexts; if (iterator.hasNext()) { - Entry<Tuple, FunctionContext []> entry = iterator.next(); + Entry<KeyTuple, FunctionContext []> entry = iterator.next(); Tuple keyTuple = entry.getKey(); contexts = entry.getValue(); http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java index 1645263..c0a8622 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java @@ -25,22 +25,26 @@ import org.apache.tajo.util.Pair; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.*; +import java.util.Iterator; +import java.util.List; +import java.util.Map; -public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, List<Tuple>>> { +public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, TupleList>> { private boolean finalLoop; // final loop for right unmatched + private final List<Tuple> nullTupleList; public HashFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, 
PhysicalExec inner) { super(context, plan, outer, inner); + nullTupleList = nullTupleList(rightNumCols); } public Iterator<Tuple> getUnmatchedRight() { return new Iterator<Tuple>() { - private Iterator<Pair<Boolean, List<Tuple>>> iterator1 = tupleSlots.values().iterator(); + private Iterator<Pair<Boolean, TupleList>> iterator1 = tupleSlots.values().iterator(); private Iterator<Tuple> iterator2; @Override @@ -49,7 +53,7 @@ public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, List return true; } for (iterator2 = null; !hasMore() && iterator1.hasNext();) { - Pair<Boolean, List<Tuple>> next = iterator1.next(); + Pair<Boolean, TupleList> next = iterator1.next(); if (!next.getFirst()) { iterator2 = next.getSecond().iterator(); } @@ -81,8 +85,7 @@ public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, List while (!context.isStopped() && !finished) { if (iterator != null && iterator.hasNext()) { frameTuple.setRight(iterator.next()); - projector.eval(frameTuple, outTuple); - return outTuple; + return projector.eval(frameTuple); } if (finalLoop) { finished = true; @@ -100,18 +103,18 @@ public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, List frameTuple.setLeft(leftTuple); if (leftFiltered(leftTuple)) { - iterator = nullIterator(rightNumCols); + iterator = nullTupleList.iterator(); continue; } // getting corresponding right - Pair<Boolean, List<Tuple>> hashed = tupleSlots.get(toKey(leftTuple)); + Pair<Boolean, TupleList> hashed = tupleSlots.get(leftKeyExtractor.project(leftTuple)); if (hashed == null) { - iterator = nullIterator(rightNumCols); + iterator = nullTupleList.iterator(); continue; } Iterator<Tuple> rightTuples = rightFiltered(hashed.getSecond()); if (!rightTuples.hasNext()) { - iterator = nullIterator(rightNumCols); + iterator = nullTupleList.iterator(); continue; } iterator = rightTuples; @@ -122,12 +125,12 @@ public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, List } 
@Override - protected Map<Tuple, Pair<Boolean, List<Tuple>>> convert(Map<Tuple, List<Tuple>> hashed, - boolean fromCache) throws IOException { - Map<Tuple, Pair<Boolean, List<Tuple>>> tuples = new HashMap<Tuple, Pair<Boolean, List<Tuple>>>(hashed.size()); - for (Map.Entry<Tuple, List<Tuple>> entry : hashed.entrySet()) { + protected TupleMap<Pair<Boolean, TupleList>> convert(TupleMap<TupleList> hashed, + boolean fromCache) throws IOException { + TupleMap<Pair<Boolean, TupleList>> tuples = new TupleMap<Pair<Boolean, TupleList>>(hashed.size()); + for (Map.Entry<KeyTuple, TupleList> entry : hashed.entrySet()) { // flag: initially false (whether this join key had at least one match on the counter part) - tuples.put(entry.getKey(), new Pair<Boolean, List<Tuple>>(false, entry.getValue())); + tuples.putWihtoutKeyCopy(entry.getKey(), new Pair<Boolean, TupleList>(false, entry.getValue())); } return tuples; } @@ -135,7 +138,7 @@ public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, List @Override public void rescan() throws IOException { super.rescan(); - for (Pair<Boolean, List<Tuple>> value : tupleSlots.values()) { + for (Pair<Boolean, TupleList> value : tupleSlots.values()) { value.setFirst(false); } finalLoop = false; http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java index 3065c15..bd817bb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java @@ -23,12 +23,9 @@ import org.apache.tajo.storage.Tuple; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import 
java.util.HashMap; import java.util.Iterator; -import java.util.List; -import java.util.Map; -public class HashJoinExec extends CommonHashJoinExec<List<Tuple>> { +public class HashJoinExec extends CommonHashJoinExec<TupleList> { public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec, PhysicalExec rightExec) { @@ -36,9 +33,9 @@ public class HashJoinExec extends CommonHashJoinExec<List<Tuple>> { } @Override - protected Map<Tuple, List<Tuple>> convert(Map<Tuple, List<Tuple>> hashed, boolean fromCache) + protected TupleMap<TupleList> convert(TupleMap<TupleList> hashed, boolean fromCache) throws IOException { - return fromCache ? new HashMap<Tuple, List<Tuple>>(hashed) : hashed; + return fromCache ? new TupleMap<TupleList>(hashed) : hashed; } @Override @@ -50,8 +47,7 @@ public class HashJoinExec extends CommonHashJoinExec<List<Tuple>> { while (!context.isStopped() && !finished) { if (iterator != null && iterator.hasNext()) { frameTuple.setRight(iterator.next()); - projector.eval(frameTuple, outTuple); - return outTuple; + return projector.eval(frameTuple); } Tuple leftTuple = leftChild.next(); // it comes from a disk @@ -63,7 +59,7 @@ public class HashJoinExec extends CommonHashJoinExec<List<Tuple>> { frameTuple.setLeft(leftTuple); // getting corresponding right - Iterable<Tuple> hashed = getRights(toKey(leftTuple)); + Iterable<Tuple> hashed = tupleSlots.get(leftKeyExtractor.project(leftTuple)); Iterator<Tuple> rightTuples = rightFiltered(hashed); if (rightTuples.hasNext()) { iterator = rightTuples; @@ -72,9 +68,4 @@ public class HashJoinExec extends CommonHashJoinExec<List<Tuple>> { return null; } - - private Iterable<Tuple> getRights(Tuple key) { - return tupleSlots.get(key); - } - } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java index 8239270..746bdb9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java @@ -18,9 +18,9 @@ package org.apache.tajo.engine.planner.physical; -import org.apache.tajo.worker.TaskAttemptContext; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; import java.util.List; @@ -32,6 +32,8 @@ import java.util.List; */ public class HashLeftAntiJoinExec extends HashJoinExec { + private final List<Tuple> nullTupleList = nullTupleList(0); + public HashLeftAntiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec fromSideChild, PhysicalExec notInSideChild) { super(context, plan, fromSideChild, notInSideChild); @@ -57,8 +59,7 @@ public class HashLeftAntiJoinExec extends HashJoinExec { while(!context.isStopped() && !finished) { if (iterator != null && iterator.hasNext()) { frameTuple.setRight(iterator.next()); - projector.eval(frameTuple, outTuple); - return outTuple; + return projector.eval(frameTuple); } // getting new outer Tuple leftTuple = leftChild.next(); // it comes from a disk @@ -70,9 +71,9 @@ public class HashLeftAntiJoinExec extends HashJoinExec { frameTuple.setLeft(leftTuple); // Try to find a hash bucket in in-memory hash table - List<Tuple> hashed = tupleSlots.get(toKey(leftTuple)); + TupleList hashed = tupleSlots.get(leftKeyExtractor.project(leftTuple)); if (hashed == null || !rightFiltered(hashed).hasNext()) { - iterator = nullIterator(0); + iterator = nullTupleList.iterator(); } } return null; 
http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java index 27f683b..b652c3c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java @@ -31,10 +31,12 @@ import java.util.List; public class HashLeftOuterJoinExec extends HashJoinExec { private static final Log LOG = LogFactory.getLog(HashLeftOuterJoinExec.class); + private final List<Tuple> nullTupleList; public HashLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild, PhysicalExec rightChild) { super(context, plan, leftChild, rightChild); + nullTupleList = nullTupleList(rightNumCols); } @Override @@ -46,8 +48,7 @@ public class HashLeftOuterJoinExec extends HashJoinExec { while (!context.isStopped() && !finished) { if (iterator != null && iterator.hasNext()) { frameTuple.setRight(iterator.next()); - projector.eval(frameTuple, outTuple); - return outTuple; + return projector.eval(frameTuple); } Tuple leftTuple = leftChild.next(); // it comes from a disk if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed. 
@@ -57,17 +58,17 @@ public class HashLeftOuterJoinExec extends HashJoinExec { frameTuple.setLeft(leftTuple); if (leftFiltered(leftTuple)) { - iterator = nullIterator(rightNumCols); + iterator = nullTupleList.iterator(); continue; } // getting corresponding right - List<Tuple> hashed = tupleSlots.get(toKey(leftTuple)); + TupleList hashed = tupleSlots.get(leftKeyExtractor.project(leftTuple)); Iterator<Tuple> rightTuples = rightFiltered(hashed); if (!rightTuples.hasNext()) { //this left tuple doesn't have a match on the right.But full outer join => we should keep it anyway //output a tuple with the nulls padded rightTuple - iterator = nullIterator(rightNumCols); + iterator = nullTupleList.iterator(); continue; } iterator = rightTuples; http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java index 41e842a..42b78e8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java @@ -18,9 +18,9 @@ package org.apache.tajo.engine.planner.physical; -import org.apache.tajo.worker.TaskAttemptContext; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; import java.util.List; @@ -32,6 +32,8 @@ import java.util.List; */ public class HashLeftSemiJoinExec extends HashJoinExec { + private final List<Tuple> nullTupleList = nullTupleList(0); + public HashLeftSemiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec fromSideChild, PhysicalExec inSideChild) { super(context, plan, 
fromSideChild, inSideChild); @@ -59,8 +61,7 @@ public class HashLeftSemiJoinExec extends HashJoinExec { while(!context.isStopped() && !finished) { if (iterator != null && iterator.hasNext()) { frameTuple.setRight(iterator.next()); - projector.eval(frameTuple, outTuple); - return outTuple; + return projector.eval(frameTuple); } // getting new outer Tuple leftTuple = leftChild.next(); // it comes from a disk @@ -72,10 +73,10 @@ public class HashLeftSemiJoinExec extends HashJoinExec { frameTuple.setLeft(leftTuple); // Try to find a hash bucket in in-memory hash table - List<Tuple> hashed = tupleSlots.get(toKey(leftTuple)); + TupleList hashed = tupleSlots.get(leftKeyExtractor.project(leftTuple)); if (hashed != null && rightFiltered(hashed).hasNext()) { // if found, it gets a hash bucket from the hash table. - iterator = nullIterator(0); + iterator = nullTupleList.iterator(); } } return null; http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java index 1a92a7a..bc4382e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java @@ -33,9 +33,7 @@ import org.apache.tajo.storage.Tuple; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; /** @@ -47,7 +45,6 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { private ShuffleFileWriteNode plan; private final TableMeta meta; private Partitioner partitioner; -// private 
final Path storeTablePath; private Map<Integer, HashShuffleAppender> appenderMap = new HashMap<Integer, HashShuffleAppender>(); private final int numShuffleOutputs; private final int [] shuffleKeyIds; @@ -92,8 +89,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { return appender; } -// Map<Integer, Long> partitionStats = new HashMap<Integer, Long>(); - Map<Integer, List<Tuple>> partitionTuples = new HashMap<Integer, List<Tuple>>(); + Map<Integer, TupleList> partitionTuples = new HashMap<Integer, TupleList>(); long writtenBytes = 0L; @Override @@ -108,17 +104,14 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { numRows++; partId = partitioner.getPartition(tuple); - List<Tuple> partitionTupleList = partitionTuples.get(partId); + TupleList partitionTupleList = partitionTuples.get(partId); if (partitionTupleList == null) { - partitionTupleList = new ArrayList<Tuple>(1000); + partitionTupleList = new TupleList(1000); partitionTuples.put(partId, partitionTupleList); } - try { - partitionTupleList.add(tuple.clone()); - } catch (CloneNotSupportedException e) { - } + partitionTupleList.add(tuple); if (tupleCount >= numHashShuffleBufferTuples) { - for (Map.Entry<Integer, List<Tuple>> entry : partitionTuples.entrySet()) { + for (Map.Entry<Integer, TupleList> entry : partitionTuples.entrySet()) { int appendPartId = entry.getKey(); HashShuffleAppender appender = getAppender(appendPartId); int appendedSize = appender.addTuples(context.getTaskId(), entry.getValue()); @@ -130,7 +123,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { } // processing remained tuples - for (Map.Entry<Integer, List<Tuple>> entry : partitionTuples.entrySet()) { + for (Map.Entry<Integer, TupleList> entry : partitionTuples.entrySet()) { int appendPartId = entry.getKey(); HashShuffleAppender appender = getAppender(appendPartId); int appendedSize = appender.addTuples(context.getTaskId(), entry.getValue()); @@ -168,6 +161,12 @@ 
public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { appenderMap = null; } + for (TupleList eachList : partitionTuples.values()) { + eachList.clear(); + } + partitionTuples.clear(); + partitionTuples = null; + partitioner = null; plan = null; http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/KeyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/KeyTuple.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/KeyTuple.java new file mode 100644 index 0000000..39b13f8 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/KeyTuple.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.tajo.datum.Datum; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; + +/** + * KeyTuple is to keep its hash value in memory to avoid frequent expensive hash calculation. + * Datum.hashCode() uses MurmurHash, so its cost is not so cheap. 
+ * + */ +public class KeyTuple extends VTuple { + private int hashCode; + + public KeyTuple(int size) { + super(size); + updateHashCode(); + } + + public KeyTuple(Tuple tuple) { + super(tuple); + updateHashCode(); + } + + public KeyTuple(Datum[] datums) { + super(datums); + updateHashCode(); + } + + @Override + public void put(int fieldId, Tuple tuple) { + super.put(fieldId, tuple); + updateHashCode(); + } + + private void updateHashCode() { + this.hashCode = super.hashCode(); + } + + @Override + public void put(int fieldId, Datum value) { + super.put(fieldId, value); + updateHashCode(); + } + + @Override + public void clear() { + super.clear(); + updateHashCode(); + } + + @Override + public void put(Datum [] values) { + super.put(values); + updateHashCode(); + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java index f76e356..029592a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java @@ -18,19 +18,16 @@ package org.apache.tajo.engine.planner.physical; -import org.apache.tajo.worker.TaskAttemptContext; import org.apache.tajo.plan.logical.SortNode; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; +import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; public class MemSortExec extends SortExec { private SortNode plan; - private 
List<Tuple> tupleSlots; + private TupleList tupleSlots; private boolean sorted = false; private Iterator<Tuple> iterator; @@ -42,7 +39,7 @@ public class MemSortExec extends SortExec { public void init() throws IOException { super.init(); - this.tupleSlots = new ArrayList<Tuple>(10000); + this.tupleSlots = new TupleList(10000); } @Override @@ -51,7 +48,7 @@ public class MemSortExec extends SortExec { if (!sorted) { Tuple tuple; while (!context.isStopped() && (tuple = child.next()) != null) { - tupleSlots.add(new VTuple(tuple)); + tupleSlots.add(tuple); } iterator = getSorter(tupleSlots).sort().iterator(); sorted = true; http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java index 13b73c3..41e3648 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java @@ -20,30 +20,27 @@ package org.apache.tajo.engine.planner.physical; import com.google.common.base.Preconditions; import org.apache.tajo.catalog.SortSpec; -import org.apache.tajo.engine.utils.TupleUtil; import org.apache.tajo.plan.logical.JoinNode; -import org.apache.tajo.storage.FrameTuple; +import org.apache.tajo.storage.NullTuple; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; public class MergeFullOuterJoinExec extends CommonJoinExec { // temporal tuples and states for nested loop join - private FrameTuple 
frameTuple; private Tuple leftTuple = null; private Tuple rightTuple = null; - private Tuple outTuple = null; private Tuple leftNext = null; + private Tuple prevLeftTuple = null; + private Tuple prevRightTuple = null; - private List<Tuple> leftTupleSlots; - private List<Tuple> rightTupleSlots; + private TupleList leftTupleSlots; + private TupleList rightTupleSlots; private JoinTupleComparator joincomparator = null; private TupleComparator[] tupleComparator = null; @@ -59,13 +56,16 @@ public class MergeFullOuterJoinExec extends CommonJoinExec { boolean endInPopulationStage = false; private boolean initRightDone = false; + private final Tuple leftNullTuple; + private final Tuple rightNullTuple; + public MergeFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild, PhysicalExec rightChild, SortSpec[] leftSortKey, SortSpec[] rightSortKey) { super(context, plan, leftChild, rightChild); Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " + "but there is no join condition"); - this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT); - this.rightTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT); + this.leftTupleSlots = new TupleList(INITIAL_TUPLE_SLOT); + this.rightTupleSlots = new TupleList(INITIAL_TUPLE_SLOT); SortSpec[][] sortSpecs = new SortSpec[2][]; sortSpecs[0] = leftSortKey; sortSpecs[1] = rightSortKey; @@ -75,16 +75,18 @@ public class MergeFullOuterJoinExec extends CommonJoinExec { this.tupleComparator = PhysicalPlanUtil.getComparatorsFromJoinQual( plan.getJoinQual(), leftChild.getSchema(), rightChild.getSchema()); - // for join - frameTuple = new FrameTuple(); - outTuple = new VTuple(outSchema.size()); - leftNumCols = leftChild.getSchema().size(); rightNumCols = rightChild.getSchema().size(); + + prevLeftTuple = new VTuple(leftChild.getSchema().size()); + prevRightTuple = new VTuple(rightChild.getSchema().size()); + + leftNullTuple = NullTuple.create(leftNumCols); + 
rightNullTuple = NullTuple.create(rightNumCols); } public Tuple next() throws IOException { - Tuple previous; + Tuple outTuple; while (!context.isStopped()) { boolean newRound = false; @@ -122,9 +124,8 @@ public class MergeFullOuterJoinExec extends CommonJoinExec { if((leftTuple == null) && (rightTuple != null)){ // output a tuple with the nulls padded leftTuple - Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols); - frameTuple.set(nullPaddedTuple, rightTuple); - projector.eval(frameTuple, outTuple); + frameTuple.set(leftNullTuple, rightTuple); + outTuple = projector.eval(frameTuple); // we simulate we found a match, which is exactly the null padded one rightTuple = rightChild.next(); return outTuple; @@ -132,9 +133,8 @@ public class MergeFullOuterJoinExec extends CommonJoinExec { if((leftTuple != null) && (rightTuple == null)){ // output a tuple with the nulls padded leftTuple - Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols); - frameTuple.set(leftTuple, nullPaddedTuple); - projector.eval(frameTuple, outTuple); + frameTuple.set(leftTuple, rightNullTuple); + outTuple = projector.eval(frameTuple); // we simulate we found a match, which is exactly the null padded one leftTuple = leftChild.next(); return outTuple; @@ -178,9 +178,9 @@ public class MergeFullOuterJoinExec extends CommonJoinExec { //before getting a new tuple from the right, a leftnullpadded tuple should be built //output a tuple with the nulls padded leftTuple - Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols); + Tuple nullPaddedTuple = leftNullTuple; frameTuple.set(nullPaddedTuple, rightTuple); - projector.eval(frameTuple, outTuple); + outTuple = projector.eval(frameTuple); // BEFORE RETURN, MOVE FORWARD rightTuple = rightChild.next(); if(rightTuple == null) { @@ -192,9 +192,9 @@ public class MergeFullOuterJoinExec extends CommonJoinExec { } else if (cmp < 0) { // before getting a new tuple from the left, a rightnullpadded tuple should be built 
// output a tuple with the nulls padded rightTuple - Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols); + Tuple nullPaddedTuple = rightNullTuple; frameTuple.set(leftTuple, nullPaddedTuple); - projector.eval(frameTuple, outTuple); + outTuple = projector.eval(frameTuple); // we simulate we found a match, which is exactly the null padded one // BEFORE RETURN, MOVE FORWARD leftTuple = leftChild.next(); @@ -219,28 +219,27 @@ public class MergeFullOuterJoinExec extends CommonJoinExec { boolean endLeft = false; boolean endRight = false; - previous = new VTuple(leftTuple); + prevLeftTuple.put(leftTuple.getValues()); do { - leftTupleSlots.add(new VTuple(leftTuple)); + leftTupleSlots.add(leftTuple); leftTuple = leftChild.next(); if(leftTuple == null) { endLeft = true; } - } while ((endLeft != true) && (tupleComparator[0].compare(previous, leftTuple) == 0)); + } while ((endLeft != true) && (tupleComparator[0].compare(prevLeftTuple, leftTuple) == 0)); posLeftTupleSlots = 0; - - previous = new VTuple(rightTuple); + prevRightTuple.put(rightTuple.getValues()); do { - rightTupleSlots.add(new VTuple(rightTuple)); + rightTupleSlots.add(rightTuple); rightTuple = rightChild.next(); if(rightTuple == null) { endRight = true; } - } while ((endRight != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) ); + } while ((endRight != true) && (tupleComparator[1].compare(prevRightTuple, rightTuple) == 0) ); posRightTupleSlots = 0; if ((endLeft == true) || (endRight == true)) { @@ -261,31 +260,29 @@ public class MergeFullOuterJoinExec extends CommonJoinExec { // (i.e. 
refers to next round) if(!end || (end && endInPopulationStage)){ if(posLeftTupleSlots == 0){ - leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots)); + leftNext = leftTupleSlots.get(posLeftTupleSlots); posLeftTupleSlots = posLeftTupleSlots + 1; } if(posRightTupleSlots <= (rightTupleSlots.size() -1)) { - Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots)); + Tuple aTuple = rightTupleSlots.get(posRightTupleSlots); posRightTupleSlots = posRightTupleSlots + 1; frameTuple.set(leftNext, aTuple); joinQual.eval(frameTuple); - projector.eval(frameTuple, outTuple); - return outTuple; + return projector.eval(frameTuple); } else { // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots if(posLeftTupleSlots <= (leftTupleSlots.size()-1)) { //rewind the right slots position posRightTupleSlots = 0; - Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots)); + Tuple aTuple = rightTupleSlots.get(posRightTupleSlots); posRightTupleSlots = posRightTupleSlots + 1; - leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots)); + leftNext = leftTupleSlots.get(posLeftTupleSlots); posLeftTupleSlots = posLeftTupleSlots + 1; frameTuple.set(leftNext, aTuple); joinQual.eval(frameTuple); - projector.eval(frameTuple, outTuple); - return outTuple; + return projector.eval(frameTuple); } } } // the second if end false http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java index bf9b4cd..3d8c108 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java @@ -21,28 
+21,25 @@ package org.apache.tajo.engine.planner.physical; import com.google.common.base.Preconditions; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.plan.logical.JoinNode; -import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; public class MergeJoinExec extends CommonJoinExec { // temporal tuples and states for nested loop join - private FrameTuple frameTuple; private Tuple outerTuple = null; private Tuple innerTuple = null; - private Tuple outTuple = null; private Tuple outerNext = null; + private final Tuple prevOuterTuple; + private final Tuple prevInnerTuple; - private List<Tuple> outerTupleSlots; - private List<Tuple> innerTupleSlots; + private TupleList outerTupleSlots; + private TupleList innerTupleSlots; private Iterator<Tuple> outerIterator; private Iterator<Tuple> innerIterator; @@ -59,8 +56,8 @@ public class MergeJoinExec extends CommonJoinExec { Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " + "but there is no join condition"); - this.outerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT); - this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT); + this.outerTupleSlots = new TupleList(INITIAL_TUPLE_SLOT); + this.innerTupleSlots = new TupleList(INITIAL_TUPLE_SLOT); SortSpec[][] sortSpecs = new SortSpec[2][]; sortSpecs[0] = outerSortKey; sortSpecs[1] = innerSortKey; @@ -71,14 +68,12 @@ public class MergeJoinExec extends CommonJoinExec { plan.getJoinQual(), outer.getSchema(), inner.getSchema()); this.outerIterator = outerTupleSlots.iterator(); this.innerIterator = innerTupleSlots.iterator(); - - // for join - frameTuple = new FrameTuple(); - outTuple = new VTuple(outSchema.size()); + + prevOuterTuple = new 
VTuple(leftChild.getSchema().size()); + prevInnerTuple = new VTuple(rightChild.getSchema().size()); } public Tuple next() throws IOException { - Tuple previous; while (!context.isStopped()) { if (!outerIterator.hasNext() && !innerIterator.hasNext()) { @@ -108,32 +103,28 @@ public class MergeJoinExec extends CommonJoinExec { } } - try { - previous = outerTuple.clone(); - do { - outerTupleSlots.add(outerTuple.clone()); - outerTuple = leftChild.next(); - if (outerTuple == null) { - end = true; - break; - } - } while (tupleComparator[0].compare(previous, outerTuple) == 0); - outerIterator = outerTupleSlots.iterator(); - outerNext = outerIterator.next(); - - previous = innerTuple.clone(); - do { - innerTupleSlots.add(innerTuple.clone()); - innerTuple = rightChild.next(); - if (innerTuple == null) { - end = true; - break; - } - } while (tupleComparator[1].compare(previous, innerTuple) == 0); - innerIterator = innerTupleSlots.iterator(); - } catch (CloneNotSupportedException e) { + prevOuterTuple.put(outerTuple.getValues()); + do { + outerTupleSlots.add(outerTuple); + outerTuple = leftChild.next(); + if (outerTuple == null) { + end = true; + break; + } + } while (tupleComparator[0].compare(prevOuterTuple, outerTuple) == 0); + outerIterator = outerTupleSlots.iterator(); + outerNext = outerIterator.next(); - } + prevInnerTuple.put(innerTuple.getValues()); + do { + innerTupleSlots.add(innerTuple); + innerTuple = rightChild.next(); + if (innerTuple == null) { + end = true; + break; + } + } while (tupleComparator[1].compare(prevInnerTuple, innerTuple) == 0); + innerIterator = innerTupleSlots.iterator(); } if(!innerIterator.hasNext()){ @@ -144,8 +135,7 @@ public class MergeJoinExec extends CommonJoinExec { frameTuple.set(outerNext, innerIterator.next()); if (joinQual.eval(frameTuple).isTrue()) { - projector.eval(frameTuple, outTuple); - return outTuple; + return projector.eval(frameTuple); } } return null; 
http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java index 964a523..d3214c3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java @@ -19,9 +19,7 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.plan.logical.JoinNode; -import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -30,18 +28,14 @@ public class NLJoinExec extends CommonJoinExec { // temporal tuples and states for nested loop join private boolean needNewOuter; - private FrameTuple frameTuple; private Tuple outerTuple = null; private Tuple innerTuple = null; - private Tuple outTuple = null; public NLJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) { super(context, plan, outer, inner); // for join needNewOuter = true; - frameTuple = new FrameTuple(); - outTuple = new VTuple(outSchema.size()); } public Tuple next() throws IOException { @@ -64,12 +58,10 @@ public class NLJoinExec extends CommonJoinExec { frameTuple.set(outerTuple, innerTuple); if (hasJoinQual) { if (joinQual.eval(frameTuple).isTrue()) { - projector.eval(frameTuple, outTuple); - return outTuple; + return projector.eval(frameTuple); } } else { - projector.eval(frameTuple, outTuple); - return outTuple; + return projector.eval(frameTuple); } } return null; 
http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java index 72a667d..8a79005 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java @@ -24,7 +24,6 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.plan.logical.Projectable; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -33,7 +32,6 @@ public class ProjectionExec extends UnaryPhysicalExec { private Projectable plan; // for projection - private Tuple outTuple; private Projector projector; public ProjectionExec(TaskAttemptContext context, Projectable plan, @@ -45,7 +43,6 @@ public class ProjectionExec extends UnaryPhysicalExec { public void init() throws IOException { super.init(); - this.outTuple = new VTuple(outSchema.size()); this.projector = new Projector(context, inSchema, outSchema, this.plan.getTargets()); } @@ -57,8 +54,7 @@ public class ProjectionExec extends UnaryPhysicalExec { return null; } - projector.eval(tuple, outTuple); - return outTuple; + return projector.eval(tuple); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java index bbb21fe..ac3d1b2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java @@ -24,8 +24,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.IOUtils; -import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.planner.KeyProjector; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.*; import org.apache.tajo.storage.index.bst.BSTIndex; @@ -40,9 +44,8 @@ import java.io.IOException; * specified order of shuffle keys. 
*/ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { - private static Log LOG = LogFactory.getLog(RangeShuffleFileWriteExec.class); + private final static Log LOG = LogFactory.getLog(RangeShuffleFileWriteExec.class); private final SortSpec[] sortSpecs; - private int [] indexKeys = null; private Schema keySchema; private BSTIndex.BSTIndexWriter indexWriter; @@ -50,6 +53,8 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { private FileAppender appender; private TableMeta meta; + private KeyProjector keyProjector; + public RangeShuffleFileWriteExec(final TaskAttemptContext context, final PhysicalExec child, final Schema inSchema, final Schema outSchema, final SortSpec[] sortSpecs) throws IOException { @@ -60,14 +65,8 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { public void init() throws IOException { super.init(); - indexKeys = new int[sortSpecs.length]; keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs); - - Column col; - for (int i = 0 ; i < sortSpecs.length; i++) { - col = sortSpecs[i].getSortKey(); - indexKeys[i] = inSchema.getColumnId(col.getQualifiedName()); - } + keyProjector = new KeyProjector(inSchema, keySchema.toArray()); BSTIndex bst = new BSTIndex(new TajoConf()); this.comp = new BaseTupleComparator(keySchema, sortSpecs); @@ -91,24 +90,17 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { public Tuple next() throws IOException { Tuple tuple; Tuple keyTuple; - Tuple prevKeyTuple = null; + Tuple prevKeyTuple = new VTuple(keySchema.size()); long offset; - - try { - while(!context.isStopped() && (tuple = child.next()) != null) { - offset = appender.getOffset(); - appender.addTuple(tuple); - keyTuple = new VTuple(keySchema.size()); - RowStoreUtil.project(tuple, keyTuple, indexKeys); - if (prevKeyTuple == null || !prevKeyTuple.equals(keyTuple)) { - indexWriter.write(keyTuple, offset); - prevKeyTuple = keyTuple; - } + while(!context.isStopped() && (tuple = child.next()) != null) { 
+ offset = appender.getOffset(); + appender.addTuple(tuple); + keyTuple = keyProjector.project(tuple); + if (!prevKeyTuple.equals(keyTuple)) { + indexWriter.write(keyTuple, offset); + prevKeyTuple.put(keyTuple.getValues()); } - } catch (RuntimeException e) { - e.printStackTrace(); - throw e; } return null; http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java index fd825b1..239c6ab 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java @@ -20,29 +20,26 @@ package org.apache.tajo.engine.planner.physical; import com.google.common.base.Preconditions; import org.apache.tajo.catalog.SortSpec; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.engine.utils.TupleUtil; import org.apache.tajo.plan.logical.JoinNode; -import org.apache.tajo.storage.FrameTuple; +import org.apache.tajo.storage.NullTuple; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; public class RightOuterMergeJoinExec extends CommonJoinExec { // temporal tuples and states for nested loop join - private FrameTuple frameTuple; private Tuple leftTuple = null; private Tuple rightTuple = null; - private Tuple outTuple = null; + private Tuple prevLeftTuple; + private Tuple prevRightTuple; private Tuple nextLeft = null; + private Tuple nullPaddedTuple; - private List<Tuple> 
leftTupleSlots; - private List<Tuple> innerTupleSlots; + private TupleList leftTupleSlots; + private TupleList innerTupleSlots; private JoinTupleComparator joinComparator = null; private TupleComparator [] tupleComparator = null; @@ -62,8 +59,8 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { super(context, plan, outer, inner); Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " + "but there is no join condition"); - this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT); - this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT); + this.leftTupleSlots = new TupleList(INITIAL_TUPLE_SLOT); + this.innerTupleSlots = new TupleList(INITIAL_TUPLE_SLOT); SortSpec[][] sortSpecs = new SortSpec[2][]; sortSpecs[0] = outerSortKey; sortSpecs[1] = innerSortKey; @@ -72,22 +69,8 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { this.tupleComparator = PhysicalPlanUtil.getComparatorsFromJoinQual( plan.getJoinQual(), outer.getSchema(), inner.getSchema()); - // for join - frameTuple = new FrameTuple(); - outTuple = new VTuple(outSchema.size()); - leftNumCols = outer.getSchema().size(); - } - - /** - * creates a tuple of a given size filled with NULL values in all fields - */ - private Tuple createNullPaddedTuple(int columnNum){ - VTuple tuple = new VTuple(columnNum); - for (int i = 0; i < columnNum; i++) { - tuple.put(i, DatumFactory.createNullDatum()); - } - return tuple; + nullPaddedTuple = NullTuple.create(leftNumCols); } /** @@ -101,7 +84,9 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { * @return * @throws IOException */ + @Override public Tuple next() throws IOException { + Tuple outTuple; while (!context.isStopped()) { boolean newRound = false; @@ -130,9 +115,8 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { return null; } else { // output a tuple with the nulls padded leftTuple - Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols); 
frameTuple.set(nullPaddedTuple, rightTuple); - projector.eval(frameTuple, outTuple); + outTuple = projector.eval(frameTuple); // we simulate we found a match, which is exactly the null padded one rightTuple = rightChild.next(); @@ -168,11 +152,10 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { } } if (rightFiltered(rightTuple)) { - Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols); frameTuple.set(nullPaddedTuple, rightTuple); - projector.eval(frameTuple, outTuple); - + outTuple = projector.eval(frameTuple); rightTuple = null; + return outTuple; } initRightDone = true; @@ -202,9 +185,8 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { if (cmp > 0) { // before getting a new tuple from the right, a left null padded tuple should be built // output a tuple with the nulls padded left tuple - Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols); frameTuple.set(nullPaddedTuple, rightTuple); - projector.eval(frameTuple, outTuple); + outTuple = projector.eval(frameTuple); // we simulate we found a match, which is exactly the null padded one // BEFORE RETURN, MOVE FORWARD @@ -225,7 +207,6 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { // END MOVE FORWARDING STAGE ////////////////////////////////////////////////////////////////////// - Tuple previous = null; // once a match is found, retain all tuples with this key in tuple slots on each side if(!end) { endInPopulationStage = false; @@ -233,26 +214,33 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { boolean endOuter = false; boolean endInner = false; - previous = new VTuple(leftTuple); + if (prevLeftTuple == null) { + prevLeftTuple = new VTuple(leftTuple.getValues()); + } else { + prevLeftTuple.put(leftTuple.getValues()); + } do { - leftTupleSlots.add(new VTuple(leftTuple)); + leftTupleSlots.add(leftTuple); leftTuple = leftChild.next(); if( leftTuple == null) { endOuter = true; } - } while ((endOuter != true) && 
(tupleComparator[0].compare(previous, leftTuple) == 0)); + } while ((endOuter != true) && (tupleComparator[0].compare(prevLeftTuple, leftTuple) == 0)); posLeftTupleSlots = 0; - previous = new VTuple(rightTuple); - + if (prevRightTuple == null) { + prevRightTuple = new VTuple(rightTuple.getValues()); + } else { + prevRightTuple.put(rightTuple.getValues()); + } do { - innerTupleSlots.add(new VTuple(rightTuple)); + innerTupleSlots.add(rightTuple); rightTuple = rightChild.next(); if(rightTuple == null) { endInner = true; } - } while ((endInner != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) ); + } while ((endInner != true) && (tupleComparator[1].compare(prevRightTuple, rightTuple) == 0) ); posRightTupleSlots = 0; if ((endOuter == true) || (endInner == true)) { @@ -260,10 +248,9 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { endInPopulationStage = true; } } // if end false - if (previous != null && rightFiltered(previous)) { - Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols); - frameTuple.set(nullPaddedTuple, previous); - projector.eval(frameTuple, outTuple); + if (prevRightTuple != null && rightFiltered(prevRightTuple)) { + frameTuple.set(nullPaddedTuple, prevRightTuple); + outTuple = projector.eval(frameTuple); // reset tuple slots for a new round leftTupleSlots.clear(); @@ -283,48 +270,42 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { if ((end == false) || ((end == true) && (endInPopulationStage == true))){ if(posLeftTupleSlots == 0){ - nextLeft = new VTuple (leftTupleSlots.get(posLeftTupleSlots)); + nextLeft = leftTupleSlots.get(posLeftTupleSlots); posLeftTupleSlots = posLeftTupleSlots + 1; } if(posRightTupleSlots <= (innerTupleSlots.size() -1)) { - Tuple aTuple = new VTuple(innerTupleSlots.get(posRightTupleSlots)); + Tuple aTuple = innerTupleSlots.get(posRightTupleSlots); posRightTupleSlots = posRightTupleSlots + 1; frameTuple.set(nextLeft, aTuple); if (joinQual.eval(frameTuple).asBool()) { - 
projector.eval(frameTuple, outTuple); - return outTuple; + return projector.eval(frameTuple); } else { // padding null - Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols); frameTuple.set(nullPaddedTuple, aTuple); - projector.eval(frameTuple, outTuple); - return outTuple; + return projector.eval(frameTuple); } } else { // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots if(posLeftTupleSlots <= (leftTupleSlots.size() - 1)) { //rewind the right slots position posRightTupleSlots = 0; - Tuple aTuple = new VTuple(innerTupleSlots.get(posRightTupleSlots)); + Tuple aTuple = innerTupleSlots.get(posRightTupleSlots); posRightTupleSlots = posRightTupleSlots + 1; - nextLeft = new VTuple (leftTupleSlots.get(posLeftTupleSlots)); + nextLeft = leftTupleSlots.get(posLeftTupleSlots); posLeftTupleSlots = posLeftTupleSlots + 1; frameTuple.set(nextLeft, aTuple); if (joinQual.eval(frameTuple).asBool()) { - projector.eval(frameTuple, outTuple); - return outTuple; + return projector.eval(frameTuple); } else { // padding null - Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols); frameTuple.set(nullPaddedTuple, aTuple); - projector.eval(frameTuple, outTuple); - return outTuple; + return projector.eval(frameTuple); } } } @@ -340,6 +321,9 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { innerTupleSlots.clear(); posRightTupleSlots = -1; posLeftTupleSlots = -1; + leftTuple = rightTuple = null; + prevLeftTuple = prevRightTuple = null; + nextLeft = null; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index b4f7a38..b49fa40 100644 --- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -247,9 +247,8 @@ public class SeqScanExec extends ScanExec { public Tuple next() throws IOException { while(scanIt.hasNext()) { - Tuple outTuple = new VTuple(outColumnNum); Tuple t = scanIt.next(); - projector.eval(t, outTuple); + Tuple outTuple = projector.eval(t); outTuple.setOffset(t.getOffset()); return outTuple; } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java index 2feecd1..71602b8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java @@ -18,6 +18,7 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.tajo.catalog.Column; import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.plan.logical.GroupbyNode; import org.apache.tajo.storage.Tuple; @@ -41,24 +42,38 @@ import java.io.IOException; * it makes an output tuple. */ public class SortAggregateExec extends AggregationExec { + private final int groupingKeyIds[]; private Tuple lastKey = null; + private final Tuple currentKey; + private final Tuple outTuple; private boolean finished = false; private FunctionContext contexts[]; public SortAggregateExec(TaskAttemptContext context, GroupbyNode plan, PhysicalExec child) throws IOException { super(context, plan, child); contexts = new FunctionContext[plan.getAggFunctions() == null ? 
0 : plan.getAggFunctions().length]; + + final Column [] keyColumns = plan.getGroupingColumns(); + groupingKeyIds = new int[groupingKeyNum]; + Column col; + for (int idx = 0; idx < plan.getGroupingColumns().length; idx++) { + col = keyColumns[idx]; + if (col.hasQualifier()) { + groupingKeyIds[idx] = inSchema.getColumnId(col.getQualifiedName()); + } else { + groupingKeyIds[idx] = inSchema.getColumnIdByName(col.getSimpleName()); + } + } + currentKey = new VTuple(groupingKeyNum); + outTuple = new VTuple(outSchema.size()); } @Override public Tuple next() throws IOException { - Tuple currentKey; Tuple tuple = null; - Tuple outputTuple = null; while(!context.isStopped() && (tuple = child.next()) != null) { // get a key tuple - currentKey = new VTuple(groupingKeyIds.length); for(int i = 0; i < groupingKeyIds.length; i++) { currentKey.put(i, tuple.asDatum(groupingKeyIds[i])); } @@ -75,7 +90,7 @@ public class SortAggregateExec extends AggregationExec { aggFunctions[i].merge(contexts[i], tuple); } } - lastKey = currentKey; + lastKey = new VTuple(currentKey.getValues()); } else { // aggregate for (int i = 0; i < aggFunctionsNum; i++) { @@ -85,14 +100,13 @@ public class SortAggregateExec extends AggregationExec { } else { /** Finalization State */ // finalize aggregate and return - outputTuple = new VTuple(outSchema.size()); int tupleIdx = 0; for(; tupleIdx < groupingKeyNum; tupleIdx++) { - outputTuple.put(tupleIdx, lastKey.asDatum(tupleIdx)); + outTuple.put(tupleIdx, lastKey.asDatum(tupleIdx)); } for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) { - outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx])); + outTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx])); } for(int evalIdx = 0; evalIdx < aggFunctionsNum; evalIdx++) { @@ -100,8 +114,8 @@ public class SortAggregateExec extends AggregationExec { aggFunctions[evalIdx].merge(contexts[evalIdx], tuple); } - lastKey = currentKey; - return 
outputTuple; + lastKey.put(currentKey.getValues()); + return outTuple; } } // while loop @@ -110,17 +124,17 @@ public class SortAggregateExec extends AggregationExec { return null; } if (!finished) { - outputTuple = new VTuple(outSchema.size()); int tupleIdx = 0; for(; tupleIdx < groupingKeyNum; tupleIdx++) { - outputTuple.put(tupleIdx, lastKey.asDatum(tupleIdx)); + outTuple.put(tupleIdx, lastKey.asDatum(tupleIdx)); } for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) { - outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx])); + outTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx])); } finished = true; + return outTuple; } - return outputTuple; + return null; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java index a40fc1d..3a0dd38 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java @@ -22,7 +22,6 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.catalog.statistics.StatisticsUtil; -import org.apache.tajo.datum.Datum; import org.apache.tajo.plan.logical.StoreTableNode; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; @@ -85,7 +84,7 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec { StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats()); appender = getNextPartitionAppender(getSubdirectory(currentKey)); - prevKey = new 
VTuple(currentKey); + prevKey.put(currentKey.getValues()); // reset all states for file rotating writtenFileNum = 0; http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java index 28be9de..b652b0a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java @@ -26,7 +26,6 @@ import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; import java.util.Comparator; -import java.util.List; public abstract class SortExec extends UnaryPhysicalExec { @@ -40,7 +39,7 @@ public abstract class SortExec extends UnaryPhysicalExec { this.comparator = new BaseTupleComparator(inSchema, sortSpecs); } - protected TupleSorter getSorter(List<Tuple> tupleSlots) { + protected TupleSorter getSorter(TupleList tupleSlots) { if (!tupleSlots.isEmpty() && ComparableVector.isVectorizable(sortSpecs)) { return new VectorizedSorter(tupleSlots, sortSpecs, comparator.getSortKeyIds()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java index 6031fdb..c317f7f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java @@ -49,7 +49,6 @@ public class StoreTableExec extends UnaryPhysicalExec { 
private PersistentStoreNode plan; private TableMeta meta; private Appender appender; - private Tuple tuple; // for file punctuation private TableStats sumStats; // for aggregating all stats of written files @@ -125,6 +124,7 @@ public class StoreTableExec extends UnaryPhysicalExec { */ @Override public Tuple next() throws IOException { + Tuple tuple; while(!context.isStopped() && (tuple = child.next()) != null) { appender.addTuple(tuple); http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleList.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleList.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleList.java new file mode 100644 index 0000000..71ccae1 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleList.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; + +import java.util.ArrayList; + +/** + * In TupleList, input tuples are automatically copied into a new VTuple whenever add(Tuple) is called; the other insertion methods inherited from ArrayList are not overridden in this class. + * This data structure is usually used in physical operators like hash join or hash aggregation. + */ +public class TupleList extends ArrayList<Tuple> { + + public TupleList() { + super(); + } + + public TupleList(int initialCapacity) { + super(initialCapacity); + } + + @Override + public boolean add(Tuple tuple) { + return super.add(new VTuple(tuple)); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleMap.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleMap.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleMap.java new file mode 100644 index 0000000..6f72522 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleMap.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.tajo.annotation.Nullable; + +import java.util.HashMap; + +/** + * TupleMap is a map which uses KeyTuple as its key. + * Please note that every put() call creates a copy of input KeyTuple. + * This data structure is usually used in physical operators like hash join or hash aggregation. + * + * @param <E> value type + */ +public class TupleMap<E> extends HashMap<KeyTuple, E> { + + public TupleMap() { + super(); + } + + public TupleMap(int initialCapacity) { + super(initialCapacity); + } + + public TupleMap(TupleMap tupleMap){ + super(tupleMap); + } + + /** + * Add a pair of (key, value). + * The key is always copied. + * + * @param key + * @param value + * @return + */ + @Override + public E put(@Nullable KeyTuple key, E value) { + if (key != null) { + return super.put(new KeyTuple(key.getValues()), value); + } else { + return super.put(null, value); + } + } + + /** + * Add a pair of (key, value). + * The key is not copied. + * + * @param key + * @param value + * @return + */ + public E putWihtoutKeyCopy(@Nullable KeyTuple key, E value) { + return super.put(key, value); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSet.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSet.java new file mode 100644 index 0000000..ff131c2 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSet.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import java.util.HashSet; + +/** + * TupleSet is a set which accepts only KeyTuple. + * Input tuples are automatically cloned whenever the add() method is called. + * This data structure is usually used in physical operators like hash join or hash aggregation. + */ +public class TupleSet extends HashSet<KeyTuple> { + + @Override + public boolean add(KeyTuple tuple) { + return super.add(new KeyTuple(tuple)); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java index 57fe816..abf2808 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java @@ -22,18 +22,17 @@ import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import java.util.Collections; -import java.util.List; public interface TupleSorter { Iterable<Tuple> sort(); - public static class DefaultSorter implements TupleSorter { + class DefaultSorter implements TupleSorter { - private final 
List<Tuple> target; + private final TupleList target; private final TupleComparator comparator; - public DefaultSorter(List<Tuple> target, TupleComparator comparator) { + public DefaultSorter(TupleList target, TupleComparator comparator) { this.target = target; this.comparator = comparator; } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java index 18d853f..d750f15 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java @@ -25,7 +25,6 @@ import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.Tuple; import java.util.Iterator; -import java.util.List; /** * Extract raw level values (primitive or String/byte[]) from each of key columns before sorting @@ -35,7 +34,7 @@ public class VectorizedSorter extends ComparableVector implements IndexedSortabl private final int[] mappings; // index indirection - public VectorizedSorter(List<Tuple> source, SortSpec[] sortKeys, int[] keyIndex) { + public VectorizedSorter(TupleList source, SortSpec[] sortKeys, int[] keyIndex) { super(source.size(), sortKeys, keyIndex); source.toArray(tuples); // wish it's array list mappings = new int[tuples.length];
