TAJO-1153: Merge off-heap package in block_iteration branch to master branch.
Closes #225 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/514ed847 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/514ed847 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/514ed847 Branch: refs/heads/hbase_storage Commit: 514ed8474a843c92b2fb772971e41f53010f5a4e Parents: 080f4e1 Author: Hyunsik Choi <[email protected]> Authored: Tue Nov 4 00:49:39 2014 -0800 Committer: Hyunsik Choi <[email protected]> Committed: Tue Nov 4 00:49:39 2014 -0800 ---------------------------------------------------------------------- CHANGES | 3 + .../org/apache/tajo/catalog/SchemaUtil.java | 103 ++++ .../java/org/apache/tajo/datum/TextDatum.java | 6 +- .../java/org/apache/tajo/storage/Tuple.java | 12 +- .../java/org/apache/tajo/storage/VTuple.java | 55 +- .../org/apache/tajo/util/Deallocatable.java | 23 + .../main/java/org/apache/tajo/util/SizeOf.java | 159 +++++ .../org/apache/tajo/util/UnsafeComparer.java | 160 +++++ .../java/org/apache/tajo/util/UnsafeUtil.java | 115 ++++ .../engine/planner/PhysicalPlannerImpl.java | 5 +- .../tajo/engine/planner/global/DataChannel.java | 2 +- .../planner/physical/ExternalSortExec.java | 130 ++++- .../planner/physical/HashFullOuterJoinExec.java | 2 +- .../engine/planner/physical/HashJoinExec.java | 2 +- .../planner/physical/HashLeftOuterJoinExec.java | 2 +- .../engine/planner/physical/MergeJoinExec.java | 2 +- .../planner/physical/PhysicalPlanUtil.java | 9 +- .../physical/RangeShuffleFileWriteExec.java | 2 +- .../physical/RightOuterMergeJoinExec.java | 2 +- .../engine/planner/physical/SeqScanExec.java | 2 +- .../tajo/engine/planner/physical/SortExec.java | 7 +- .../engine/planner/physical/WindowAggExec.java | 5 +- .../tajo/worker/RangeRetrieverHandler.java | 6 +- .../main/java/org/apache/tajo/worker/Task.java | 3 +- .../apache/tajo/engine/eval/ExprTestBase.java | 2 +- .../tajo/engine/planner/TestPlannerUtil.java | 4 +- .../planner/TestUniformRangePartition.java | 4 +- 
.../planner/physical/TestBSTIndexExec.java | 4 +- .../planner/physical/TestExternalSortExec.java | 2 +- .../planner/physical/TestPhysicalPlanner.java | 2 +- .../physical/TestProgressExternalSortExec.java | 2 +- .../apache/tajo/engine/util/TestTupleUtil.java | 2 +- .../tajo/worker/TestRangeRetrieverHandler.java | 4 +- .../org/apache/tajo/jdbc/MetaDataTuple.java | 17 +- .../tajo/plan/LogicalPlanPreprocessor.java | 2 +- .../org/apache/tajo/plan/LogicalPlanner.java | 2 +- .../GreedyHeuristicJoinOrderAlgorithm.java | 2 +- .../org/apache/tajo/plan/logical/ScanNode.java | 2 +- .../tajo/plan/logical/TableSubQueryNode.java | 2 +- .../rewrite/rules/ProjectionPushDownRule.java | 2 +- .../org/apache/tajo/plan/util/PlannerUtil.java | 1 + .../org/apache/tajo/plan/util/SchemaUtil.java | 88 --- .../tajo/storage/BaseTupleComparator.java | 206 +++++++ .../org/apache/tajo/storage/FrameTuple.java | 14 +- .../java/org/apache/tajo/storage/LazyTuple.java | 13 +- .../org/apache/tajo/storage/RowStoreUtil.java | 202 +++++-- .../apache/tajo/storage/TupleComparator.java | 159 +---- .../org/apache/tajo/storage/TupleRange.java | 2 +- .../apache/tajo/storage/index/IndexMethod.java | 1 + .../apache/tajo/storage/index/bst/BSTIndex.java | 10 +- .../org/apache/tajo/tuple/BaseTupleBuilder.java | 112 ++++ .../org/apache/tajo/tuple/RowBlockReader.java | 33 ++ .../org/apache/tajo/tuple/TupleBuilder.java | 26 + .../tajo/tuple/offheap/DirectBufTuple.java | 41 ++ .../tajo/tuple/offheap/FixedSizeLimitSpec.java | 32 + .../apache/tajo/tuple/offheap/HeapTuple.java | 269 +++++++++ .../tajo/tuple/offheap/OffHeapMemory.java | 102 ++++ .../tajo/tuple/offheap/OffHeapRowBlock.java | 176 ++++++ .../tuple/offheap/OffHeapRowBlockReader.java | 63 ++ .../tuple/offheap/OffHeapRowBlockUtils.java | 54 ++ .../tuple/offheap/OffHeapRowBlockWriter.java | 58 ++ .../tajo/tuple/offheap/OffHeapRowWriter.java | 232 ++++++++ .../tajo/tuple/offheap/ResizableLimitSpec.java | 142 +++++ .../apache/tajo/tuple/offheap/RowWriter.java | 73 
+++ .../apache/tajo/tuple/offheap/UnSafeTuple.java | 308 ++++++++++ .../offheap/UnSafeTupleBytesComparator.java | 99 ++++ .../tajo/tuple/offheap/ZeroCopyTuple.java | 35 ++ tajo-storage/src/main/proto/IndexProtos.proto | 4 +- .../tajo/storage/TestTupleComparator.java | 2 +- .../apache/tajo/storage/index/TestBSTIndex.java | 20 +- .../index/TestSingleCSVFileBSTIndex.java | 4 +- .../apache/tajo/tuple/TestBaseTupleBuilder.java | 76 +++ .../tajo/tuple/offheap/TestHeapTuple.java | 45 ++ .../tajo/tuple/offheap/TestOffHeapRowBlock.java | 577 +++++++++++++++++++ .../tajo/tuple/offheap/TestResizableSpec.java | 59 ++ 75 files changed, 3789 insertions(+), 421 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 943869b..19dc1f6 100644 --- a/CHANGES +++ b/CHANGES @@ -44,6 +44,9 @@ Release 0.9.1 - unreleased TASKS + TAJO-1153: Merge off-heap package in block_iteration branch to master + branch. (hyunsik) + TAJO-1032: Improve TravisCI scripts to adjust log4j log level. (jinho) TAJO-1141: Refactor the packages hierarchy of tajo-client. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java new file mode 100644 index 0000000..ee670ef --- /dev/null +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog; + +import static org.apache.tajo.common.TajoDataTypes.DataType; +import static org.apache.tajo.common.TajoDataTypes.Type; + +public class SchemaUtil { + // See TAJO-914 bug. + // + // Its essential problem is that constant value is evaluated multiple times at each scan. + // As a result, join nodes can take the child nodes which have the same named fields. + // Because current schema does not allow the same name and ignore the duplicated schema, + // it finally causes the in-out schema mismatch between the parent and child nodes. + // + // tmpColumnSeq is a hack to avoid the above problem by keeping duplicated constant values as different name fields. + // The essential solution would be https://issues.apache.org/jira/browse/TAJO-895. 
+ static int tmpColumnSeq = 0; + public static Schema merge(Schema left, Schema right) { + Schema merged = new Schema(); + for(Column col : left.getColumns()) { + if (!merged.containsByQualifiedName(col.getQualifiedName())) { + merged.addColumn(col); + } + } + for(Column col : right.getColumns()) { + if (merged.containsByQualifiedName(col.getQualifiedName())) { + merged.addColumn("?fake" + (tmpColumnSeq++), col.getDataType()); + } else { + merged.addColumn(col); + } + } + + // if overflow + if (tmpColumnSeq < 0) { + tmpColumnSeq = 0; + } + return merged; + } + + /** + * Get common columns to be used as join keys of natural joins. + */ + public static Schema getNaturalJoinColumns(Schema left, Schema right) { + Schema common = new Schema(); + for (Column outer : left.getColumns()) { + if (!common.containsByName(outer.getSimpleName()) && right.containsByName(outer.getSimpleName())) { + common.addColumn(new Column(outer.getSimpleName(), outer.getDataType())); + } + } + + return common; + } + + public static Schema getQualifiedLogicalSchema(TableDesc tableDesc, String tableName) { + Schema logicalSchema = new Schema(tableDesc.getLogicalSchema()); + if (tableName != null) { + logicalSchema.setQualifier(tableName); + } + return logicalSchema; + } + + public static <T extends Schema> T clone(Schema schema) { + try { + T copy = (T) schema.clone(); + return copy; + } catch (CloneNotSupportedException e) { + throw new RuntimeException(e); + } + } + + public static DataType [] toDataTypes(Schema schema) { + DataType[] types = new DataType[schema.size()]; + for (int i = 0; i < schema.size(); i++) { + types[i] = schema.getColumn(i).getDataType(); + } + return types; + } + + public static Type [] toTypes(Schema schema) { + Type [] types = new Type[schema.size()]; + for (int i = 0; i < schema.size(); i++) { + types[i] = schema.getColumn(i).getDataType().getType(); + } + return types; + } +} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java index ca76ed2..9a9b37e 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java @@ -30,7 +30,7 @@ import java.nio.charset.Charset; import java.util.Comparator; public class TextDatum extends Datum { - static Charset defaultCharset = Charset.forName("UTF-8"); + public static Charset DEFAULT_CHARSET = Charset.forName("UTF-8"); @Expose private final int size; /* encoded in UTF-8 */ @@ -47,7 +47,7 @@ public class TextDatum extends Datum { } public TextDatum(String string) { - this(string.getBytes(defaultCharset)); + this(string.getBytes(DEFAULT_CHARSET)); } @Override @@ -92,7 +92,7 @@ public class TextDatum extends Datum { @Override public String asChars() { - return new String(this.bytes, defaultCharset); + return new String(this.bytes, DEFAULT_CHARSET); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java index c183171..1ba1926 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java @@ -19,7 +19,6 @@ package org.apache.tajo.storage; import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.ProtobufDatum; public interface Tuple extends Cloneable { @@ -28,16 +27,19 @@ public interface Tuple extends Cloneable { public boolean contains(int fieldid); public boolean isNull(int fieldid); + 
+ @SuppressWarnings("unused") + public boolean isNotNull(int fieldid); public void clear(); public void put(int fieldId, Datum value); - public void put(int fieldId, Datum [] values); + public void put(int fieldId, Datum[] values); public void put(int fieldId, Tuple tuple); - public void put(Datum [] values); + public void put(Datum[] values); public Datum get(int fieldId); @@ -65,7 +67,9 @@ public interface Tuple extends Cloneable { public String getText(int fieldId); - public ProtobufDatum getProtobufDatum(int fieldId); + public Datum getProtobufDatum(int fieldId); + + public Datum getInterval(int fieldId); public char [] getUnicodeChars(int fieldId); http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java index 4fb35f9..6304734 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java @@ -21,7 +21,7 @@ package org.apache.tajo.storage; import com.google.gson.annotations.Expose; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.Inet4Datum; -import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.IntervalDatum; import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.exception.UnimplementedException; @@ -38,10 +38,9 @@ public class VTuple implements Tuple, Cloneable { public VTuple(Tuple tuple) { this.values = tuple.getValues().clone(); - this.offset = ((VTuple)tuple).offset; } - public VTuple(Datum [] datum) { + public VTuple(Datum[] datum) { this(datum.length); put(datum); } @@ -57,7 +56,12 @@ public class VTuple implements Tuple, Cloneable { @Override public boolean isNull(int fieldid) { - return values[fieldid] instanceof NullDatum; + return values[fieldid].isNull(); + } 
+ + @Override + public boolean isNotNull(int fieldid) { + return !isNull(fieldid); } @Override @@ -179,6 +183,11 @@ public class VTuple implements Tuple, Cloneable { } @Override + public IntervalDatum getInterval(int fieldId) { + return (IntervalDatum) values[fieldId]; + } + + @Override public char[] getUnicodeChars(int fieldId) { return values[fieldId].asUnicodeChars(); } @@ -193,23 +202,7 @@ public class VTuple implements Tuple, Cloneable { } public String toString() { - boolean first = true; - StringBuilder str = new StringBuilder(); - str.append("("); - for(int i=0; i < values.length; i++) { - if(values[i] != null) { - if(first) { - first = false; - } else { - str.append(", "); - } - str.append(i) - .append("=>") - .append(values[i]); - } - } - str.append(")"); - return str.toString(); + return toDisplayString(getValues()); } @Override @@ -230,4 +223,24 @@ public class VTuple implements Tuple, Cloneable { } return false; } + + public static String toDisplayString(Datum [] values) { + boolean first = true; + StringBuilder str = new StringBuilder(); + str.append("("); + for(int i=0; i < values.length; i++) { + if(values[i] != null) { + if(first) { + first = false; + } else { + str.append(", "); + } + str.append(i) + .append("=>") + .append(values[i]); + } + } + str.append(")"); + return str.toString(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-common/src/main/java/org/apache/tajo/util/Deallocatable.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/Deallocatable.java b/tajo-common/src/main/java/org/apache/tajo/util/Deallocatable.java new file mode 100644 index 0000000..8202a4b --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/util/Deallocatable.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util; + +public interface Deallocatable { + void release(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-common/src/main/java/org/apache/tajo/util/SizeOf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/SizeOf.java b/tajo-common/src/main/java/org/apache/tajo/util/SizeOf.java new file mode 100644 index 0000000..b7ca10c --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/util/SizeOf.java @@ -0,0 +1,159 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util; + +import static org.apache.tajo.util.UnsafeUtil.*; + + +public final class SizeOf { + + public static final int BITS_PER_BYTE = 8; + public static final int BYTES_PER_WORD = SizeOf.SIZE_OF_LONG; + public static final int BITS_PER_WORD = SizeOf.SIZE_OF_LONG * BITS_PER_BYTE; + + public static final byte SIZE_OF_BOOL = 1; + public static final byte SIZE_OF_BYTE = 1; + public static final byte SIZE_OF_SHORT = 2; + public static final byte SIZE_OF_INT = 4; + public static final byte SIZE_OF_LONG = 8; + public static final byte SIZE_OF_FLOAT = 4; + public static final byte SIZE_OF_DOUBLE = 8; + + public static long sizeOf(boolean[] array) + { + if (array == null) { + return 0; + } + return ARRAY_BOOLEAN_BASE_OFFSET + (((long) ARRAY_BOOLEAN_INDEX_SCALE) * array.length); + } + + public static long sizeOf(byte[] array) + { + if (array == null) { + return 0; + } + return ARRAY_BYTE_BASE_OFFSET + (((long) ARRAY_BYTE_INDEX_SCALE) * array.length); + } + + public static long sizeOf(short[] array) + { + if (array == null) { + return 0; + } + return ARRAY_SHORT_BASE_OFFSET + (((long) ARRAY_SHORT_INDEX_SCALE) * array.length); + } + + public static long sizeOf(char[] array) + { + if (array == null) { + return 0; + } + return ARRAY_CHAR_BASE_OFFSET + (((long) ARRAY_CHAR_INDEX_SCALE) * array.length); + } + + public static long sizeOf(int[] array) + { + if (array == null) { + return 0; + } + return ARRAY_INT_BASE_OFFSET + (((long) ARRAY_INT_INDEX_SCALE) * array.length); + } + + public static long sizeOf(long[] array) + { + if (array == null) { + return 0; + } + return ARRAY_LONG_BASE_OFFSET + (((long) ARRAY_LONG_INDEX_SCALE) * array.length); + } + + public static long sizeOf(float[] array) + { + if (array == null) { + return 0; + } + return ARRAY_FLOAT_BASE_OFFSET + (((long) ARRAY_FLOAT_INDEX_SCALE) * array.length); + } + + public static long 
sizeOf(double[] array) + { + if (array == null) { + return 0; + } + return ARRAY_DOUBLE_BASE_OFFSET + (((long) ARRAY_DOUBLE_INDEX_SCALE) * array.length); + } + + public static long sizeOf(Object[] array) + { + if (array == null) { + return 0; + } + return ARRAY_OBJECT_BASE_OFFSET + (((long) ARRAY_OBJECT_INDEX_SCALE) * array.length); + } + + + public static long sizeOfBooleanArray(int length) + { + return ARRAY_BOOLEAN_BASE_OFFSET + (((long) ARRAY_BOOLEAN_INDEX_SCALE) * length); + } + + public static long sizeOfByteArray(int length) + { + return ARRAY_BYTE_BASE_OFFSET + (((long) ARRAY_BYTE_INDEX_SCALE) * length); + } + + public static long sizeOfShortArray(int length) + { + return ARRAY_SHORT_BASE_OFFSET + (((long) ARRAY_SHORT_INDEX_SCALE) * length); + } + + public static long sizeOfCharArray(int length) + { + return ARRAY_CHAR_BASE_OFFSET + (((long) ARRAY_CHAR_INDEX_SCALE) * length); + } + + public static long sizeOfIntArray(int length) + { + return ARRAY_INT_BASE_OFFSET + (((long) ARRAY_INT_INDEX_SCALE) * length); + } + + public static long sizeOfLongArray(int length) + { + return ARRAY_LONG_BASE_OFFSET + (((long) ARRAY_LONG_INDEX_SCALE) * length); + } + + public static long sizeOfFloatArray(int length) + { + return ARRAY_FLOAT_BASE_OFFSET + (((long) ARRAY_FLOAT_INDEX_SCALE) * length); + } + + public static long sizeOfDoubleArray(int length) + { + return ARRAY_DOUBLE_BASE_OFFSET + (((long) ARRAY_DOUBLE_INDEX_SCALE) * length); + } + + public static long sizeOfObjectArray(int length) + { + return ARRAY_OBJECT_BASE_OFFSET + (((long) ARRAY_OBJECT_INDEX_SCALE) * length); + } + + private SizeOf() + { + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-common/src/main/java/org/apache/tajo/util/UnsafeComparer.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/UnsafeComparer.java b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeComparer.java new file mode 100644 
index 0000000..fec4e5a --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeComparer.java @@ -0,0 +1,160 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util; + +import com.google.common.primitives.Longs; +import com.google.common.primitives.UnsignedBytes; +import com.google.common.primitives.UnsignedLongs; +import sun.misc.Unsafe; + +import java.lang.reflect.Field; +import java.nio.ByteOrder; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Comparator; + +/** + * This is borrowed from Guava's UnsignedBytes. + */ +public class UnsafeComparer implements Comparator<byte[]> { + + public static final UnsafeComparer INSTANCE; + + static { + INSTANCE = new UnsafeComparer(); + } + + public Comparator<byte []> get() { + return INSTANCE; + } + + private UnsafeComparer() {} + + static final boolean littleEndian = + ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN); + + /* + * The following static final fields exist for performance reasons. 
+ * + * In UnsignedBytesBenchmark, accessing the following objects via static + * final fields is the fastest (more than twice as fast as the Java + * implementation, vs ~1.5x with non-final static fields, on x86_32) + * under the Hotspot server compiler. The reason is obviously that the + * non-final fields need to be reloaded inside the loop. + * + * And, no, defining (final or not) local variables out of the loop still + * isn't as good because the null check on the theUnsafe object remains + * inside the loop and BYTE_ARRAY_BASE_OFFSET doesn't get + * constant-folded. + * + * The compiler can treat static final fields as compile-time constants + * and can constant-fold them while (final or not) local variables are + * run time values. + */ + + static final Unsafe theUnsafe; + + /** The offset to the first element in a byte array. */ + static final int BYTE_ARRAY_BASE_OFFSET; + + static { + theUnsafe = (Unsafe) AccessController.doPrivileged( + new PrivilegedAction<Object>() { + @Override + public Object run() { + try { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return f.get(null); + } catch (NoSuchFieldException e) { + // It doesn't matter what we throw; + // it's swallowed in getBestComparator(). + throw new Error(); + } catch (IllegalAccessException e) { + throw new Error(); + } + } + }); + + BYTE_ARRAY_BASE_OFFSET = theUnsafe.arrayBaseOffset(byte[].class); + + // sanity check - this should never fail + if (theUnsafe.arrayIndexScale(byte[].class) != 1) { + throw new AssertionError(); + } + } + + @SuppressWarnings("unused") + public static int compareStatic(byte[] left, byte[] right) { + return INSTANCE.compare(left, right); + } + + @Override public int compare(byte[] left, byte[] right) { + int minLength = Math.min(left.length, right.length); + int minWords = minLength / Longs.BYTES; + + /* + * Compare 8 bytes at a time. 
Benchmarking shows comparing 8 bytes at a + * time is no slower than comparing 4 bytes at a time even on 32-bit. + * On the other hand, it is substantially faster on 64-bit. + */ + for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) { + long lw = theUnsafe.getLong(left, BYTE_ARRAY_BASE_OFFSET + (long) i); + long rw = theUnsafe.getLong(right, BYTE_ARRAY_BASE_OFFSET + (long) i); + long diff = lw ^ rw; + + if (diff != 0) { + if (!littleEndian) { + return UnsignedLongs.compare(lw, rw); + } + + // Use binary search + int n = 0; + int y; + int x = (int) diff; + if (x == 0) { + x = (int) (diff >>> 32); + n = 32; + } + + y = x << 16; + if (y == 0) { + n += 16; + } else { + x = y; + } + + y = x << 8; + if (y == 0) { + n += 8; + } + return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL)); + } + } + + // The epilogue to cover the last (minLength % 8) elements. + for (int i = minWords * Longs.BYTES; i < minLength; i++) { + int result = UnsignedBytes.compare(left[i], right[i]); + if (result != 0) { + return result; + } + } + return left.length - right.length; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java new file mode 100644 index 0000000..ae88f73 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java @@ -0,0 +1,115 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.util; + +import com.google.common.base.Preconditions; +import sun.misc.Cleaner; +import sun.misc.Unsafe; +import sun.nio.ch.DirectBuffer; + +import java.lang.reflect.Field; +import java.nio.ByteBuffer; + +public class UnsafeUtil { + public static final Unsafe unsafe; + + // offsets + public static final int ARRAY_BOOLEAN_BASE_OFFSET; + public static final int ARRAY_BYTE_BASE_OFFSET; + public static final int ARRAY_SHORT_BASE_OFFSET; + public static final int ARRAY_CHAR_BASE_OFFSET; + public static final int ARRAY_INT_BASE_OFFSET; + public static final int ARRAY_LONG_BASE_OFFSET; + public static final int ARRAY_FLOAT_BASE_OFFSET; + public static final int ARRAY_DOUBLE_BASE_OFFSET; + public static final int ARRAY_OBJECT_BASE_OFFSET; + + // scale + public static final int ARRAY_BOOLEAN_INDEX_SCALE; + public static final int ARRAY_BYTE_INDEX_SCALE; + public static final int ARRAY_SHORT_INDEX_SCALE; + public static final int ARRAY_CHAR_INDEX_SCALE; + public static final int ARRAY_INT_INDEX_SCALE; + public static final int ARRAY_LONG_INDEX_SCALE; + public static final int ARRAY_FLOAT_INDEX_SCALE; + public static final int ARRAY_DOUBLE_INDEX_SCALE; + public static final int ARRAY_OBJECT_INDEX_SCALE; + + static { + Field field; + try { + field = Unsafe.class.getDeclaredField("theUnsafe"); + + field.setAccessible(true); + unsafe = (Unsafe) field.get(null); + if (unsafe == null) { + throw new RuntimeException("Unsafe access not available"); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + 
ARRAY_BOOLEAN_BASE_OFFSET = unsafe.arrayBaseOffset(boolean[].class); + ARRAY_BYTE_BASE_OFFSET = unsafe.arrayBaseOffset(byte[].class); + ARRAY_SHORT_BASE_OFFSET = unsafe.arrayBaseOffset(short[].class); + ARRAY_CHAR_BASE_OFFSET = unsafe.arrayBaseOffset(char[].class); + ARRAY_INT_BASE_OFFSET = unsafe.arrayBaseOffset(int[].class); + ARRAY_LONG_BASE_OFFSET = unsafe.arrayBaseOffset(long[].class); + ARRAY_FLOAT_BASE_OFFSET = unsafe.arrayBaseOffset(float[].class); + ARRAY_DOUBLE_BASE_OFFSET = unsafe.arrayBaseOffset(double[].class); + ARRAY_OBJECT_BASE_OFFSET = unsafe.arrayBaseOffset(Object[].class); + + ARRAY_BOOLEAN_INDEX_SCALE = unsafe.arrayIndexScale(boolean[].class); + ARRAY_BYTE_INDEX_SCALE = unsafe.arrayIndexScale(byte[].class); + ARRAY_SHORT_INDEX_SCALE = unsafe.arrayIndexScale(short[].class); + ARRAY_CHAR_INDEX_SCALE = unsafe.arrayIndexScale(char[].class); + ARRAY_INT_INDEX_SCALE = unsafe.arrayIndexScale(int[].class); + ARRAY_LONG_INDEX_SCALE = unsafe.arrayIndexScale(long[].class); + ARRAY_FLOAT_INDEX_SCALE = unsafe.arrayIndexScale(float[].class); + ARRAY_DOUBLE_INDEX_SCALE = unsafe.arrayIndexScale(double[].class); + ARRAY_OBJECT_INDEX_SCALE = unsafe.arrayIndexScale(Object[].class); + } + + public static int alignedSize(int size) { + int remain = size % SizeOf.BYTES_PER_WORD; + if (remain > 0) { + return size + (SizeOf.BYTES_PER_WORD - remain); + } else { + return size; + } + } + + public static long getAddress(ByteBuffer buffer) { + Preconditions.checkArgument(buffer instanceof DirectBuffer, "ByteBuffer must be DirectBuffer"); + return ((DirectBuffer)buffer).address(); + } + + public static void free(Deallocatable obj) { + obj.release(); + } + + public static void free(ByteBuffer bb) { + Preconditions.checkNotNull(bb); + Preconditions.checkState(bb instanceof DirectBuffer); + + Cleaner cleaner = ((DirectBuffer) bb).cleaner(); + if (cleaner != null) { + cleaner.clean(); + } + } +} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index f7f071f..99b79ec 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -45,8 +45,9 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAg import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray; import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.TupleComparator; @@ -1185,7 +1186,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys()); Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index"); - TupleComparator comp = new TupleComparator(annotation.getKeySchema(), + TupleComparator comp = new BaseTupleComparator(annotation.getKeySchema(), annotation.getSortKeys()); return new BSTIndexScanExec(ctx, sm, annotation, fragments.get(0), new Path(indexPath, indexName), annotation.getKeySchema(), comp, annotation.getDatum()); http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java 
---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java index 4a2cd19..11548d3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java @@ -22,7 +22,7 @@ import com.google.common.base.Preconditions; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; -import org.apache.tajo.plan.util.SchemaUtil; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.util.TUtil; import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index 4c0caea..121e6bd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -529,9 +529,9 @@ public class ExternalSortExec extends SortExec { throws IOException { if (num > 1) { final int mid = (int) Math.ceil((float)num / 2); - return new PairWiseMerger( + return new PairWiseMerger(inSchema, createKWayMergerInternal(sources, startIdx, mid), - createKWayMergerInternal(sources, startIdx + mid, num - mid)); + createKWayMergerInternal(sources, startIdx + mid, num - mid), getComparator()); } else { return sources[startIdx]; } @@ -626,6 +626,12 @@ public class ExternalSortExec extends SortExec { } } + enum State { + NEW, + INITED, + 
CLOSED + } + /** * Two-way merger scanner that reads two input sources and outputs one output tuples sorted in some order. */ @@ -633,59 +639,132 @@ public class ExternalSortExec extends SortExec { private Scanner leftScan; private Scanner rightScan; - private Tuple leftTuple; - private Tuple rightTuple; + private VTuple outTuple; + private VTuple leftTuple; + private VTuple rightTuple; - private final Comparator<Tuple> comparator = getComparator(); + private final Schema schema; + private final Comparator<Tuple> comparator; private float mergerProgress; private TableStats mergerInputStats; - public PairWiseMerger(Scanner leftScanner, Scanner rightScanner) throws IOException { + private State state = State.NEW; + + public PairWiseMerger(Schema schema, Scanner leftScanner, Scanner rightScanner, Comparator<Tuple> comparator) + throws IOException { + this.schema = schema; this.leftScan = leftScanner; this.rightScan = rightScanner; + this.comparator = comparator; + } + + private void setState(State state) { + this.state = state; } @Override public void init() throws IOException { - leftScan.init(); - rightScan.init(); + if (state == State.NEW) { + leftScan.init(); + rightScan.init(); + + prepareTuplesForFirstComparison(); + + mergerInputStats = new TableStats(); + mergerProgress = 0.0f; + + setState(State.INITED); + } else { + throw new IllegalStateException("Illegal State: init() is not allowed in " + state.name()); + } + } - leftTuple = leftScan.next(); - rightTuple = rightScan.next(); + private void prepareTuplesForFirstComparison() throws IOException { + Tuple lt = leftScan.next(); + if (lt != null) { + leftTuple = new VTuple(lt); + } else { + leftTuple = null; // TODO - missed free + } - mergerInputStats = new TableStats(); - mergerProgress = 0.0f; + Tuple rt = rightScan.next(); + if (rt != null) { + rightTuple = new VTuple(rt); + } else { + rightTuple = null; // TODO - missed free + } } public Tuple next() throws IOException { - Tuple outTuple; + if (leftTuple != 
null && rightTuple != null) { if (comparator.compare(leftTuple, rightTuple) < 0) { - outTuple = leftTuple; - leftTuple = leftScan.next(); + outTuple = new VTuple(leftTuple); + + Tuple lt = leftScan.next(); + if (lt != null) { + leftTuple = new VTuple(lt); + } else { + leftTuple = null; // TODO - missed free + } } else { - outTuple = rightTuple; - rightTuple = rightScan.next(); + outTuple = new VTuple(rightTuple); + + Tuple rt = rightScan.next(); + if (rt != null) { + rightTuple = new VTuple(rt); + } else { + rightTuple = null; // TODO - missed free + } } return outTuple; } if (leftTuple == null) { - outTuple = rightTuple; - rightTuple = rightScan.next(); + if (rightTuple != null) { + outTuple = new VTuple(rightTuple); + } else { + outTuple = null; + } + + Tuple rt = rightScan.next(); + if (rt != null) { + rightTuple = new VTuple(rt); + } else { + rightTuple = null; // TODO - missed free + } } else { - outTuple = leftTuple; - leftTuple = leftScan.next(); + if (leftTuple != null) { + outTuple = new VTuple(leftTuple); + } else { + outTuple = null; + } + + Tuple lt = leftScan.next(); + if (lt != null) { + leftTuple = new VTuple(lt); + } else { + leftTuple = null; // TODO - missed free + } } return outTuple; } @Override public void reset() throws IOException { - leftScan.reset(); - rightScan.reset(); - init(); + if (state == State.INITED) { + leftScan.reset(); + rightScan.reset(); + + outTuple = null; + leftTuple = null; + rightTuple = null; + + prepareTuplesForFirstComparison(); + } else { + throw new IllegalStateException("Illegal State: init() is not allowed in " + state.name()); + } } public void close() throws IOException { @@ -694,6 +773,7 @@ public class ExternalSortExec extends SortExec { leftScan = null; rightScan = null; mergerProgress = 1.0f; + setState(State.CLOSED); } @Override @@ -721,7 +801,7 @@ public class ExternalSortExec extends SortExec { @Override public Schema getSchema() { - return inSchema; + return schema; } @Override 
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java index cf333b0..28d9a3e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java @@ -23,7 +23,7 @@ import org.apache.tajo.engine.codegen.CompilationError; import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.engine.utils.TupleUtil; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.plan.util.SchemaUtil; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.FrameTuple; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java index 18be6b9..701297f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java @@ -22,7 +22,7 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.plan.util.SchemaUtil; +import org.apache.tajo.catalog.SchemaUtil; import 
org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.FrameTuple; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java index c38be88..c1b6522 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java @@ -25,7 +25,7 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.engine.utils.TupleUtil; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.plan.util.SchemaUtil; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.plan.expr.AlgebraicUtil; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.expr.EvalTreeUtil; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java index 20df210..13104ee 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java @@ -52,7 +52,7 @@ public class MergeJoinExec extends BinaryPhysicalExec { private Iterator<Tuple> innerIterator; private JoinTupleComparator joincomparator = null; - private TupleComparator[] 
tupleComparator = null; + private TupleComparator [] tupleComparator = null; private final static int INITIAL_TUPLE_SLOT = 10000; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java index 1b824b5..ec5915f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java @@ -34,6 +34,7 @@ import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.logical.PersistentStoreNode; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.TupleComparator; @@ -53,11 +54,11 @@ public class PhysicalPlanUtil { return (T) new FindVisitor().visit(plan, new Stack<PhysicalExec>(), clazz); } - public static TupleComparator[] getComparatorsFromJoinQual(EvalNode joinQual, Schema leftSchema, Schema rightSchema) { + public static TupleComparator [] getComparatorsFromJoinQual(EvalNode joinQual, Schema leftSchema, Schema rightSchema) { SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(joinQual, leftSchema, rightSchema); - TupleComparator[] comparators = new TupleComparator[2]; - comparators[0] = new TupleComparator(leftSchema, sortSpecs[0]); - comparators[1] = new TupleComparator(rightSchema, sortSpecs[1]); + BaseTupleComparator[] comparators = new BaseTupleComparator[2]; + comparators[0] = new BaseTupleComparator(leftSchema, sortSpecs[0]); + comparators[1] = new 
BaseTupleComparator(rightSchema, sortSpecs[1]); return comparators; } http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java index dd72910..79993da 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java @@ -71,7 +71,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { } BSTIndex bst = new BSTIndex(new TajoConf()); - this.comp = new TupleComparator(keySchema, sortSpecs); + this.comp = new BaseTupleComparator(keySchema, sortSpecs); Path storeTablePath = new Path(context.getWorkDir(), "output"); LOG.info("Output data directory: " + storeTablePath); this.meta = CatalogUtil.newTableMeta(context.getDataChannel() != null ? 
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java index 9d8122f..a02d00b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java @@ -51,7 +51,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec { private List<Tuple> innerTupleSlots; private JoinTupleComparator joinComparator = null; - private TupleComparator[] tupleComparator = null; + private TupleComparator [] tupleComparator = null; private final static int INITIAL_TUPLE_SLOT = 10000; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index c6a4f55..3cbb7c9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -30,7 +30,7 @@ import org.apache.tajo.engine.codegen.CompilationError; import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.engine.utils.TupleCache; import org.apache.tajo.engine.utils.TupleCacheKey; -import org.apache.tajo.plan.util.SchemaUtil; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.plan.Target; import org.apache.tajo.plan.expr.ConstEval; import 
org.apache.tajo.plan.expr.EvalNode; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java index a4a8d37..c0703dd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java @@ -18,24 +18,25 @@ package org.apache.tajo.engine.planner.physical; -import org.apache.tajo.worker.TaskAttemptContext; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; +import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; import java.util.Comparator; public abstract class SortExec extends UnaryPhysicalExec { - private final Comparator<Tuple> comparator; + private final TupleComparator comparator; private final SortSpec [] sortSpecs; public SortExec(TaskAttemptContext context, Schema inSchema, Schema outSchema, PhysicalExec child, SortSpec [] sortSpecs) { super(context, inSchema, outSchema, child); this.sortSpecs = sortSpecs; - this.comparator = new TupleComparator(inSchema, sortSpecs); + this.comparator = new BaseTupleComparator(inSchema, sortSpecs); } public SortSpec[] getSortSpecs() { http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java 
index 3f4b22b..e36dcd8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java @@ -27,6 +27,7 @@ import org.apache.tajo.plan.expr.WindowFunctionEval; import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.plan.logical.WindowAggNode; import org.apache.tajo.plan.logical.WindowSpec; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; @@ -285,9 +286,9 @@ public class WindowAggExec extends UnaryPhysicalExec { for (int idx = 0; idx < functions.length; idx++) { if (orderedFuncFlags[idx]) { - comp = new TupleComparator(inSchema, functions[idx].getSortSpecs()); + comp = new BaseTupleComparator(inSchema, functions[idx].getSortSpecs()); Collections.sort(accumulatedInTuples, comp); - comp = new TupleComparator(schemaForOrderBy, functions[idx].getSortSpecs()); + comp = new BaseTupleComparator(schemaForOrderBy, functions[idx].getSortSpecs()); Collections.sort(evaluatedTuples, comp); } http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java index be33a12..2b58196 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java @@ -26,10 +26,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.Schema; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.RowStoreUtil; import 
org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.index.bst.BSTIndex; import org.apache.tajo.worker.dataserver.retriever.FileChunk; import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler; @@ -57,10 +57,10 @@ public class RangeRetrieverHandler implements RetrieverHandler { private final File file; private final BSTIndex.BSTIndexReader idxReader; private final Schema schema; - private final TupleComparator comp; + private final BaseTupleComparator comp; private final RowStoreDecoder decoder; - public RangeRetrieverHandler(File outDir, Schema schema, TupleComparator comp) throws IOException { + public RangeRetrieverHandler(File outDir, Schema schema, BaseTupleComparator comp) throws IOException { this.file = outDir; BSTIndex index = new BSTIndex(new TajoConf()); this.schema = schema; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 3858c96..a7b52c7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -52,6 +52,7 @@ import org.apache.tajo.plan.logical.*; import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.pullserver.retriever.FileChunk; import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.storage.TupleComparator; @@ -185,7 +186,7 @@ public class Task { if (shuffleType == ShuffleType.RANGE_SHUFFLE) { SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT); this.finalSchema = 
PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()); - this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys()); + this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys()); } } else { // The final result of a task will be written in a file named part-ss-nnnnnnn, http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java index 4287191..73d89ec 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java @@ -41,7 +41,7 @@ import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.serder.EvalTreeProtoDeserializer; import org.apache.tajo.plan.serder.EvalTreeProtoSerializer; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.plan.util.SchemaUtil; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.plan.verifier.LogicalPlanVerifier; import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java index 90ec9b1..5cc89b8 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java @@ -36,10 +36,10 @@ import org.apache.tajo.engine.function.builtin.SumInt; import 
org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.planner.physical.PhysicalPlanUtil; import org.apache.tajo.plan.LogicalPlanner; -import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; @@ -299,7 +299,7 @@ public class TestPlannerUtil { FieldEval f4 = new FieldEval("people.fid2", CatalogUtil.newSimpleDataType(Type.INT4)); EvalNode joinQual = new BinaryEval(EvalType.EQUAL, f1, f2); - TupleComparator [] comparators = PhysicalPlanUtil.getComparatorsFromJoinQual(joinQual, outerSchema, innerSchema); + TupleComparator[] comparators = PhysicalPlanUtil.getComparatorsFromJoinQual(joinQual, outerSchema, innerSchema); Tuple t1 = new VTuple(2); t1.put(0, DatumFactory.createInt4(1)); http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java index b044384..c5ef4ef 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java @@ -23,8 +23,8 @@ import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.TupleRange; import 
org.apache.tajo.storage.VTuple; import org.junit.Test; @@ -484,7 +484,7 @@ public class TestUniformRangePartition { TupleRange expected = new TupleRange(sortSpecs, s, e); UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); - TupleComparator comp = new TupleComparator(schema, sortSpecs); + BaseTupleComparator comp = new BaseTupleComparator(schema, sortSpecs); Tuple tuple = s; Tuple prevTuple = null; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java index 868a82e..c7fbf39 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java @@ -68,7 +68,7 @@ public class TestBSTIndexExec { private LogicalOptimizer optimizer; private StorageManager sm; private Schema idxSchema; - private TupleComparator comp; + private BaseTupleComparator comp; private BSTIndex.BSTIndexWriter writer; private HashMap<Integer , Integer> randomValues ; private int rndKey = -1; @@ -104,7 +104,7 @@ public class TestBSTIndexExec { idxSchema.addColumn("managerid", Type.INT4); SortSpec[] sortKeys = new SortSpec[1]; sortKeys[0] = new SortSpec(idxSchema.getColumn("managerid"), true, false); - this.comp = new TupleComparator(idxSchema, sortKeys); + this.comp = new BaseTupleComparator(idxSchema, sortKeys); this.writer = new BSTIndex(conf).getIndexWriter(idxPath, BSTIndex.TWO_LEVEL_INDEX, this.idxSchema, this.comp); http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java 
---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java index 46f55c6..f09d104 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java @@ -152,7 +152,7 @@ public class TestExternalSortExec { int cnt = 0; exec.init(); long start = System.currentTimeMillis(); - TupleComparator comparator = new TupleComparator(proj.getSchema(), + BaseTupleComparator comparator = new BaseTupleComparator(proj.getSchema(), new SortSpec[]{ new SortSpec(new Column("managerid", Type.INT4)), new SortSpec(new Column("empid", Type.INT4)) http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index 341ce9e..42338a8 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -1073,7 +1073,7 @@ public class TestPhysicalPlanner { keySchema.addColumn("?empId", Type.INT4); SortSpec[] sortSpec = new SortSpec[1]; sortSpec[0] = new SortSpec(keySchema.getColumn(0), true, false); - TupleComparator comp = new TupleComparator(keySchema, sortSpec); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpec); BSTIndex bst = new BSTIndex(conf); BSTIndex.BSTIndexReader reader = bst.getIndexReader(new Path(workDir, "output/index"), keySchema, comp); 
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java index a02f2ce..9da9dee 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java @@ -170,7 +170,7 @@ public class TestProgressExternalSortExec { Tuple curVal; int cnt = 0; exec.init(); - TupleComparator comparator = new TupleComparator(proj.getSchema(), + BaseTupleComparator comparator = new BaseTupleComparator(proj.getSchema(), new SortSpec[]{ new SortSpec(new Column("managerid", TajoDataTypes.Type.INT4)), new SortSpec(new Column("empid", TajoDataTypes.Type.INT4)) http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java index 62c959c..b8114e0 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java @@ -112,7 +112,7 @@ public class TestTupleUtil { sortSpecs); TupleRange [] ranges = partitioner.partition(5); assertTrue(5 <= ranges.length); - TupleComparator comp = new TupleComparator(schema, PlannerUtil.schemaToSortSpecs(schema)); + BaseTupleComparator comp = new BaseTupleComparator(schema, PlannerUtil.schemaToSortSpecs(schema)); TupleRange prev = ranges[0]; for (int i = 1; i < 
ranges.length; i++) { assertTrue(comp.compare(prev.getStart(), ranges[i].getStart()) < 0); http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java index 124976a..e35b80c 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java @@ -178,7 +178,7 @@ public class TestRangeRetrieverHandler { exec.close(); Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs); - TupleComparator comp = new TupleComparator(keySchema, sortSpecs); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpecs); BSTIndex bst = new BSTIndex(conf); BSTIndex.BSTIndexReader reader = bst.getIndexReader( new Path(testDir, "output/index"), keySchema, comp); @@ -302,7 +302,7 @@ public class TestRangeRetrieverHandler { exec.close(); Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs); - TupleComparator comp = new TupleComparator(keySchema, sortSpecs); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpecs); BSTIndex bst = new BSTIndex(conf); BSTIndex.BSTIndexReader reader = bst.getIndexReader( new Path(testDir, "output/index"), keySchema, comp); http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java ---------------------------------------------------------------------- diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java index 5dae67e..6c8ef5d 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java +++ 
b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java @@ -17,6 +17,7 @@ package org.apache.tajo.jdbc; /** */ import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.IntervalDatum; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.exception.UnsupportedException; @@ -47,7 +48,12 @@ public class MetaDataTuple implements Tuple { @Override public boolean isNull(int fieldid) { - return values.get(fieldid) == null || values.get(fieldid) instanceof NullDatum; + return values.get(fieldid) == null || values.get(fieldid).isNull(); + } + + @Override + public boolean isNotNull(int fieldid) { + return !isNull(fieldid); } @Override @@ -142,7 +148,12 @@ public class MetaDataTuple implements Tuple { @Override public ProtobufDatum getProtobufDatum(int fieldId) { - throw new UnsupportedException(); + throw new UnsupportedException("getProtobufDatum"); + } + + @Override + public IntervalDatum getInterval(int fieldId) { + throw new UnsupportedException("getInterval"); } @Override @@ -157,6 +168,6 @@ public class MetaDataTuple implements Tuple { @Override public Datum[] getValues(){ - throw new UnsupportedException(); + throw new UnsupportedException("getValues"); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java index 6bdfb9d..68f2186 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java @@ -32,7 +32,7 @@ import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.nameresolver.NameResolver; import org.apache.tajo.plan.nameresolver.NameResolvingMode; import 
org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.plan.util.SchemaUtil; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.plan.visitor.SimpleAlgebraVisitor; import org.apache.tajo.util.TUtil; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java index 84942a1..e953ccf 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java @@ -47,7 +47,7 @@ import org.apache.tajo.plan.nameresolver.NameResolvingMode; import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule; import org.apache.tajo.plan.util.ExprFinder; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.plan.util.SchemaUtil; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.plan.verifier.VerifyException; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.Pair; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java index a916d5c..0fad8a8 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java @@ -23,7 +23,7 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.util.PlannerUtil; import 
org.apache.tajo.plan.PlanningException; -import org.apache.tajo.plan.util.SchemaUtil; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.plan.expr.AlgebraicUtil; import org.apache.tajo.plan.logical.*; import org.apache.tajo.util.TUtil; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java index a7ef543..3e4abe3 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java @@ -25,7 +25,7 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.plan.PlanString; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.plan.util.SchemaUtil; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.plan.Target; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.util.TUtil; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java index c8fbecd..4e5f41c 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java @@ -23,7 +23,7 @@ import com.google.gson.annotations.Expose; import org.apache.tajo.catalog.Schema; import org.apache.tajo.plan.PlanString; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.plan.util.SchemaUtil; +import 
org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.plan.Target; public class TableSubQueryNode extends RelationNode implements Projectable { http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java index 167da5b..f7fd90d 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java @@ -31,7 +31,7 @@ import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.rewrite.RewriteRule; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.plan.util.SchemaUtil; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; import org.apache.tajo.util.TUtil; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index 5fe1515..6868b6c 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -25,6 +25,7 @@ import org.apache.tajo.algebra.*; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.proto.CatalogProtos; import 
org.apache.tajo.common.TajoDataTypes.DataType; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-plan/src/main/java/org/apache/tajo/plan/util/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/SchemaUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/SchemaUtil.java deleted file mode 100644 index 78135ab..0000000 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/SchemaUtil.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.plan.util; - -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; - -public class SchemaUtil { - // See TAJO-914 bug. - // - // Its essential problem is that constant value is evaluated multiple times at each scan. - // As a result, join nodes can take the child nodes which have the same named fields. - // Because current schema does not allow the same name and ignore the duplicated schema, - // it finally causes the in-out schema mismatch between the parent and child nodes. 
- // - // tmpColumnSeq is a hack to avoid the above problem by keeping duplicated constant values as different name fields. - // The essential solution would be https://issues.apache.org/jira/browse/TAJO-895. - static int tmpColumnSeq = 0; - public static Schema merge(Schema left, Schema right) { - Schema merged = new Schema(); - for(Column col : left.getColumns()) { - if (!merged.containsByQualifiedName(col.getQualifiedName())) { - merged.addColumn(col); - } - } - for(Column col : right.getColumns()) { - if (merged.containsByQualifiedName(col.getQualifiedName())) { - merged.addColumn("?fake" + (tmpColumnSeq++), col.getDataType()); - } else { - merged.addColumn(col); - } - } - - // if overflow - if (tmpColumnSeq < 0) { - tmpColumnSeq = 0; - } - return merged; - } - - /** - * Get common columns to be used as join keys of natural joins. - */ - public static Schema getNaturalJoinColumns(Schema left, Schema right) { - Schema common = new Schema(); - for (Column outer : left.getColumns()) { - if (!common.containsByName(outer.getSimpleName()) && right.containsByName(outer.getSimpleName())) { - common.addColumn(new Column(outer.getSimpleName(), outer.getDataType())); - } - } - - return common; - } - - public static Schema getQualifiedLogicalSchema(TableDesc tableDesc, String tableName) { - Schema logicalSchema = new Schema(tableDesc.getLogicalSchema()); - if (tableName != null) { - logicalSchema.setQualifier(tableName); - } - return logicalSchema; - } - - public static <T extends Schema> T clone(Schema schema) { - try { - T copy = (T) schema.clone(); - return copy; - } catch (CloneNotSupportedException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
new file mode 100644
index 0000000..b829f60
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.datum.Datum;
+
+import java.util.Arrays;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto;
+import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
+
+/**
+ * Compares {@link Tuple}s on an ordered list of sort keys.
+ *
+ * <p>Each key has its own ascending/descending flag and its own NULLS FIRST flag.
+ * The comparator can be serialized with {@link #getProto()} and rebuilt with
+ * {@link #BaseTupleComparator(TupleComparatorProto)}.
+ *
+ * @see Tuple
+ */
+public class BaseTupleComparator extends TupleComparator implements ProtoObject<TupleComparatorProto> {
+  private final Schema schema;
+  private final SortSpec [] sortSpecs;
+  /** Column id of each sort key, in sort-key order. */
+  private final int[] sortKeyIds;
+  /** {@code asc[i]} is true when sort key {@code i} is ascending. */
+  private final boolean[] asc;
+  /** {@code nullFirsts[i]} is true when sort key {@code i} is NULLS FIRST. */
+  private final boolean[] nullFirsts;
+
+  /**
+   * Builds a comparator from a schema and sort specifications.
+   *
+   * @param schema The schema of input tuples
+   * @param sortKeys The description of sort keys; must contain at least one key
+   * @throws IllegalArgumentException if {@code sortKeys} is empty
+   */
+  public BaseTupleComparator(Schema schema, SortSpec[] sortKeys) {
+    Preconditions.checkArgument(sortKeys.length > 0,
+        "At least one sort key must be specified.");
+
+    this.schema = schema;
+    this.sortSpecs = sortKeys;
+    this.sortKeyIds = new int[sortKeys.length];
+    this.asc = new boolean[sortKeys.length];
+    this.nullFirsts = new boolean[sortKeys.length];
+    for (int i = 0; i < sortKeys.length; i++) {
+      // Qualified keys are resolved by qualified name, the others by simple name.
+      if (sortKeys[i].getSortKey().hasQualifier()) {
+        this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName());
+      } else {
+        this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName());
+      }
+
+      this.asc[i] = sortKeys[i].isAscending();
+      this.nullFirsts[i] = sortKeys[i].isNullFirst();
+    }
+  }
+
+  /**
+   * Rebuilds a comparator from its protobuf form.
+   *
+   * @param proto the serialized comparator, as produced by {@link #getProto()}
+   */
+  public BaseTupleComparator(TupleComparatorProto proto) {
+    this.schema = new Schema(proto.getSchema());
+
+    this.sortSpecs = new SortSpec[proto.getSortSpecsCount()];
+    for (int i = 0; i < proto.getSortSpecsCount(); i++) {
+      sortSpecs[i] = new SortSpec(proto.getSortSpecs(i));
+    }
+
+    this.sortKeyIds = new int[proto.getCompSpecsCount()];
+    this.asc = new boolean[proto.getCompSpecsCount()];
+    this.nullFirsts = new boolean[proto.getCompSpecsCount()];
+
+    for (int i = 0; i < proto.getCompSpecsCount(); i++) {
+      TupleComparatorSpecProto compSpecProto = proto.getCompSpecs(i);
+      sortKeyIds[i] = compSpecProto.getColumnId();
+      asc[i] = compSpecProto.getAscending();
+      nullFirsts[i] = compSpecProto.getNullFirst();
+    }
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public SortSpec [] getSortSpecs() {
+    return sortSpecs;
+  }
+
+  public int [] getSortKeyIds() {
+    return sortKeyIds;
+  }
+
+  @Override
+  public boolean isAscendingFirstKey() {
+    return this.asc[0];
+  }
+
+  /**
+   * Compares two tuples key by key and returns the first non-zero result.
+   *
+   * <p>When at least one side of a key is null, the ascending flag is not consulted:
+   * two equal datums tie, otherwise the null side sorts after the other one, or before
+   * it when the key is NULLS FIRST.
+   *
+   * <p>Intermediate values are kept in locals rather than in instance fields, so
+   * overlapping calls on one instance cannot overwrite each other's state.
+   *
+   * @return a negative, zero or positive value when {@code tuple1} sorts before, equal to
+   *         or after {@code tuple2}
+   */
+  @Override
+  public int compare(Tuple tuple1, Tuple tuple2) {
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      final Datum left = tuple1.get(sortKeyIds[i]);
+      final Datum right = tuple2.get(sortKeyIds[i]);
+      final int compVal;
+
+      if (left.isNull() || right.isNull()) {
+        if (left.equals(right)) {
+          compVal = 0;
+        } else {
+          // NULLS LAST puts the null side after the other one; NULLS FIRST flips the sign.
+          final int nullsLast = left.isNull() ? 1 : -1;
+          compVal = nullFirsts[i] ? -nullsLast : nullsLast;
+        }
+      } else if (asc[i]) {
+        compVal = left.compareTo(right);
+      } else {
+        compVal = right.compareTo(left);
+      }
+
+      if (compVal != 0) {
+        return compVal;
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Hashes the contents of the arrays that {@link #equals(Object)} compares.
+   * Passing {@code sortKeyIds} directly to the varargs {@code Objects.hashCode} would
+   * hash the array reference, so equal comparators could get different hash codes.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(
+        Arrays.hashCode(sortKeyIds), Arrays.hashCode(asc), Arrays.hashCode(nullFirsts));
+  }
+
+  /**
+   * Two comparators are equal when they have the same column ids, ascending flags and
+   * NULLS FIRST flags, in the same order. The schema and sort specs are not compared.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof BaseTupleComparator)) {
+      return false;
+    }
+    BaseTupleComparator other = (BaseTupleComparator) obj;
+    return Arrays.equals(sortKeyIds, other.sortKeyIds)
+        && Arrays.equals(asc, other.asc)
+        && Arrays.equals(nullFirsts, other.nullFirsts);
+  }
+
+  @Override
+  public TupleComparatorProto getProto() {
+    TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder();
+    builder.setSchema(schema.getProto());
+    for (int i = 0; i < sortSpecs.length; i++) {
+      builder.addSortSpecs(sortSpecs[i].getProto());
+    }
+
+    TupleComparatorSpecProto.Builder sortSpecBuilder;
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      sortSpecBuilder = TupleComparatorSpecProto.newBuilder();
+      sortSpecBuilder.setColumnId(sortKeyIds[i]);
+      sortSpecBuilder.setAscending(asc[i]);
+      sortSpecBuilder.setNullFirst(nullFirsts[i]);
+      builder.addCompSpecs(sortSpecBuilder);
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+
+    String prefix = "";
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i])
+          .append(",Asc=").append(asc[i])
+          .append(",NullFirst=").append(nullFirsts[i]);
+      prefix = " ,";
+    }
+    return sb.toString();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java index e0f8a2e..8b7e2e0 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java @@ -23,7 +23,7 @@ package org.apache.tajo.storage; import com.google.common.base.Preconditions; import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.IntervalDatum; import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.exception.UnsupportedException; @@ -71,7 +71,12 @@ public class FrameTuple implements Tuple, Cloneable { @Override public boolean isNull(int fieldid) { - return get(fieldid) instanceof NullDatum; + return
get(fieldid).isNull(); + } + + @Override + public boolean isNotNull(int fieldid) { + return !isNull(fieldid); } @Override @@ -177,6 +182,11 @@ public class FrameTuple implements Tuple, Cloneable { } @Override + public IntervalDatum getInterval(int fieldId) { + return (IntervalDatum) get(fieldId); + } + + @Override public char [] getUnicodeChars(int fieldId) { return get(fieldId).asUnicodeChars(); }
