TAJO-1542 Refactoring of HashJoinExecs. (contributed by navis, committed by hyunsik)
Closes #529 #567 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/36a703c5 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/36a703c5 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/36a703c5 Branch: refs/heads/index_support Commit: 36a703c5dc2c2257dfd52232f204507fb4b79024 Parents: f3acbdf Author: Hyunsik Choi <[email protected]> Authored: Thu May 14 20:59:09 2015 -0700 Committer: Hyunsik Choi <[email protected]> Committed: Thu May 14 20:59:09 2015 -0700 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/catalog/Schema.java | 16 + .../org/apache/tajo/storage/EmptyTuple.java | 140 +----- .../java/org/apache/tajo/storage/NullTuple.java | 175 +++++++ .../java/org/apache/tajo/storage/VTuple.java | 20 +- .../engine/planner/PhysicalPlannerImpl.java | 22 +- .../physical/BasicPhysicalExecutorVisitor.java | 8 - .../planner/physical/CommonHashJoinExec.java | 191 ++++++++ .../engine/planner/physical/CommonJoinExec.java | 172 ++++++- .../planner/physical/HashFullOuterJoinExec.java | 247 ++++------ .../engine/planner/physical/HashJoinExec.java | 212 +-------- .../planner/physical/HashLeftAntiJoinExec.java | 59 +-- .../planner/physical/HashLeftOuterJoinExec.java | 292 +----------- .../planner/physical/HashLeftSemiJoinExec.java | 48 +- .../planner/physical/NLLeftOuterJoinExec.java | 101 ---- .../physical/PhysicalExecutorVisitor.java | 3 - .../physical/RightOuterMergeJoinExec.java | 40 +- .../apache/tajo/engine/utils/CacheHolder.java | 3 +- .../planner/physical/TestHashSemiJoinExec.java | 8 +- .../physical/TestLeftOuterHashJoinExec.java | 104 ++-- .../physical/TestLeftOuterNLJoinExec.java | 474 ------------------- .../testJoinFilterOfRowPreservedTable1.sql | 2 +- .../testJoinFilterOfRowPreservedTable1.result | 2 +- .../plan/expr/AggregationFunctionCallEval.java | 4 +- .../apache/tajo/plan/expr/AlgebraicUtil.java | 5 + 
.../org/apache/tajo/plan/expr/EvalNode.java | 39 +- .../java/org/apache/tajo/plan/expr/InEval.java | 2 +- .../plan/expr/PatternMatchPredicateEval.java | 2 +- .../tajo/plan/expr/WindowFunctionEval.java | 2 +- .../org/apache/tajo/storage/FrameTuple.java | 14 +- 30 files changed, 842 insertions(+), 1568 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 77be589..44ae4b4 100644 --- a/CHANGES +++ b/CHANGES @@ -24,6 +24,9 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1542: Refactoring of HashJoinExecs. (Contributed Navis, Committed by + hyunsik) + TAJO-1591: Change StoreType represented as Enum to String type. (hyunsik) TAJO-1452: Improve function listing order (Contributed Dongjoon Hyun, http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java index 0e4b741..80c4d83 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java @@ -400,6 +400,22 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject { return containFlag; } + /** + * Return TRUE if any column in <code>columns</code> is included in this schema. + * + * @param columns Columns to be checked + * @return true if any column in <code>columns</code> is included in this schema. + * Otherwise, false. 
+ */ + public boolean containsAny(Collection<Column> columns) { + for (Column column : columns) { + if (contains(column)) { + return true; + } + } + return false; + } + public synchronized Schema addColumn(String name, TypeDesc typeDesc) { String normalized = name; if(fieldsByQualifiedName.containsKey(normalized)) { http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java index 89e72ed..cdcebd7 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/EmptyTuple.java @@ -18,17 +18,12 @@ package org.apache.tajo.storage; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.datum.ProtobufDatum; - /* This class doesn’t have content datum. if selected column is zero, this is useful * e.g.
select count(*) from table * */ -public class EmptyTuple implements Tuple, Cloneable { +public class EmptyTuple extends NullTuple { private static EmptyTuple tuple; - private static Datum[] EMPTY_VALUES = new Datum[0]; static { tuple = new EmptyTuple(); @@ -39,138 +34,11 @@ public class EmptyTuple implements Tuple, Cloneable { } private EmptyTuple() { + super(0); } @Override - public int size() { - return 0; - } - - public boolean contains(int fieldId) { - return false; - } - - @Override - public boolean isNull(int fieldid) { - return true; - } - - @Override - public boolean isNotNull(int fieldid) { - return false; - } - - @Override - public void clear() { - } - - @Override - public void put(int fieldId, Datum value) { - throw new UnsupportedOperationException(); - } - - @Override - public void put(int fieldId, Datum[] values) { - throw new UnsupportedOperationException(); - } - - @Override - public void put(int fieldId, Tuple tuple) { - throw new UnsupportedOperationException(); - } - - @Override - public void put(Datum[] values) { - throw new UnsupportedOperationException(); - } - - @Override - public Datum get(int fieldId) { - return NullDatum.get(); - } - - @Override - public void setOffset(long offset) { - - } - - @Override - public long getOffset() { - return -1; - } - - @Override - public boolean getBool(int fieldId) { - return NullDatum.get().asBool(); - } - - @Override - public byte getByte(int fieldId) { - return NullDatum.get().asByte(); - } - - @Override - public char getChar(int fieldId) { - return NullDatum.get().asChar(); - } - - @Override - public byte[] getBytes(int fieldId) { - return NullDatum.get().asByteArray(); - } - - @Override - public short getInt2(int fieldId) { - return NullDatum.get().asInt2(); - } - - @Override - public int getInt4(int fieldId) { - return NullDatum.get().asInt4(); - } - - @Override - public long getInt8(int fieldId) { - return NullDatum.get().asInt8(); - } - - @Override - public float getFloat4(int fieldId) { - return 
NullDatum.get().asFloat4(); - } - - @Override - public double getFloat8(int fieldId) { - return NullDatum.get().asFloat8(); - } - - @Override - public String getText(int fieldId) { - return NullDatum.get().asChars(); - } - - @Override - public ProtobufDatum getProtobufDatum(int fieldId) { - throw new UnsupportedOperationException(); - } - - @Override - public Datum getInterval(int fieldId) { - return NullDatum.get(); - } - - @Override - public char[] getUnicodeChars(int fieldId) { - return NullDatum.get().asUnicodeChars(); - } - - @Override - public Tuple clone() throws CloneNotSupportedException { - throw new CloneNotSupportedException(); - } - - @Override - public Datum[] getValues() { - return EMPTY_VALUES; + public Tuple clone() { + return this; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java new file mode 100644 index 0000000..45eb859 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.ProtobufDatum; + +import java.util.Arrays; + +/** + * A tuple which contains all null datums. It is used for outer joins. + */ +public class NullTuple implements Tuple, Cloneable { + + public static Tuple create(int size) { + return new NullTuple(size); + } + + private final int size; + + NullTuple(int size) { + this.size = size; + } + + @Override + public int size() { + return size; + } + + public boolean contains(int fieldId) { + return fieldId < size; + } + + @Override + public boolean isNull(int fieldid) { + return true; + } + + @Override + public boolean isNotNull(int fieldid) { + return false; + } + + @Override + public void clear() { + } + + @Override + public void put(int fieldId, Datum value) { + throw new UnsupportedOperationException(); + } + + @Override + public void put(int fieldId, Datum[] values) { + throw new UnsupportedOperationException(); + } + + @Override + public void put(int fieldId, Tuple tuple) { + throw new UnsupportedOperationException(); + } + + @Override + public void put(Datum[] values) { + throw new UnsupportedOperationException(); + } + + @Override + public Datum get(int fieldId) { + return NullDatum.get(); + } + + @Override + public void setOffset(long offset) { + } + + @Override + public long getOffset() { + return 0; + } + + @Override + public boolean getBool(int fieldId) { + return NullDatum.get().asBool(); + } + + @Override + public byte getByte(int fieldId) { + return NullDatum.get().asByte(); + } + + @Override + public char getChar(int fieldId) { + return NullDatum.get().asChar(); + } + + @Override + public byte[] getBytes(int fieldId) { + return NullDatum.get().asByteArray(); + } + + @Override + public short getInt2(int fieldId) { + return NullDatum.get().asInt2(); + } + + 
@Override + public int getInt4(int fieldId) { + return NullDatum.get().asInt4(); + } + + @Override + public long getInt8(int fieldId) { + return NullDatum.get().asInt8(); + } + + @Override + public float getFloat4(int fieldId) { + return NullDatum.get().asFloat4(); + } + + @Override + public double getFloat8(int fieldId) { + return NullDatum.get().asFloat8(); + } + + @Override + public String getText(int fieldId) { + return NullDatum.get().asChars(); + } + + @Override + public ProtobufDatum getProtobufDatum(int fieldId) { + throw new UnsupportedOperationException(); + } + + @Override + public Datum getInterval(int fieldId) { + return NullDatum.get(); + } + + @Override + public char[] getUnicodeChars(int fieldId) { + return NullDatum.get().asUnicodeChars(); + } + + @Override + public Tuple clone() throws CloneNotSupportedException { + return new NullTuple(size); + } + + @Override + public Datum[] getValues() { + Datum[] datum = new Datum[size]; + Arrays.fill(datum, NullDatum.get()); + return datum; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java index 5e839b7..da69eb0 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java @@ -201,6 +201,7 @@ public class VTuple implements Tuple, Cloneable { return tuple; } + @Override public String toString() { return toDisplayString(getValues()); } @@ -225,22 +226,15 @@ public class VTuple implements Tuple, Cloneable { } public static String toDisplayString(Datum [] values) { - boolean first = true; StringBuilder str = new StringBuilder(); - str.append("("); - for(int i=0; i < values.length; i++) { - if(values[i] != null) { - if(first) { - first = false; - } 
else { - str.append(", "); - } - str.append(i) - .append("=>") - .append(values[i]); + str.append('('); + for (Datum datum : values) { + if (str.length() > 1) { + str.append(','); } + str.append(datum); } - str.append(")"); + str.append(')'); return str.toString(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index 506b03e..978dde8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -466,14 +466,14 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { case IN_MEMORY_HASH_JOIN: LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join]."); return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec); - case NESTED_LOOP_JOIN: - //the right operand is too large, so we opt for NL implementation of left outer join - LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join]."); - return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec); + case MERGE_JOIN: + //the right operand is too large, so we opt for merge join implementation + LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Merge Join]."); + return createRightOuterMergeJoinPlan(context, plan, rightExec, leftExec); default: LOG.error("Invalid Left Outer Join Algorithm Enforcer: " + algorithm.name()); - LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name()); - return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec); + LOG.error("Choose a fallback to join algorithm: " + JoinAlgorithm.MERGE_JOIN); + return 
createRightOuterMergeJoinPlan(context, plan, rightExec, leftExec); } } else { return createBestLeftOuterJoinPlan(context, plan, leftExec, rightExec); @@ -500,9 +500,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec); } else { - //the right operand is too large, so we opt for NL implementation of left outer join - LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join]."); - return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec); + //the right operand is too large, so we opt for merge join implementation + LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Merge Join]."); + return createRightOuterMergeJoinPlan(context, plan, rightExec, leftExec); } } @@ -566,7 +566,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec); default: LOG.error("Invalid Right Outer Join Algorithm Enforcer: " + algorithm.name()); - LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name()); + LOG.error("Choose a fallback to join algorithm: " + JoinAlgorithm.MERGE_JOIN); return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec); } } else { @@ -589,7 +589,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { default: LOG.error("Invalid Full Outer Join Algorithm Enforcer: " + algorithm.name()); - LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name()); + LOG.error("Choose a fallback to join algorithm: " + JoinAlgorithm.MERGE_JOIN); return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec); } } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java index 42611b0..c2d93bb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java @@ -65,8 +65,6 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx return visitMergeJoin(context, (MergeJoinExec) exec, stack); } else if (exec instanceof NLJoinExec) { return visitNLJoin(context, (NLJoinExec) exec, stack); - } else if (exec instanceof NLLeftOuterJoinExec) { - return visitNLLeftOuterJoin(context, (NLLeftOuterJoinExec) exec, stack); } else if (exec instanceof ProjectionExec) { return visitProjection(context, (ProjectionExec) exec, stack); } else if (exec instanceof RangeShuffleFileWriteExec) { @@ -214,12 +212,6 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx } @Override - public RESULT visitNLLeftOuterJoin(CONTEXT context, NLLeftOuterJoinExec exec, Stack<PhysicalExec> stack) - throws PhysicalPlanningException { - return visitBinaryExecutor(context, exec, stack); - } - - @Override public RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException { return visitUnaryExecutor(context, exec, stack); http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java new file mode 100644 index 0000000..ff9b253 --- /dev/null +++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.engine.utils.CacheHolder; +import org.apache.tajo.engine.utils.TableCacheKey; +import org.apache.tajo.plan.logical.JoinNode; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.worker.ExecutionBlockSharedResource; +import org.apache.tajo.worker.TaskAttemptContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * common exec for all hash join execs + * + * @param <T> Tuple collection type used to hold the loaded right relation in memory + */ +public abstract class CommonHashJoinExec<T> extends CommonJoinExec { + + protected final List<Column[]> joinKeyPairs; + + // temporary tuples and states for hash join + protected boolean first = true; + protected Map<Tuple, T> tupleSlots;
+ + protected Iterator<Tuple> iterator; + + protected final Tuple keyTuple; + + protected final int rightNumCols; + protected final int leftNumCols; + + protected final int[] leftKeyList; + protected final int[] rightKeyList; + + protected boolean finished; + + public CommonHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) { + super(context, plan, outer, inner); + + // HashJoin only can manage equi join key pairs. + this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, outer.getSchema(), + inner.getSchema(), false); + + leftKeyList = new int[joinKeyPairs.size()]; + rightKeyList = new int[joinKeyPairs.size()]; + + for (int i = 0; i < joinKeyPairs.size(); i++) { + leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName()); + } + + for (int i = 0; i < joinKeyPairs.size(); i++) { + rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName()); + } + + leftNumCols = outer.getSchema().size(); + rightNumCols = inner.getSchema().size(); + + keyTuple = new VTuple(leftKeyList.length); + } + + protected void loadRightToHashTable() throws IOException { + ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class); + if (scanExec.canBroadcast()) { + /* If this table can broadcast, all tasks in a node will share the same cache */ + TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey( + context, scanExec.getCanonicalName(), scanExec.getFragments()); + loadRightFromCache(key); + } else { + this.tupleSlots = convert(buildRightToHashTable(), false); + } + + first = false; + } + + protected void loadRightFromCache(TableCacheKey key) throws IOException { + ExecutionBlockSharedResource sharedResource = context.getSharedResource(); + + CacheHolder<Map<Tuple, List<Tuple>>> holder; + synchronized (sharedResource.getLock()) { + if (sharedResource.hasBroadcastCache(key)) { + holder = sharedResource.getBroadcastCache(key); + } else { + Map<Tuple, 
List<Tuple>> built = buildRightToHashTable(); + holder = new CacheHolder.BroadcastCacheHolder(built, rightChild.getInputStats(), null); + sharedResource.addBroadcastCache(key, holder); + } + } + this.tupleSlots = convert(holder.getData(), true); + } + + protected Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException { + Tuple tuple; + Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000); + + while (!context.isStopped() && (tuple = rightChild.next()) != null) { + Tuple keyTuple = new VTuple(joinKeyPairs.size()); + for (int i = 0; i < rightKeyList.length; i++) { + keyTuple.put(i, tuple.get(rightKeyList[i])); + } + + List<Tuple> newValue = map.get(keyTuple); + if (newValue == null) { + map.put(keyTuple, newValue = new ArrayList<Tuple>()); + } + // if source is scan or groupby, it needs not to be cloned + newValue.add(new VTuple(tuple)); + } + return map; + } + + // todo: convert loaded data to cache condition + protected abstract Map<Tuple, T> convert(Map<Tuple, List<Tuple>> hashed, boolean fromCache) + throws IOException; + + protected Tuple toKey(final Tuple outerTuple) { + for (int i = 0; i < leftKeyList.length; i++) { + keyTuple.put(i, outerTuple.get(leftKeyList[i])); + } + return keyTuple; + } + + @Override + public void rescan() throws IOException { + super.rescan(); + finished = false; + iterator = null; + } + + @Override + public void close() throws IOException { + super.close(); + iterator = null; + if (tupleSlots != null) { + tupleSlots.clear(); + tupleSlots = null; + } + } + + @Override + public TableStats getInputStats() { + if (leftChild == null) { + return inputStats; + } + TableStats leftInputStats = leftChild.getInputStats(); + inputStats.setNumBytes(0); + inputStats.setReadBytes(0); + inputStats.setNumRows(0); + + if (leftInputStats != null) { + inputStats.setNumBytes(leftInputStats.getNumBytes()); + inputStats.setReadBytes(leftInputStats.getReadBytes()); + inputStats.setNumRows(leftInputStats.getNumRows()); + } + + 
TableStats rightInputStats = rightChild.getInputStats(); + if (rightInputStats != null) { + inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes()); + inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes()); + inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows()); + } + + return inputStats; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java index 2535edf..ec29085 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java @@ -18,36 +18,178 @@ package org.apache.tajo.engine.planner.physical; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.engine.planner.Projector; +import org.apache.tajo.plan.expr.AlgebraicUtil; +import org.apache.tajo.plan.expr.BinaryEval; import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.expr.EvalTreeUtil; import org.apache.tajo.plan.logical.JoinNode; +import org.apache.tajo.storage.FrameTuple; +import org.apache.tajo.storage.NullTuple; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; -// common join exec except HashLeftOuterJoinExec +/** + * 
common exec for all join execs + */ public abstract class CommonJoinExec extends BinaryPhysicalExec { // from logical plan protected JoinNode plan; protected final boolean hasJoinQual; - protected EvalNode joinQual; + protected EvalNode joinQual; // ex) a.id = b.id + protected EvalNode leftJoinFilter; // ex) a > 10 + protected EvalNode rightJoinFilter; // ex) b > 5 + + protected final Schema leftSchema; + protected final Schema rightSchema; + + protected final FrameTuple frameTuple; + protected final Tuple outTuple; // projection protected Projector projector; - public CommonJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, - PhysicalExec inner) { + public CommonJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) { super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()), plan.getOutSchema(), outer, inner); this.plan = plan; - this.joinQual = plan.getJoinQual(); - this.hasJoinQual = plan.hasJoinQual(); + this.leftSchema = outer.getSchema(); + this.rightSchema = inner.getSchema(); + if (plan.hasJoinQual()) { + EvalNode[] extracted = extractJoinConditions(plan.getJoinQual(), leftSchema, rightSchema); + joinQual = extracted[0]; + leftJoinFilter = extracted[1]; + rightJoinFilter = extracted[2]; + } + this.hasJoinQual = joinQual != null; // for projection this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); + + // for join + this.frameTuple = new FrameTuple(); + this.outTuple = new VTuple(outSchema.size()); + } + + /** + * It separates a singular CNF-formed join condition into a join condition, a left join filter, and + * a right join filter. + * + * @param joinQual the original join condition + * @param leftSchema Left table schema + * @param rightSchema Right table schema + * @return Three element EvalNodes, 0 - join condition, 1 - left join filter, 2 - right join filter.
+ */ + private EvalNode[] extractJoinConditions(EvalNode joinQual, Schema leftSchema, Schema rightSchema) { + List<EvalNode> joinQuals = Lists.newArrayList(); + List<EvalNode> leftFilters = Lists.newArrayList(); + List<EvalNode> rightFilters = Lists.newArrayList(); + for (EvalNode eachQual : AlgebraicUtil.toConjunctiveNormalFormArray(joinQual)) { + if (!(eachQual instanceof BinaryEval)) { + continue; // todo 'between', etc. + } + BinaryEval binaryEval = (BinaryEval)eachQual; + LinkedHashSet<Column> leftColumns = EvalTreeUtil.findUniqueColumns(binaryEval.getLeftExpr()); + LinkedHashSet<Column> rightColumns = EvalTreeUtil.findUniqueColumns(binaryEval.getRightExpr()); + boolean leftInLeft = leftSchema.containsAny(leftColumns); + boolean rightInLeft = leftSchema.containsAny(rightColumns); + boolean leftInRight = rightSchema.containsAny(leftColumns); + boolean rightInRight = rightSchema.containsAny(rightColumns); + + boolean columnsFromLeft = leftInLeft || rightInLeft; + boolean columnsFromRight = leftInRight || rightInRight; + if (!columnsFromLeft && !columnsFromRight) { + continue; // todo constant expression : this should be done in logical phase + } + if (columnsFromLeft ^ columnsFromRight) { + if (columnsFromLeft) { + leftFilters.add(eachQual); + } else { + rightFilters.add(eachQual); + } + continue; + } + if ((leftInLeft && rightInLeft) || (leftInRight && rightInRight)) { + continue; // todo not allowed yet : this should be checked in logical phase + } + joinQuals.add(eachQual); + } + return new EvalNode[] { + joinQuals.isEmpty() ? null : AlgebraicUtil.createSingletonExprFromCNF(joinQuals), + leftFilters.isEmpty() ? null : AlgebraicUtil.createSingletonExprFromCNF(leftFilters), + rightFilters.isEmpty() ? 
null : AlgebraicUtil.createSingletonExprFromCNF(rightFilters) + }; + } + + public JoinNode getPlan() { + return plan; + } + + /** + * Evaluate an input tuple with a left join filter + * + * @param left Tuple to be evaluated + * @return True if a left join filter exists and the input tuple does not satisfy it + * (i.e., the tuple should be filtered out); false otherwise + */ + protected boolean leftFiltered(Tuple left) { + return leftJoinFilter != null && !leftJoinFilter.eval(left).asBool(); + } + + /** + * Evaluate an input tuple with a right join filter + * + * @param right Tuple to be evaluated + * @return True if a right join filter exists and the input tuple does not satisfy it + * (i.e., the tuple should be filtered out); false otherwise + */ + protected boolean rightFiltered(Tuple right) { + return rightJoinFilter != null && !rightJoinFilter.eval(right).asBool(); + } + + /** + * Return a tuple iterator that filters rows of the right table by using the right join filter. + * It must take rows of the right table. + * + * @param rightTuples Tuple iterator + * @return rows that satisfy the right join filter; an empty iterator if rightTuples is null, + * or all rows if there is no right join filter. + */ + protected Iterator<Tuple> rightFiltered(Iterable<Tuple> rightTuples) { + if (rightTuples == null) { + return Iterators.emptyIterator(); + } + if (rightJoinFilter == null) { + return rightTuples.iterator(); + } + return Iterators.filter(rightTuples.iterator(), new Predicate<Tuple>() { + @Override + public boolean apply(Tuple input) { + return rightJoinFilter.eval(input).asBool(); + } + }); + } + + /** + * Return a tuple iterator containing a single NullTuple + * + * @param width the width of the tuple + * @return a tuple iterator containing a single NullTuple + */ + protected Iterator<Tuple> nullIterator(int width) { + return Arrays.asList(NullTuple.create(width)).iterator(); } @Override @@ -56,6 +198,12 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec { if (hasJoinQual) { joinQual.bind(context.getEvalContext(), inSchema); } + if (leftJoinFilter != null) { + leftJoinFilter.bind(context.getEvalContext(), leftSchema); + } + if (rightJoinFilter != null) { +
rightJoinFilter.bind(context.getEvalContext(), rightSchema); + } } @Override @@ -63,10 +211,7 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec { if (hasJoinQual) { joinQual = context.getPrecompiledEval(inSchema, joinQual); } - } - - public JoinNode getPlan() { - return plan; + // compile filters? } @Override @@ -74,6 +219,13 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec { super.close(); plan = null; joinQual = null; + leftJoinFilter = null; + rightJoinFilter = null; projector = null; } + + @Override + public String toString() { + return getClass().getSimpleName() + " [" + leftSchema + " : " + rightSchema + "]"; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java index 6e28ae0..1645263 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java @@ -18,101 +18,59 @@ package org.apache.tajo.engine.planner.physical; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.engine.utils.TupleUtil; -import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.logical.JoinNode; -import org.apache.tajo.storage.FrameTuple; +import org.apache.tajo.storage.NullTuple; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.Pair; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; import java.util.*; +public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, List<Tuple>>> { -public class HashFullOuterJoinExec extends 
CommonJoinExec { - - protected List<Column[]> joinKeyPairs; - - // temporal tuples and states for nested loop join - protected boolean first = true; - protected FrameTuple frameTuple; - protected Tuple outTuple = null; - protected Map<Tuple, List<Tuple>> tupleSlots; - protected Iterator<Tuple> iterator = null; - protected Tuple leftTuple; - protected Tuple leftKeyTuple; - - protected int [] leftKeyList; - protected int [] rightKeyList; - - protected boolean finished = false; - protected boolean shouldGetLeftTuple = true; - - private int rightNumCols; - private int leftNumCols; - private Map<Tuple, Boolean> matched; + private boolean finalLoop; // final loop for right unmatched public HashFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) { super(context, plan, outer, inner); - this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000); - - // this hashmap mirrors the evolution of the tupleSlots, with the same keys. For each join key, - // we have a boolean flag, initially false (whether this join key had at least one match on the left operand) - this.matched = new HashMap<Tuple, Boolean>(10000); - - // HashJoin only can manage equi join key pairs. 
- this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, outer.getSchema(), inner.getSchema(), - false); - - leftKeyList = new int[joinKeyPairs.size()]; - rightKeyList = new int[joinKeyPairs.size()]; - - for (int i = 0; i < joinKeyPairs.size(); i++) { - leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName()); - } - - for (int i = 0; i < joinKeyPairs.size(); i++) { - rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName()); - } - - // for join - frameTuple = new FrameTuple(); - outTuple = new VTuple(outSchema.size()); - leftKeyTuple = new VTuple(leftKeyList.length); - - leftNumCols = outer.getSchema().size(); - rightNumCols = inner.getSchema().size(); } - protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) { - for (int i = 0; i < leftKeyList.length; i++) { - keyTuple.put(i, outerTuple.get(leftKeyList[i])); - } - } + public Iterator<Tuple> getUnmatchedRight() { - public Tuple getNextUnmatchedRight() { + return new Iterator<Tuple>() { - List<Tuple> newValue; - Tuple returnedTuple; - // get a keyTUple from the matched hashmap with a boolean false value - for(Tuple aKeyTuple : matched.keySet()) { - if(matched.get(aKeyTuple) == false) { - newValue = tupleSlots.get(aKeyTuple); - returnedTuple = newValue.remove(0); - tupleSlots.put(aKeyTuple, newValue); + private Iterator<Pair<Boolean, List<Tuple>>> iterator1 = tupleSlots.values().iterator(); + private Iterator<Tuple> iterator2; - // after taking the last element from the list in tupleSlots, set flag true in matched as well - if(newValue.isEmpty()){ - matched.put(aKeyTuple, true); + @Override + public boolean hasNext() { + if (hasMore()) { + return true; } + for (iterator2 = null; !hasMore() && iterator1.hasNext();) { + Pair<Boolean, List<Tuple>> next = iterator1.next(); + if (!next.getFirst()) { + iterator2 = next.getSecond().iterator(); + } + } + return hasMore(); + } - return returnedTuple; + private boolean hasMore() { + return 
iterator2 != null && iterator2.hasNext(); } - } - return null; + + @Override + public Tuple next() { + return iterator2.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove"); + } + }; } public Tuple next() throws IOException { @@ -120,112 +78,67 @@ public class HashFullOuterJoinExec extends CommonJoinExec { loadRightToHashTable(); } - Tuple rightTuple; - boolean found = false; - - while(!context.isStopped() && !finished) { - if (shouldGetLeftTuple) { // initially, it is true. - // getting new outer - leftTuple = leftChild.next(); // it comes from a disk - if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed. - // in this stage we can begin outputing tuples from the right operand (which were before in tupleSlots) null padded on the left side - Tuple unmatchedRightTuple = getNextUnmatchedRight(); - if( unmatchedRightTuple == null) { - finished = true; - outTuple = null; - return null; - } else { - Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols); - frameTuple.set(nullPaddedTuple, unmatchedRightTuple); - projector.eval(frameTuple, outTuple); - - return outTuple; - } - } - - // getting corresponding right - getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple - List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple); - if (rightTuples != null) { // found right tuples on in-memory hash table. 
- iterator = rightTuples.iterator(); - shouldGetLeftTuple = false; - } else { - //this left tuple doesn't have a match on the right.But full outer join => we should keep it anyway - //output a tuple with the nulls padded rightTuple - Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols); - frameTuple.set(leftTuple, nullPaddedTuple); - projector.eval(frameTuple, outTuple); - // we simulate we found a match, which is exactly the null padded one - shouldGetLeftTuple = true; - return outTuple; - } - } - - // getting a next right tuple on in-memory hash table. - rightTuple = iterator.next(); - frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples - - if (joinQual.eval(frameTuple).isTrue()) { // if both tuples are joinable + while (!context.isStopped() && !finished) { + if (iterator != null && iterator.hasNext()) { + frameTuple.setRight(iterator.next()); projector.eval(frameTuple, outTuple); - found = true; - getKeyLeftTuple(leftTuple, leftKeyTuple); - matched.put(leftKeyTuple, true); + return outTuple; } - - if (!iterator.hasNext()) { // no more right tuples for this hash key - shouldGetLeftTuple = true; + if (finalLoop) { + finished = true; + return null; } - - if (found) { - break; + Tuple leftTuple = leftChild.next(); + if (leftTuple == null) { + // if no more tuples in left tuples, a join is completed. 
+ // in this stage we can begin outputing tuples from the right operand (which were before in tupleSlots) null padded on the left side + frameTuple.setLeft(NullTuple.create(leftNumCols)); + iterator = getUnmatchedRight(); + finalLoop = true; + continue; } - } - return outTuple; - } - - protected void loadRightToHashTable() throws IOException { - Tuple tuple; - Tuple keyTuple; + frameTuple.setLeft(leftTuple); - while (!context.isStopped() && (tuple = rightChild.next()) != null) { - keyTuple = new VTuple(joinKeyPairs.size()); - for (int i = 0; i < rightKeyList.length; i++) { - keyTuple.put(i, tuple.get(rightKeyList[i])); + if (leftFiltered(leftTuple)) { + iterator = nullIterator(rightNumCols); + continue; } - - List<Tuple> newValue = tupleSlots.get(keyTuple); - if (newValue != null) { - newValue.add(tuple); - } else { - newValue = new ArrayList<Tuple>(); - newValue.add(tuple); - tupleSlots.put(keyTuple, newValue); - matched.put(keyTuple,false); + // getting corresponding right + Pair<Boolean, List<Tuple>> hashed = tupleSlots.get(toKey(leftTuple)); + if (hashed == null) { + iterator = nullIterator(rightNumCols); + continue; + } + Iterator<Tuple> rightTuples = rightFiltered(hashed.getSecond()); + if (!rightTuples.hasNext()) { + iterator = nullIterator(rightNumCols); + continue; } + iterator = rightTuples; + hashed.setFirst(true); // match found } - first = false; + + return null; } @Override - public void rescan() throws IOException { - super.rescan(); - - tupleSlots.clear(); - first = true; - - finished = false; - iterator = null; - shouldGetLeftTuple = true; + protected Map<Tuple, Pair<Boolean, List<Tuple>>> convert(Map<Tuple, List<Tuple>> hashed, + boolean fromCache) throws IOException { + Map<Tuple, Pair<Boolean, List<Tuple>>> tuples = new HashMap<Tuple, Pair<Boolean, List<Tuple>>>(hashed.size()); + for (Map.Entry<Tuple, List<Tuple>> entry : hashed.entrySet()) { + // flag: initially false (whether this join key had at least one match on the counter part) + 
tuples.put(entry.getKey(), new Pair<Boolean, List<Tuple>>(false, entry.getValue())); + } + return tuples; } @Override - public void close() throws IOException { - super.close(); - tupleSlots.clear(); - matched.clear(); - tupleSlots = null; - matched = null; - iterator = null; + public void rescan() throws IOException { + super.rescan(); + for (Pair<Boolean, List<Tuple>> value : tupleSlots.values()) { + value.setFirst(false); + } + finalLoop = false; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java index 48f3682..a4215fa 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java @@ -18,225 +18,59 @@ package org.apache.tajo.engine.planner.physical; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.engine.utils.CacheHolder; -import org.apache.tajo.engine.utils.TableCacheKey; import org.apache.tajo.plan.logical.JoinNode; -import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; -import org.apache.tajo.worker.ExecutionBlockSharedResource; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; import java.util.*; -public class HashJoinExec extends CommonJoinExec { - - protected List<Column[]> joinKeyPairs; - - // temporal tuples and states for nested loop join - protected boolean first = true; - protected FrameTuple frameTuple; - protected Tuple outTuple = null; - protected Map<Tuple, List<Tuple>> tupleSlots; - 
protected Iterator<Tuple> iterator = null; - protected Tuple leftTuple; - protected Tuple leftKeyTuple; - - protected int [] leftKeyList; - protected int [] rightKeyList; - - protected boolean finished = false; - protected boolean shouldGetLeftTuple = true; - - private TableStats cachedRightTableStats; +public class HashJoinExec extends CommonHashJoinExec<List<Tuple>> { public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec, PhysicalExec rightExec) { super(context, plan, leftExec, rightExec); - - // HashJoin only can manage equi join key pairs. - this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftExec.getSchema(), - rightExec.getSchema(), false); - - leftKeyList = new int[joinKeyPairs.size()]; - rightKeyList = new int[joinKeyPairs.size()]; - - for (int i = 0; i < joinKeyPairs.size(); i++) { - leftKeyList[i] = leftExec.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName()); - } - - for (int i = 0; i < joinKeyPairs.size(); i++) { - rightKeyList[i] = rightExec.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName()); - } - - // for join - frameTuple = new FrameTuple(); - outTuple = new VTuple(outSchema.size()); - leftKeyTuple = new VTuple(leftKeyList.length); } - protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) { - for (int i = 0; i < leftKeyList.length; i++) { - keyTuple.put(i, outerTuple.get(leftKeyList[i])); - } + @Override + protected Map<Tuple, List<Tuple>> convert(Map<Tuple, List<Tuple>> hashed, boolean fromCache) + throws IOException { + return fromCache ? new HashMap<Tuple, List<Tuple>>(hashed) : hashed; } + @Override public Tuple next() throws IOException { if (first) { loadRightToHashTable(); } - Tuple rightTuple; - boolean found = false; - - while(!context.isStopped() && !finished) { - if (shouldGetLeftTuple) { // initially, it is true. 
- // getting new outer - leftTuple = leftChild.next(); // it comes from a disk - if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed. - finished = true; - return null; - } - - // getting corresponding right - getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple - List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple); - if (rightTuples != null) { // found right tuples on in-memory hash table. - iterator = rightTuples.iterator(); - shouldGetLeftTuple = false; - } else { - shouldGetLeftTuple = true; - continue; - } - } - - // getting a next right tuple on in-memory hash table. - rightTuple = iterator.next(); - frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples - if (joinQual.eval(frameTuple).isTrue()) { // if both tuples are joinable + while (!context.isStopped() && !finished) { + if (iterator != null && iterator.hasNext()) { + frameTuple.setRight(iterator.next()); projector.eval(frameTuple, outTuple); - found = true; - } - - if (!iterator.hasNext()) { // no more right tuples for this hash key - shouldGetLeftTuple = true; - } - - if (found) { - break; - } - } - - return new VTuple(outTuple); - } - - protected void loadRightToHashTable() throws IOException { - ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class); - if (scanExec.canBroadcast()) { - /* If this table can broadcast, all tasks in a node will share the same cache */ - TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey( - context, scanExec.getCanonicalName(), scanExec.getFragments()); - loadRightFromCache(key); - } else { - this.tupleSlots = buildRightToHashTable(); - } - - first = false; - } - - protected void loadRightFromCache(TableCacheKey key) throws IOException { - ExecutionBlockSharedResource sharedResource = context.getSharedResource(); - synchronized (sharedResource.getLock()) { - if (sharedResource.hasBroadcastCache(key)) { - CacheHolder<Map<Tuple, List<Tuple>>> data = 
sharedResource.getBroadcastCache(key); - this.tupleSlots = data.getData(); - this.cachedRightTableStats = data.getTableStats(); - } else { - CacheHolder.BroadcastCacheHolder holder = - new CacheHolder.BroadcastCacheHolder(buildRightToHashTable(), rightChild.getInputStats(), null); - sharedResource.addBroadcastCache(key, holder); - CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key); - this.tupleSlots = data.getData(); - this.cachedRightTableStats = data.getTableStats(); + return outTuple; } - } - } - - private Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException { - Tuple tuple; - Tuple keyTuple; - Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000); - - while (!context.isStopped() && (tuple = rightChild.next()) != null) { - keyTuple = new VTuple(joinKeyPairs.size()); - for (int i = 0; i < rightKeyList.length; i++) { - keyTuple.put(i, tuple.get(rightKeyList[i])); + Tuple leftTuple = leftChild.next(); // it comes from a disk + if (leftTuple == null || leftFiltered(leftTuple)) { // if no more tuples in left tuples on disk, a join is completed. 
+ finished = leftTuple == null; + continue; } - List<Tuple> newValue = map.get(keyTuple); + frameTuple.setLeft(leftTuple); - if (newValue != null) { - newValue.add(tuple); - } else { - newValue = new ArrayList<Tuple>(); - newValue.add(tuple); - map.put(keyTuple, newValue); + // getting corresponding right + Iterable<Tuple> hashed = getRights(toKey(leftTuple)); + Iterator<Tuple> rightTuples = rightFiltered(hashed); + if (rightTuples.hasNext()) { + iterator = rightTuples; } } - return map; - } - - @Override - public void rescan() throws IOException { - super.rescan(); - - tupleSlots.clear(); - first = true; - - finished = false; - iterator = null; - shouldGetLeftTuple = true; + return null; } - @Override - public void close() throws IOException { - super.close(); - if (tupleSlots != null) { - tupleSlots.clear(); - tupleSlots = null; - } - - iterator = null; + private Iterable<Tuple> getRights(Tuple key) { + return tupleSlots.get(key); } - @Override - public TableStats getInputStats() { - if (leftChild == null) { - return inputStats; - } - TableStats leftInputStats = leftChild.getInputStats(); - inputStats.setNumBytes(0); - inputStats.setReadBytes(0); - inputStats.setNumRows(0); - - if (leftInputStats != null) { - inputStats.setNumBytes(leftInputStats.getNumBytes()); - inputStats.setReadBytes(leftInputStats.getReadBytes()); - inputStats.setNumRows(leftInputStats.getNumRows()); - } - - TableStats rightInputStats = cachedRightTableStats == null ? 
rightChild.getInputStats() : cachedRightTableStats; - if (rightInputStats != null) { - inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes()); - inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes()); - inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows()); - } - - return inputStats; - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java index 881bf84..8239270 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java @@ -19,10 +19,8 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.worker.TaskAttemptContext; -import org.apache.tajo.datum.NullDatum; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; import java.io.IOException; import java.util.List; @@ -33,16 +31,10 @@ import java.util.List; * If not found, it returns the tuple of the FROM side table with null padding. 
*/ public class HashLeftAntiJoinExec extends HashJoinExec { - private Tuple rightNullTuple; public HashLeftAntiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec fromSideChild, PhysicalExec notInSideChild) { super(context, plan, fromSideChild, notInSideChild); - // NUll Tuple - rightNullTuple = new VTuple(leftChild.outColumnNum); - for (int i = 0; i < leftChild.outColumnNum; i++) { - rightNullTuple.put(i, NullDatum.get()); - } } /** @@ -56,54 +48,33 @@ public class HashLeftAntiJoinExec extends HashJoinExec { * @return The tuple which is unmatched to a given join condition. * @throws IOException */ + @Override public Tuple next() throws IOException { if (first) { loadRightToHashTable(); } - Tuple rightTuple; - boolean notFound; - while(!context.isStopped() && !finished) { - - // getting new outer - leftTuple = leftChild.next(); // it comes from a disk - if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed. - finished = true; - return null; - } - - // Try to find a hash bucket in in-memory hash table - getKeyLeftTuple(leftTuple, leftKeyTuple); - List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple); - if (rightTuples != null) { - // if found, it gets a hash bucket from the hash table. - iterator = rightTuples.iterator(); - } else { - // if not found, it returns a tuple. - frameTuple.set(leftTuple, rightNullTuple); + if (iterator != null && iterator.hasNext()) { + frameTuple.setRight(iterator.next()); projector.eval(frameTuple, outTuple); return outTuple; } - - // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket. - // If it finds a matched tuple, it escapes the loop for all tuples in the hash bucket. 
- notFound = true; - while (!context.isStopped() && notFound && iterator.hasNext()) { - rightTuple = iterator.next(); - frameTuple.set(leftTuple, rightTuple); - if (joinQual.eval(frameTuple).isTrue()) { // if the matched one is found - notFound = false; - } + // getting new outer + Tuple leftTuple = leftChild.next(); // it comes from a disk + if (leftTuple == null || leftFiltered(leftTuple)) { // if no more tuples in left tuples on disk, a join is completed. + finished = leftTuple == null; + continue; } - if (notFound) { // if there is no matched tuple - frameTuple.set(leftTuple, rightNullTuple); - projector.eval(frameTuple, outTuple); - break; + frameTuple.setLeft(leftTuple); + + // Try to find a hash bucket in in-memory hash table + List<Tuple> hashed = tupleSlots.get(toKey(leftTuple)); + if (hashed == null || !rightFiltered(hashed).hasNext()) { + iterator = nullIterator(0); } } - - return outTuple; + return null; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java index 6f573d0..8613eac 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java @@ -18,307 +18,61 @@ package org.apache.tajo.engine.planner.physical; -import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.engine.planner.Projector; -import org.apache.tajo.engine.utils.CacheHolder; -import 
org.apache.tajo.engine.utils.TableCacheKey; -import org.apache.tajo.engine.utils.TupleUtil; -import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.catalog.SchemaUtil; -import org.apache.tajo.plan.expr.AlgebraicUtil; -import org.apache.tajo.plan.expr.EvalNode; -import org.apache.tajo.plan.expr.EvalTreeUtil; import org.apache.tajo.plan.logical.JoinNode; -import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; -import org.apache.tajo.worker.ExecutionBlockSharedResource; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; import java.util.*; +public class HashLeftOuterJoinExec extends HashJoinExec { -public class HashLeftOuterJoinExec extends BinaryPhysicalExec { - // from logical plan - protected JoinNode plan; - protected EvalNode joinQual; // ex) a.id = b.id - protected EvalNode joinFilter; // ex) a > 10 - - protected List<Column[]> joinKeyPairs; - - // temporal tuples and states for nested loop join - protected boolean first = true; - protected FrameTuple frameTuple; - protected Tuple outTuple = null; - protected Map<Tuple, List<Tuple>> tupleSlots; - protected Iterator<Tuple> iterator = null; - protected Tuple leftTuple; - protected Tuple leftKeyTuple; - - protected int [] leftKeyList; - protected int [] rightKeyList; - - protected boolean finished = false; - protected boolean shouldGetLeftTuple = true; - - // projection - protected Projector projector; - - private int rightNumCols; - private TableStats cachedRightTableStats; private static final Log LOG = LogFactory.getLog(HashLeftOuterJoinExec.class); public HashLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild, PhysicalExec rightChild) { - super(context, SchemaUtil.merge(leftChild.getSchema(), rightChild.getSchema()), - plan.getOutSchema(), leftChild, rightChild); - this.plan = plan; - - List<EvalNode> joinQuals = Lists.newArrayList(); - List<EvalNode> joinFilters = 
Lists.newArrayList(); - for (EvalNode eachQual : AlgebraicUtil.toConjunctiveNormalFormArray(plan.getJoinQual())) { - if (EvalTreeUtil.isJoinQual(eachQual, true)) { - joinQuals.add(eachQual); - } else { - joinFilters.add(eachQual); - } - } - - this.joinQual = AlgebraicUtil.createSingletonExprFromCNF(joinQuals.toArray(new EvalNode[joinQuals.size()])); - if (joinFilters.size() > 0) { - this.joinFilter = AlgebraicUtil.createSingletonExprFromCNF(joinFilters.toArray(new EvalNode[joinFilters.size()])); - } else { - this.joinFilter = null; - } - - // HashJoin only can manage equi join key pairs. - this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(), - rightChild.getSchema(), false); - - leftKeyList = new int[joinKeyPairs.size()]; - rightKeyList = new int[joinKeyPairs.size()]; - - for (int i = 0; i < joinKeyPairs.size(); i++) { - leftKeyList[i] = leftChild.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName()); - } - - for (int i = 0; i < joinKeyPairs.size(); i++) { - rightKeyList[i] = rightChild.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName()); - } - - // for projection - this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); - - // for join - frameTuple = new FrameTuple(); - outTuple = new VTuple(outSchema.size()); - leftKeyTuple = new VTuple(leftKeyList.length); - - rightNumCols = rightChild.getSchema().size(); - - joinQual.bind(context.getEvalContext(), inSchema); - if (joinFilter != null) { - joinFilter.bind(context.getEvalContext(), inSchema); - } + super(context, plan, leftChild, rightChild); } @Override - protected void compile() { - joinQual = context.getPrecompiledEval(inSchema, joinQual); - } - - protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) { - for (int i = 0; i < leftKeyList.length; i++) { - keyTuple.put(i, outerTuple.get(leftKeyList[i])); - } - } - public Tuple next() throws IOException { if (first) { loadRightToHashTable(); } - Tuple rightTuple; - 
boolean found = false; - - while(!context.isStopped() && !finished) { - - if (shouldGetLeftTuple) { // initially, it is true. - // getting new outer - leftTuple = leftChild.next(); // it comes from a disk - if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed. - finished = true; - return null; - } - - // getting corresponding right - getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple - List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple); - if (rightTuples != null) { // found right tuples on in-memory hash table. - iterator = rightTuples.iterator(); - shouldGetLeftTuple = false; - } else { - // this left tuple doesn't have a match on the right, and output a tuple with the nulls padded rightTuple - Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols); - frameTuple.set(leftTuple, nullPaddedTuple); - projector.eval(frameTuple, outTuple); - // we simulate we found a match, which is exactly the null padded one - shouldGetLeftTuple = true; - return outTuple; - } - } - - // getting a next right tuple on in-memory hash table. - rightTuple = iterator.next(); - if (!iterator.hasNext()) { // no more right tuples for this hash key - shouldGetLeftTuple = true; - } - - frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples - - // if there is no join filter, it is always true. - boolean satisfiedWithFilter = joinFilter == null || joinFilter.eval(frameTuple).isTrue(); - boolean satisfiedWithJoinCondition = joinQual.eval(frameTuple).isTrue(); - - // if a composited tuple satisfies with both join filter and join condition - if (satisfiedWithJoinCondition && satisfiedWithFilter) { - projector.eval(frameTuple, outTuple); - return outTuple; - } else { - - // if join filter is satisfied, the left outer join (LOJ) operator should return the null padded tuple - // only once. Then, LOJ operator should take the next left tuple. 
- if (!satisfiedWithFilter) { - shouldGetLeftTuple = true; - } - - // null padding - Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols); - frameTuple.set(leftTuple, nullPaddedTuple); - + while (!context.isStopped() && !finished) { + if (iterator != null && iterator.hasNext()) { + frameTuple.setRight(iterator.next()); projector.eval(frameTuple, outTuple); return outTuple; } - } - - return outTuple; - } - - protected void loadRightToHashTable() throws IOException { - ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class); - if (scanExec.canBroadcast()) { - /* If this table can broadcast, all tasks in a node will share the same cache */ - TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey( - context, scanExec.getCanonicalName(), scanExec.getFragments()); - loadRightFromCache(key); - } else { - this.tupleSlots = buildRightToHashTable(); - } - - first = false; - } - - protected void loadRightFromCache(TableCacheKey key) throws IOException { - ExecutionBlockSharedResource sharedResource = context.getSharedResource(); - synchronized (sharedResource.getLock()) { - if (sharedResource.hasBroadcastCache(key)) { - CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key); - this.tupleSlots = data.getData(); - this.cachedRightTableStats = data.getTableStats(); - } else { - CacheHolder.BroadcastCacheHolder holder = - new CacheHolder.BroadcastCacheHolder(buildRightToHashTable(), rightChild.getInputStats(), null); - sharedResource.addBroadcastCache(key, holder); - CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key); - this.tupleSlots = data.getData(); - this.cachedRightTableStats = data.getTableStats(); + Tuple leftTuple = leftChild.next(); // it comes from a disk + if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed. 
+ finished = true; + return null; } - } - } + frameTuple.setLeft(leftTuple); - private Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException { - Tuple tuple; - Tuple keyTuple; - Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000); - - while (!context.isStopped() && (tuple = rightChild.next()) != null) { - keyTuple = new VTuple(joinKeyPairs.size()); - for (int i = 0; i < rightKeyList.length; i++) { - keyTuple.put(i, tuple.get(rightKeyList[i])); + if (leftFiltered(leftTuple)) { + iterator = nullIterator(rightNumCols); + continue; } - List<Tuple> newValue = map.get(keyTuple); - - if (newValue != null) { - newValue.add(tuple); - } else { - newValue = new ArrayList<Tuple>(); - newValue.add(tuple); - map.put(keyTuple, newValue); + // getting corresponding right + List<Tuple> hashed = tupleSlots.get(toKey(leftTuple)); + Iterator<Tuple> rightTuples = rightFiltered(hashed); + if (!rightTuples.hasNext()) { + //this left tuple doesn't have a match on the right.But full outer join => we should keep it anyway + //output a tuple with the nulls padded rightTuple + iterator = nullIterator(rightNumCols); + continue; } + iterator = rightTuples; } - return map; - } - - @Override - public void rescan() throws IOException { - super.rescan(); - - tupleSlots.clear(); - first = true; - - finished = false; - iterator = null; - shouldGetLeftTuple = true; - } - - - @Override - public void close() throws IOException { - super.close(); - tupleSlots.clear(); - tupleSlots = null; - iterator = null; - plan = null; - joinQual = null; - joinFilter = null; - projector = null; - } - - public JoinNode getPlan() { - return this.plan; - } - - @Override - public TableStats getInputStats() { - if (leftChild == null) { - return inputStats; - } - TableStats leftInputStats = leftChild.getInputStats(); - inputStats.setNumBytes(0); - inputStats.setReadBytes(0); - inputStats.setNumRows(0); - - if (leftInputStats != null) { - inputStats.setNumBytes(leftInputStats.getNumBytes()); - 
inputStats.setReadBytes(leftInputStats.getReadBytes()); - inputStats.setNumRows(leftInputStats.getNumRows()); - } - - TableStats rightInputStats = cachedRightTableStats == null ? rightChild.getInputStats() : cachedRightTableStats; - if (rightInputStats != null) { - inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes()); - inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes()); - inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows()); - } - - return inputStats; + return null; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java index 32e6d08..41e842a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java @@ -50,50 +50,34 @@ public class HashLeftSemiJoinExec extends HashJoinExec { * @return The tuple which is firstly matched to a given join condition. * @throws java.io.IOException */ + @Override public Tuple next() throws IOException { if (first) { loadRightToHashTable(); } - Tuple rightTuple; - boolean notFound; - while(!context.isStopped() && !finished) { - - // getting new outer - leftTuple = leftChild.next(); // it comes from a disk - if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed. 
- finished = true; - return null; + if (iterator != null && iterator.hasNext()) { + frameTuple.setRight(iterator.next()); + projector.eval(frameTuple, outTuple); + return outTuple; } - - // Try to find a hash bucket in in-memory hash table - getKeyLeftTuple(leftTuple, leftKeyTuple); - List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple); - if (rightTuples != null) { - // if found, it gets a hash bucket from the hash table. - iterator = rightTuples.iterator(); - } else { + // getting new outer + Tuple leftTuple = leftChild.next(); // it comes from a disk + if (leftTuple == null || leftFiltered(leftTuple)) { // if no more tuples in left tuples on disk, a join is completed. + finished = leftTuple == null; continue; } - // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket. - // If it finds any matched tuple, it returns the tuple immediately. - notFound = true; - while (notFound && iterator.hasNext()) { - rightTuple = iterator.next(); - frameTuple.set(leftTuple, rightTuple); - if (joinQual.eval(frameTuple).isTrue()) { // if the matched one is found - notFound = false; - projector.eval(frameTuple, outTuple); - } - } + frameTuple.setLeft(leftTuple); - if (!notFound) { // if there is no matched tuple - break; + // Try to find a hash bucket in in-memory hash table + List<Tuple> hashed = tupleSlots.get(toKey(leftTuple)); + if (hashed != null && rightFiltered(hashed).hasNext()) { + // if found, it gets a hash bucket from the hash table. 
+ iterator = nullIterator(0); } } - - return outTuple; + return null; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java deleted file mode 100644 index 735623d..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.engine.planner.physical; - -import org.apache.tajo.engine.utils.TupleUtil; -import org.apache.tajo.plan.logical.JoinNode; -import org.apache.tajo.storage.FrameTuple; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; -import org.apache.tajo.worker.TaskAttemptContext; - -import java.io.IOException; - -public class NLLeftOuterJoinExec extends CommonJoinExec { - // temporal tuples and states for nested loop join - private boolean needNextRightTuple; - private FrameTuple frameTuple; - private Tuple leftTuple = null; - private Tuple rightTuple = null; - private Tuple outTuple = null; - - private boolean foundAtLeastOneMatch; - private int rightNumCols; - - public NLLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild, - PhysicalExec rightChild) { - super(context, plan, leftChild, rightChild); - // for join - needNextRightTuple = true; - frameTuple = new FrameTuple(); - outTuple = new VTuple(outSchema.size()); - - foundAtLeastOneMatch = false; - rightNumCols = rightChild.getSchema().size(); - } - - public Tuple next() throws IOException { - while (!context.isStopped()) { - if (needNextRightTuple) { - leftTuple = leftChild.next(); - if (leftTuple == null) { - return null; - } - needNextRightTuple = false; - // a new tuple from the left child has initially no matches on the right operand - foundAtLeastOneMatch = false; - } - rightTuple = rightChild.next(); - - if (rightTuple == null) { - // the scan of the right operand is finished with no matches found - if(foundAtLeastOneMatch == false){ - //output a tuple with the nulls padded rightTuple - Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols); - frameTuple.set(leftTuple, nullPaddedTuple); - projector.eval(frameTuple, outTuple); - // we simulate we found a match, which is exactly the null padded one - foundAtLeastOneMatch = true; - needNextRightTuple = true; - rightChild.rescan(); - return outTuple; - } else { - 
needNextRightTuple = true; - rightChild.rescan(); - continue; - } - } - - frameTuple.set(leftTuple, rightTuple); - ; - if (joinQual.eval(frameTuple).isTrue()) { - projector.eval(frameTuple, outTuple); - foundAtLeastOneMatch = true; - return outTuple; - } - } - return null; - } - - @Override - public void rescan() throws IOException { - super.rescan(); - needNextRightTuple = true; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java index 505b599..c4d90a5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java @@ -79,9 +79,6 @@ public interface PhysicalExecutorVisitor<CONTEXT, RESULT> { RESULT visitNLJoin(CONTEXT context, NLJoinExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException; - RESULT visitNLLeftOuterJoin(CONTEXT context, NLLeftOuterJoinExec exec, Stack<PhysicalExec> stack) - throws PhysicalPlanningException; - RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException; http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java index 7abfbe6..fd825b1 100644 --- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java @@ -102,7 +102,6 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { * @throws IOException */ public Tuple next() throws IOException { - Tuple previous; while (!context.isStopped()) { boolean newRound = false; @@ -121,7 +120,7 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { // The finalizing stage, where remaining tuples on the only right are transformed into left-padded results if (end) { - if (initRightDone == false) { + if (!initRightDone) { // maybe the left operand was empty => the right one didn't have the chance to initialize rightTuple = rightChild.next(); initRightDone = true; @@ -160,18 +159,24 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { } } - if(rightTuple == null){ + if(rightTuple == null) { rightTuple = rightChild.next(); - - if(rightTuple != null){ - initRightDone = true; - } - else { + if (rightTuple == null) { initRightDone = true; end = true; continue; } } + if (rightFiltered(rightTuple)) { + Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols); + frameTuple.set(nullPaddedTuple, rightTuple); + projector.eval(frameTuple, outTuple); + + rightTuple = null; + return outTuple; + } + initRightDone = true; + ////////////////////////////////////////////////////////////////////// // END INITIALIZATION STAGE ////////////////////////////////////////////////////////////////////// @@ -203,10 +208,7 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { // we simulate we found a match, which is exactly the null padded one // BEFORE RETURN, MOVE FORWARD - rightTuple = rightChild.next(); - if(rightTuple == null) { - end = true; - } + rightTuple = null; return outTuple; } else if (cmp < 0) { @@ -223,6 +225,7 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { // END MOVE FORWARDING STAGE 
////////////////////////////////////////////////////////////////////// + Tuple previous = null; // once a match is found, retain all tuples with this key in tuple slots on each side if(!end) { endInPopulationStage = false; @@ -257,6 +260,19 @@ public class RightOuterMergeJoinExec extends CommonJoinExec { endInPopulationStage = true; } } // if end false + if (previous != null && rightFiltered(previous)) { + Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols); + frameTuple.set(nullPaddedTuple, previous); + projector.eval(frameTuple, outTuple); + + // reset tuple slots for a new round + leftTupleSlots.clear(); + innerTupleSlots.clear(); + posRightTupleSlots = -1; + posLeftTupleSlots = -1; + + return outTuple; + } } // if newRound http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java index 6a5c0bf..addca49 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java @@ -18,7 +18,6 @@ package org.apache.tajo.engine.utils; -import com.google.common.collect.Maps; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.storage.Tuple; @@ -66,7 +65,7 @@ public interface CacheHolder<T> { @Override public Map<Tuple, List<Tuple>> getData() { - return Maps.newHashMap(data); + return data; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/36a703c5/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java index 95debd4..7210214 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java @@ -206,10 +206,10 @@ public class TestHashSemiJoinExec { // expect result without duplicated tuples. while ((tuple = exec.next()) != null) { count++; - assertTrue(i == tuple.get(0).asInt4()); - assertTrue(i == tuple.get(1).asInt4()); - assertTrue(("dept_" + i).equals(tuple.get(2).asChars())); - assertTrue(10 + i == tuple.get(3).asInt4()); + assertEquals(i, tuple.get(0).asInt4()); + assertEquals(i, tuple.get(1).asInt4()); + assertEquals("dept_" + i, tuple.get(2).asChars()); + assertEquals(10 + i, tuple.get(3).asInt4()); i += 2; }
