TAJO-1460: Apply TAJO-1407 to ExternalSortExec. Closes #474
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6e7f1c7b Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6e7f1c7b Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6e7f1c7b Branch: refs/heads/index_support Commit: 6e7f1c7b0561c7299a498b81dfb6148883a76a0d Parents: 7c2a240 Author: babokim <[email protected]> Authored: Thu Apr 2 15:22:37 2015 +0900 Committer: babokim <[email protected]> Committed: Thu Apr 2 15:22:37 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../planner/physical/ComparableVector.java | 411 +++++++++++++++++++ .../planner/physical/ExternalSortExec.java | 238 ++++------- .../engine/planner/physical/MemSortExec.java | 19 +- .../tajo/engine/planner/physical/SortExec.java | 10 +- .../engine/planner/physical/TupleSorter.java | 24 +- .../planner/physical/VectorizedSorter.java | 155 +------ .../planner/physical/TestExternalSortExec.java | 4 +- .../planner/physical/TestTupleSorter.java | 2 +- .../testCrossJoinWithAsterisk1.sql | 2 +- .../testCrossJoinWithAsterisk2.sql | 2 +- .../testCrossJoinWithAsterisk3.sql | 2 +- .../testCrossJoinWithAsterisk4.sql | 2 +- .../TestJoinQuery/testComplexJoinCondition7.sql | 2 +- .../testCrossJoinWithAsterisk1.sql | 2 +- .../testCrossJoinWithAsterisk2.sql | 2 +- .../testCrossJoinWithAsterisk3.sql | 2 +- .../testCrossJoinWithAsterisk4.sql | 2 +- .../queries/TestJoinQuery/testJoinWithJson.json | 8 + .../testComplexJoinCondition7.result | 4 +- .../apache/tajo/storage/AbstractScanner.java | 80 ++++ .../org/apache/tajo/storage/MemoryUtil.java | 4 + 22 files changed, 655 insertions(+), 325 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index c3bce7d..1ccd016 100644 --- a/CHANGES +++ b/CHANGES @@ -10,6 
+10,9 @@ Release 0.11.0 - unreleased (jihun) IMPROVEMENT + + TAJO-1460: Apply TAJO-1407 to ExternalSortExec. (Contributed by navis, + Committed by hyoungjun) TAJO-1350: Refactor FilterPushDownRule::visitJoin() into well-defined, small methods. (jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java new file mode 100644 index 0000000..39b8c8a --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java @@ -0,0 +1,411 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import com.google.common.primitives.Booleans; +import com.google.common.primitives.Doubles; +import com.google.common.primitives.Floats; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import com.google.common.primitives.Shorts; +import com.google.common.primitives.UnsignedInts; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; + +import java.util.Arrays; +import java.util.BitSet; + +/** + * Extract raw level values (primitive or String/byte[]) from each of key columns for compare + */ +public class ComparableVector { + + protected final Tuple[] tuples; // source tuples + protected final TupleVector[] vectors; // values of key columns + protected final int[] keyIndex; + + public ComparableVector(int length, SortSpec[] sortKeys, int[] keyIndex) { + tuples = new Tuple[length]; + vectors = new TupleVector[sortKeys.length]; + for (int i = 0; i < vectors.length; i++) { + TajoDataTypes.Type type = sortKeys[i].getSortKey().getDataType().getType(); + boolean nullFirst = sortKeys[i].isNullFirst(); + boolean ascending = sortKeys[i].isAscending(); + boolean nullInvert = nullFirst && ascending || !nullFirst && !ascending; + vectors[i] = new TupleVector(vectorType(type), tuples.length, nullInvert, ascending); + } + this.keyIndex = keyIndex; + } + + public int compare(final int i1, final int i2) { + for (TupleVector vector : vectors) { + int compare = vector.compare(i1, i2); + if (compare != 0) { + return compare; + } + } + return 0; + } + + public void set(int index, Tuple tuple) { + for (int i = 0; i < 
vectors.length; i++) { + vectors[i].set(index, tuple, keyIndex[i]); + } + } + + protected static class TupleVector { + + private final int type; + private final BitSet nulls; + private final boolean nullInvert; + private final boolean ascending; + + private boolean[] booleans; + private byte[] bits; + private short[] shorts; + private int[] ints; + private long[] longs; + private float[] floats; + private double[] doubles; + private byte[][] bytes; + + private int index; + + private TupleVector(int type, int length, boolean nullInvert, boolean ascending) { + this.type = type; + this.nulls = new BitSet(length); + this.nullInvert = nullInvert; + this.ascending = ascending; + switch (type) { + case 0: booleans = new boolean[length]; break; + case 1: bits = new byte[length]; break; + case 2: shorts = new short[length]; break; + case 3: ints = new int[length]; break; + case 4: longs = new long[length]; break; + case 5: floats = new float[length]; break; + case 6: doubles = new double[length]; break; + case 7: bytes = new byte[length][]; break; + case 8: ints = new int[length]; break; + case -1: break; + default: + throw new IllegalArgumentException(); + } + } + + protected final void append(Tuple tuple, int field) { + set(index++, tuple, field); + } + + protected final void set(int index, Tuple tuple, int field) { + if (tuple.isNull(field)) { + nulls.set(index); + return; + } + nulls.clear(index); + switch (type) { + case 0: booleans[index] = tuple.getBool(field); break; + case 1: bits[index] = tuple.getByte(field); break; + case 2: shorts[index] = tuple.getInt2(field); break; + case 3: ints[index] = tuple.getInt4(field); break; + case 4: longs[index] = tuple.getInt8(field); break; + case 5: floats[index] = tuple.getFloat4(field); break; + case 6: doubles[index] = tuple.getFloat8(field); break; + case 7: bytes[index] = tuple.getBytes(field); break; + case 8: ints[index] = tuple.getInt4(field); break; + default: + throw new IllegalArgumentException(); + } + } + + 
protected final int compare(int index1, int index2) { + final boolean n1 = nulls.get(index1); + final boolean n2 = nulls.get(index2); + if (n1 && n2) { + return 0; + } + if (n1 ^ n2) { + int compVal = n1 ? 1 : -1; + return nullInvert ? -compVal : compVal; + } + int compare; + switch (type) { + case 0: compare = Booleans.compare(booleans[index1], booleans[index2]); break; + case 1: compare = bits[index1] - bits[index2]; break; + case 2: compare = Shorts.compare(shorts[index1], shorts[index2]); break; + case 3: compare = Ints.compare(ints[index1], ints[index2]); break; + case 4: compare = Longs.compare(longs[index1], longs[index2]); break; + case 5: compare = Floats.compare(floats[index1], floats[index2]); break; + case 6: compare = Doubles.compare(doubles[index1], doubles[index2]); break; + case 7: compare = TextDatum.COMPARATOR.compare(bytes[index1], bytes[index2]); break; + case 8: compare = UnsignedInts.compare(ints[index1], ints[index2]); break; + default: + throw new IllegalArgumentException(); + } + return ascending ? 
compare : -compare; + } + } + + public static class ComparableTuple { + + private final TupleType[] keyTypes; + private final int[] keyIndex; + private final Object[] keys; + + public ComparableTuple(Schema schema, int[] keyIndex) { + this(tupleTypes(schema, keyIndex), keyIndex); + } + + public ComparableTuple(Schema schema, int start, int end) { + this(schema, toKeyIndex(start, end)); + } + + private ComparableTuple(TupleType[] keyTypes, int[] keyIndex) { + this.keyTypes = keyTypes; + this.keyIndex = keyIndex; + this.keys = new Object[keyIndex.length]; + } + + public int size() { + return keyIndex.length; + } + + public void set(Tuple tuple) { + for (int i = 0; i < keyTypes.length; i++) { + final int field = keyIndex[i]; + if (tuple.isNull(field)) { + keys[i] = null; + continue; + } + switch (keyTypes[i]) { + case BOOLEAN: keys[i] = tuple.getBool(field); break; + case BIT: keys[i] = tuple.getByte(field); break; + case INT1: + case INT2: keys[i] = tuple.getInt2(field); break; + case INT4: + case DATE: + case INET4: keys[i] = tuple.getInt4(field); break; + case INT8: + case TIME: + case TIMESTAMP: keys[i] = tuple.getInt8(field); break; + case FLOAT4: keys[i] = tuple.getFloat4(field); break; + case FLOAT8: keys[i] = tuple.getFloat8(field); break; + case TEXT: + case CHAR: + case BLOB: keys[i] = tuple.getBytes(field); break; + case DATUM: keys[i] = tuple.get(field); break; + default: + throw new IllegalArgumentException(); + } + } + } + + @Override + public boolean equals(Object obj) { + ComparableTuple other = (ComparableTuple)obj; + for (int i = 0; i < keys.length; i++) { + final boolean n1 = keys[i] == null; + final boolean n2 = other.keys[i] == null; + if (n1 && n2) { + continue; + } + if (n1 ^ n2) { + return false; + } + switch (keyTypes[i]) { + case TEXT: + case CHAR: + case BLOB: if (!Arrays.equals((byte[])keys[i], (byte[])other.keys[i])) return false; continue; + default: if (!keys[i].equals(other.keys[i])) return false; continue; + } + } + return true; + } + 
+ public boolean equals(Tuple tuple) { + for (int i = 0; i < keys.length; i++) { + final int field = keyIndex[i]; + final boolean n1 = keys[i] == null; + final boolean n2 = tuple.isNull(field); + if (n1 && n2) { + continue; + } + if (n1 ^ n2) { + return false; + } + switch (keyTypes[i]) { + case BOOLEAN: if ((Boolean)keys[i] != tuple.getBool(field)) return false; continue; + case BIT: if ((Byte)keys[i] != tuple.getByte(field)) return false; continue; + case INT1: + case INT2: if ((Short)keys[i] != tuple.getInt2(field)) return false; continue; + case INT4: + case DATE: + case INET4: if ((Integer)keys[i] != tuple.getInt4(field)) return false; continue; + case INT8: + case TIME: + case TIMESTAMP: if ((Long)keys[i] != tuple.getInt8(field)) return false; continue; + case FLOAT4: if ((Float)keys[i] != tuple.getFloat4(field)) return false; continue; + case FLOAT8: if ((Double)keys[i] != tuple.getFloat8(field)) return false; continue; + case TEXT: + case CHAR: + case BLOB: if (!Arrays.equals((byte[])keys[i], tuple.getBytes(field))) return false; continue; + case DATUM: if (!keys[i].equals(tuple.get(field))) return false; continue; + } + } + return true; + } + + @Override + public int hashCode() { + int result = 1; + for (Object key : keys) { + int hash = key == null ? 0 : + key instanceof byte[] ? 
Arrays.hashCode((byte[])key) : key.hashCode(); + result = 31 * result + hash; + } + return result; + } + + public ComparableTuple copy() { + ComparableTuple copy = emptyCopy(); + System.arraycopy(keys, 0, copy.keys, 0, keys.length); + return copy; + } + + public ComparableTuple emptyCopy() { + return new ComparableTuple(keyTypes, keyIndex); + } + + public VTuple toVTuple() { + VTuple vtuple = new VTuple(keyIndex.length); + for (int i = 0; i < keyIndex.length; i++) { + vtuple.put(i, toDatum(i)); + } + return vtuple; + } + + public Datum toDatum(int i) { + if (keys[i] == null) { + return NullDatum.get(); + } + switch (keyTypes[i]) { + case NULL_TYPE: return NullDatum.get(); + case BOOLEAN: return DatumFactory.createBool((Boolean) keys[i]); + case BIT: return DatumFactory.createBit((Byte)keys[i]); + case INT1: + case INT2: return DatumFactory.createInt2((Short) keys[i]); + case INT4: return DatumFactory.createInt4((Integer) keys[i]); + case DATE: return DatumFactory.createDate((Integer) keys[i]); + case INET4: return DatumFactory.createInet4((Integer) keys[i]); + case INT8: return DatumFactory.createInt8((Long) keys[i]); + case TIME: return DatumFactory.createTime((Long) keys[i]); + case TIMESTAMP: return DatumFactory.createTimestamp((Long) keys[i]); + case FLOAT4: return DatumFactory.createFloat4((Float) keys[i]); + case FLOAT8: return DatumFactory.createFloat8((Double) keys[i]); + case TEXT: return DatumFactory.createText((byte[]) keys[i]); + case CHAR: return DatumFactory.createChar((byte[]) keys[i]); + case BLOB: return DatumFactory.createBlob((byte[]) keys[i]); + case DATUM: return (Datum)keys[i]; + default: + throw new IllegalArgumentException(); + } + } + } + + public static boolean isVectorizable(SortSpec[] sortKeys) { + if (sortKeys.length == 0) { + return false; + } + for (SortSpec spec : sortKeys) { + try { + vectorType(spec.getSortKey().getDataType().getType()); + } catch (Exception e) { + return false; + } + } + return true; + } + + private static int 
vectorType(TajoDataTypes.Type type) { + switch (type) { + case BOOLEAN: return 0; + case BIT: return 1; + case INT1: case INT2: return 2; + case INT4: case DATE: return 3; + case INT8: case TIME: case TIMESTAMP: case INTERVAL: return 4; + case FLOAT4: return 5; + case FLOAT8: return 6; + case TEXT: case CHAR: case BLOB: return 7; + case INET4: return 8; + case NULL_TYPE: return -1; + } + // todo + throw new UnsupportedException(type.name()); + } + + private static TupleType[] tupleTypes(Schema schema, int[] keyIndex) { + TupleType[] types = new TupleType[keyIndex.length]; + for (int i = 0; i < keyIndex.length; i++) { + types[i] = tupleType(schema.getColumn(keyIndex[i]).getDataType().getType()); + } + return types; + } + + private static TupleType tupleType(TajoDataTypes.Type type) { + switch (type) { + case BOOLEAN: return TupleType.BOOLEAN; + case BIT: return TupleType.BIT; + case INT1: return TupleType.INT1; + case INT2: return TupleType.INT2; + case INT4: return TupleType.INT4; + case DATE: return TupleType.DATE; + case INT8: return TupleType.INT8; + case TIME: return TupleType.TIME; + case TIMESTAMP: return TupleType.TIMESTAMP; + case FLOAT4: return TupleType.FLOAT4; + case FLOAT8: return TupleType.FLOAT8; + case TEXT: return TupleType.TEXT; + case CHAR: return TupleType.CHAR; + case BLOB: return TupleType.BLOB; + case INET4: return TupleType.INET4; + case NULL_TYPE: return TupleType.NULL_TYPE; + default: return TupleType.DATUM; + } + } + + private static int[] toKeyIndex(int start, int end) { + int[] keyIndex = new int[end - start]; + for (int i = 0; i < keyIndex.length; i++) { + keyIndex[i] = start + i; + } + return keyIndex; + } + + private static enum TupleType { + NULL_TYPE, BOOLEAN, BIT, INT1, INT2, INT4, DATE, INET4, INT8, TIME, TIMESTAMP, + FLOAT4, FLOAT8, TEXT, CHAR, BLOB, DATUM + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java 
---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index b3ebfb2..355f015 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.IOUtils; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; @@ -168,14 +167,14 @@ public class ExternalSortExec extends SortExec { int rowNum = tupleBlock.size(); long sortStart = System.currentTimeMillis(); - Collections.sort(tupleBlock, getComparator()); + Iterable<Tuple> sorted = getSorter(tupleBlock).sort(); long sortEnd = System.currentTimeMillis(); long chunkWriteStart = System.currentTimeMillis(); Path outputPath = getChunkPathForWrite(0, chunkId); final RawFileAppender appender = new RawFileAppender(context.getConf(), null, inSchema, meta, outputPath); appender.init(); - for (Tuple t : tupleBlock) { + for (Tuple t : sorted) { appender.addTuple(t); } appender.close(); @@ -236,18 +235,13 @@ public class ExternalSortExec extends SortExec { } } - if (inMemoryTable.size() > 0) { // if there are at least one or more input tuples - if (!memoryResident) { // check if data exceeds a sort buffer. If so, it store the remain data into a chunk. 
- if (inMemoryTable.size() > 0) { - long start = System.currentTimeMillis(); - int rowNum = inMemoryTable.size(); - chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable)); - long end = System.currentTimeMillis(); - info(LOG, "Last Chunk #" + chunkId + " " + rowNum + " rows written (" + (end - start) + " msec)"); - } - } else { // this case means that all data does not exceed a sort buffer - Collections.sort(inMemoryTable, getComparator()); - } + if (!memoryResident && !inMemoryTable.isEmpty()) { // if there are at least one or more input tuples + // check if data exceeds a sort buffer. If so, it store the remain data into a chunk. + long start = System.currentTimeMillis(); + int rowNum = inMemoryTable.size(); + chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable)); + long end = System.currentTimeMillis(); + info(LOG, "Last Chunk #" + chunkId + " " + rowNum + " rows written (" + (end - start) + " msec)"); } // get total loaded (or stored) bytes and total row numbers @@ -285,7 +279,8 @@ public class ExternalSortExec extends SortExec { info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec"); if (memoryResident) { // if all sorted data reside in a main-memory table. - this.result = new MemTableScanner(); + TupleSorter sorter = getSorter(inMemoryTable); + result = new MemTableScanner(sorter.sort(), inMemoryTable.size(), sortAndStoredBytes); } else { // if input data exceeds main-memory at least once try { @@ -314,7 +309,7 @@ public class ExternalSortExec extends SortExec { return result.next(); } - private int calculateFanout(int remainInputChunks, int intputNum, int outputNum, int startIdx) { + private int calculateFanout(int remainInputChunks, int inputNum, int outputNum, int startIdx) { int computedFanout = Math.min(remainInputChunks, defaultFanout); // Why should we detect an opportunity for unbalanced merge? 
@@ -322,9 +317,9 @@ public class ExternalSortExec extends SortExec { // Assume that a fanout is given by 8 and there are 10 chunks. // If we firstly merge 3 chunks into one chunk, there remain only 8 chunks. // Then, we can just finish the merge phase even though we don't complete merge phase on all chunks. - if (checkIfCanBeUnbalancedMerged(intputNum - (startIdx + computedFanout), outputNum + 1)) { + if (checkIfCanBeUnbalancedMerged(inputNum - (startIdx + computedFanout), outputNum + 1)) { int candidateFanout = computedFanout; - while(checkIfCanBeUnbalancedMerged(intputNum - (startIdx + candidateFanout), outputNum + 1)) { + while (checkIfCanBeUnbalancedMerged(inputNum - (startIdx + candidateFanout), outputNum + 1)) { candidateFanout--; } int beforeFanout = computedFanout; @@ -354,7 +349,7 @@ public class ExternalSortExec extends SortExec { int remainInputRuns = inputFiles.size(); int outChunkId = 0; int outputFileNum = 0; - List<Future> futures = TUtil.newList(); + List<Future<FileFragment>> futures = TUtil.newList(); // the number of files being merged in threads. 
List<Integer> numberOfMergingFiles = TUtil.newList(); @@ -419,7 +414,7 @@ public class ExternalSortExec extends SortExec { */ int numDeletedFiles = 0; for (FileFragment frag : inputFiles) { - if (frag.getTableName().contains(INTERMEDIATE_FILE_PREFIX) == true) { + if (frag.getTableName().contains(INTERMEDIATE_FILE_PREFIX)) { localFS.delete(frag.getPath(), true); numDeletedFiles++; LOG.info("Delete merged intermediate file: " + frag); @@ -527,28 +522,38 @@ public class ExternalSortExec extends SortExec { throws IOException { if (num > 1) { final int mid = (int) Math.ceil((float)num / 2); - return new PairWiseMerger(inSchema, - createKWayMergerInternal(sources, startIdx, mid), - createKWayMergerInternal(sources, startIdx + mid, num - mid), getComparator()); + Scanner left = createKWayMergerInternal(sources, startIdx, mid); + Scanner right = createKWayMergerInternal(sources, startIdx + mid, num - mid); + if (ComparableVector.isVectorizable(sortSpecs)) { + return new VectorComparePairWiseMerger(inSchema, left, right, comparator); + } + return new PairWiseMerger(inSchema, left, right, comparator); } else { return sources[startIdx]; } } - private class MemTableScanner implements Scanner { - Iterator<Tuple> iterator; + private static class MemTableScanner extends AbstractScanner { + final Iterable<Tuple> iterable; + final long sortAndStoredBytes; + final int totalRecords; + Iterator<Tuple> iterator; // for input stats float scannerProgress; int numRecords; - int totalRecords; TableStats scannerTableStats; + public MemTableScanner(Iterable<Tuple> iterable, int length, long inBytes) { + this.iterable = iterable; + this.totalRecords = length; + this.sortAndStoredBytes = inBytes; + } + @Override public void init() throws IOException { - iterator = inMemoryTable.iterator(); + iterator = iterable.iterator(); - totalRecords = inMemoryTable.size(); scannerProgress = 0.0f; numRecords = 0; @@ -581,34 +586,6 @@ public class ExternalSortExec extends SortExec { } @Override - public 
boolean isProjectable() { - return false; - } - - @Override - public void setTarget(Column[] targets) { - } - - @Override - public boolean isSelectable() { - return false; - } - - @Override - public void setSearchCondition(Object expr) { - } - - @Override - public boolean isSplittable() { - return false; - } - - @Override - public Schema getSchema() { - return null; - } - - @Override public float getProgress() { if (iterator != null && numRecords > 0) { return (float)numRecords / (float)totalRecords; @@ -630,19 +607,43 @@ public class ExternalSortExec extends SortExec { CLOSED } + private static class VectorComparePairWiseMerger extends PairWiseMerger { + + private ComparableVector comparable; + + public VectorComparePairWiseMerger(Schema schema, Scanner leftScanner, Scanner rightScanner, + BaseTupleComparator comparator) throws IOException { + super(schema, leftScanner, rightScanner, null); + comparable = new ComparableVector(2, comparator.getSortSpecs(), comparator.getSortKeyIds()); + } + + @Override + protected Tuple prepare(int index, Tuple tuple) { + if (tuple != null) { + comparable.set(index, tuple); + } + return tuple; + } + + @Override + protected int compare() { + return comparable.compare(0, 1); + } + } + /** * Two-way merger scanner that reads two input sources and outputs one output tuples sorted in some order. 
*/ - private static class PairWiseMerger implements Scanner { - private Scanner leftScan; - private Scanner rightScan; + private static class PairWiseMerger extends AbstractScanner { - private VTuple outTuple; - private VTuple leftTuple; - private VTuple rightTuple; + protected final Schema schema; + protected final Comparator<Tuple> comparator; - private final Schema schema; - private final Comparator<Tuple> comparator; + protected final Scanner leftScan; + protected final Scanner rightScan; + + private Tuple leftTuple; + private Tuple rightTuple; private float mergerProgress; private TableStats mergerInputStats; @@ -679,74 +680,30 @@ public class ExternalSortExec extends SortExec { } private void prepareTuplesForFirstComparison() throws IOException { - Tuple lt = leftScan.next(); - if (lt != null) { - leftTuple = new VTuple(lt); - } else { - leftTuple = null; // TODO - missed free - } - - Tuple rt = rightScan.next(); - if (rt != null) { - rightTuple = new VTuple(rt); - } else { - rightTuple = null; // TODO - missed free - } + leftTuple = prepare(0, leftScan.next()); + rightTuple = prepare(1, rightScan.next()); } - public Tuple next() throws IOException { + protected Tuple prepare(int index, Tuple tuple) { + return tuple == null ? 
null : new VTuple(tuple); + } - if (leftTuple != null && rightTuple != null) { - if (comparator.compare(leftTuple, rightTuple) < 0) { - outTuple = new VTuple(leftTuple); + protected int compare() { + return comparator.compare(leftTuple, rightTuple); + } - Tuple lt = leftScan.next(); - if (lt != null) { - leftTuple = new VTuple(lt); - } else { - leftTuple = null; // TODO - missed free - } - } else { - outTuple = new VTuple(rightTuple); - - Tuple rt = rightScan.next(); - if (rt != null) { - rightTuple = new VTuple(rt); - } else { - rightTuple = null; // TODO - missed free - } - } - return outTuple; + public Tuple next() throws IOException { + if (leftTuple == null && rightTuple == null) { + return null; } - - if (leftTuple == null) { - if (rightTuple != null) { - outTuple = new VTuple(rightTuple); - } else { - outTuple = null; - } - - Tuple rt = rightScan.next(); - if (rt != null) { - rightTuple = new VTuple(rt); - } else { - rightTuple = null; // TODO - missed free - } - } else { - if (leftTuple != null) { - outTuple = new VTuple(leftTuple); - } else { - outTuple = null; - } - - Tuple lt = leftScan.next(); - if (lt != null) { - leftTuple = new VTuple(lt); - } else { - leftTuple = null; // TODO - missed free - } + if (rightTuple == null || (leftTuple != null && compare() < 0)) { + Tuple tuple = leftTuple; + leftTuple = prepare(0, leftScan.next()); + return tuple; } - return outTuple; + Tuple tuple = rightTuple; + rightTuple = prepare(1, rightScan.next()); + return tuple; } @Override @@ -755,7 +712,6 @@ public class ExternalSortExec extends SortExec { leftScan.reset(); rightScan.reset(); - outTuple = null; leftTuple = null; rightTuple = null; @@ -765,39 +721,15 @@ public class ExternalSortExec extends SortExec { } } + @Override public void close() throws IOException { IOUtils.cleanup(LOG, leftScan, rightScan); getInputStats(); - leftScan = null; - rightScan = null; mergerProgress = 1.0f; setState(State.CLOSED); } @Override - public boolean isProjectable() { - return 
false; - } - - @Override - public void setTarget(Column[] targets) { - } - - @Override - public boolean isSelectable() { - return false; - } - - @Override - public void setSearchCondition(Object expr) { - } - - @Override - public boolean isSplittable() { - return false; - } - - @Override public Schema getSchema() { return schema; } http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java index a2e039c..f76e356 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java @@ -25,11 +25,10 @@ import org.apache.tajo.storage.VTuple; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; -public class MemSortExec extends SortExec implements TupleSorter { +public class MemSortExec extends SortExec { private SortNode plan; private List<Tuple> tupleSlots; private boolean sorted = false; @@ -54,7 +53,7 @@ public class MemSortExec extends SortExec implements TupleSorter { while (!context.isStopped() && (tuple = child.next()) != null) { tupleSlots.add(new VTuple(tuple)); } - iterator = getSorter().sort(); + iterator = getSorter(tupleSlots).sort().iterator(); sorted = true; } @@ -65,14 +64,6 @@ public class MemSortExec extends SortExec implements TupleSorter { } } - private TupleSorter getSorter() { - try { - return new VectorizedSorter(tupleSlots, sortSpecs, comparator.getSortKeyIds()); - } catch (Exception e) { - return this; - } - } - @Override public void rescan() throws IOException { super.rescan(); @@ -92,10 +83,4 @@ public class MemSortExec extends 
SortExec implements TupleSorter { public SortNode getPlan() { return this.plan; } - - @Override - public Iterator<Tuple> sort() { - Collections.sort(tupleSlots, comparator); - return tupleSlots.iterator(); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java index fb6a3b2..28be9de 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java @@ -22,13 +22,14 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; import java.util.Comparator; +import java.util.List; public abstract class SortExec extends UnaryPhysicalExec { + protected final BaseTupleComparator comparator; protected final SortSpec [] sortSpecs; @@ -39,6 +40,13 @@ public abstract class SortExec extends UnaryPhysicalExec { this.comparator = new BaseTupleComparator(inSchema, sortSpecs); } + protected TupleSorter getSorter(List<Tuple> tupleSlots) { + if (!tupleSlots.isEmpty() && ComparableVector.isVectorizable(sortSpecs)) { + return new VectorizedSorter(tupleSlots, sortSpecs, comparator.getSortKeyIds()); + } + return new TupleSorter.DefaultSorter(tupleSlots, comparator); + } + public SortSpec[] getSortSpecs() { return sortSpecs; } http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java 
---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java index d240e4a..57fe816 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java @@ -19,9 +19,29 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.TupleComparator; -import java.util.Iterator; +import java.util.Collections; +import java.util.List; public interface TupleSorter { - Iterator<Tuple> sort(); + + Iterable<Tuple> sort(); + + public static class DefaultSorter implements TupleSorter { + + private final List<Tuple> target; + private final TupleComparator comparator; + + public DefaultSorter(List<Tuple> target, TupleComparator comparator) { + this.target = target; + this.comparator = comparator; + } + + @Override + public Iterable<Tuple> sort() { + Collections.sort(target, comparator); + return target; + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java index 891d104..18d853f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java @@ -18,22 +18,12 @@ package org.apache.tajo.engine.planner.physical; -import com.google.common.primitives.Booleans; -import com.google.common.primitives.Doubles; -import com.google.common.primitives.Floats; 
-import com.google.common.primitives.Ints; -import com.google.common.primitives.Longs; -import com.google.common.primitives.Shorts; import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.QuickSort; import org.apache.tajo.catalog.SortSpec; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.TextDatum; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.Tuple; -import java.util.BitSet; import java.util.Iterator; import java.util.List; @@ -41,26 +31,17 @@ import java.util.List; * Extract raw level values (primitive or String/byte[]) from each of key columns before sorting * Uses indirection for efficient swapping */ -public class VectorizedSorter implements IndexedSortable, TupleSorter { +public class VectorizedSorter extends ComparableVector implements IndexedSortable, TupleSorter { - private final Tuple[] tuples; // source tuples - private final TupleVector[] vectors; // values of key columns private final int[] mappings; // index indirection public VectorizedSorter(List<Tuple> source, SortSpec[] sortKeys, int[] keyIndex) { - this.tuples = source.toArray(new Tuple[source.size()]); - vectors = new TupleVector[sortKeys.length]; + super(source.size(), sortKeys, keyIndex); + source.toArray(tuples); // wish it's array list mappings = new int[tuples.length]; - for (int i = 0; i < vectors.length; i++) { - TajoDataTypes.Type type = sortKeys[i].getSortKey().getDataType().getType(); - boolean nullFirst = sortKeys[i].isNullFirst(); - boolean ascending = sortKeys[i].isAscending(); - boolean nullInvert = nullFirst && ascending || !nullFirst && !ascending; - vectors[i] = new TupleVector(TupleVector.getType(type), tuples.length, nullInvert, ascending); - } for (int i = 0; i < tuples.length; i++) { for (int j = 0; j < keyIndex.length; j++) { - vectors[j].add(tuples[i].get(keyIndex[j])); + vectors[j].append(tuples[i], keyIndex[j]); } mappings[i] = i; } @@ -68,15 +49,7 
@@ public class VectorizedSorter implements IndexedSortable, TupleSorter { @Override public int compare(int i1, int i2) { - final int index1 = mappings[i1]; - final int index2 = mappings[i2]; - for (TupleVector vector : vectors) { - int compare = vector.compare(index1, index2); - if (compare != 0) { - return compare; - } - } - return 0; + return super.compare(mappings[i1], mappings[i2]); } @Override @@ -87,112 +60,18 @@ public class VectorizedSorter implements IndexedSortable, TupleSorter { } @Override - public Iterator<Tuple> sort() { - new QuickSort().sort(VectorizedSorter.this, 0, mappings.length); - return new Iterator<Tuple>() { - int index; - public boolean hasNext() { return index < mappings.length; } - public Tuple next() { return tuples[mappings[index++]]; } - public void remove() { throw new UnsupportedException(); } - }; - } - - private static class TupleVector { - - private final int type; - private final BitSet nulls; - private final boolean nullInvert; - private final boolean ascending; - - private boolean[] booleans; - private byte[] bits; - private short[] shorts; - private int[] ints; - private long[] longs; - private float[] floats; - private double[] doubles; - private byte[][] bytes; - - private int index; - - private TupleVector(int type, int length, boolean nullInvert, boolean ascending) { - this.type = type; - this.nulls = new BitSet(length); - this.nullInvert = nullInvert; - this.ascending = ascending; - switch (type) { - case 0: booleans = new boolean[length]; break; - case 1: bits = new byte[length]; break; - case 2: shorts = new short[length]; break; - case 3: ints = new int[length]; break; - case 4: longs = new long[length]; break; - case 5: floats = new float[length]; break; - case 6: doubles = new double[length]; break; - case 7: bytes = new byte[length][]; break; - default: - throw new IllegalArgumentException(); - } - } - - private void add(Datum datum) { - if (datum.isNull()) { - nulls.set(index++); - return; - } - switch (type) { - 
case 0: booleans[index] = datum.asBool(); break; - case 1: bits[index] = datum.asByte(); break; - case 2: shorts[index] = datum.asInt2(); break; - case 3: ints[index] = datum.asInt4(); break; - case 4: longs[index] = datum.asInt8(); break; - case 5: floats[index] = datum.asFloat4(); break; - case 6: doubles[index] = datum.asFloat8(); break; - case 7: bytes[index] = datum.asByteArray(); break; - default: - throw new IllegalArgumentException(); - } - index++; - } - - private int compare(int index1, int index2) { - final boolean n1 = nulls.get(index1); - final boolean n2 = nulls.get(index2); - if (n1 && n2) { - return 0; - } - if (n1 ^ n2) { - int compVal = n1 ? 1 : -1; - return nullInvert ? -compVal : compVal; + public Iterable<Tuple> sort() { + new QuickSort().sort(this, 0, mappings.length); + return new Iterable<Tuple>() { + @Override + public Iterator<Tuple> iterator() { + return new Iterator<Tuple>() { + int index; + public boolean hasNext() { return index < mappings.length; } + public Tuple next() { return tuples[mappings[index++]]; } + public void remove() { throw new UnsupportedException(); } + }; } - int compare; - switch (type) { - case 0: compare = Booleans.compare(booleans[index1], booleans[index2]); break; - case 1: compare = bits[index1] - bits[index2]; break; - case 2: compare = Shorts.compare(shorts[index1], shorts[index2]); break; - case 3: compare = Ints.compare(ints[index1], ints[index2]); break; - case 4: compare = Longs.compare(longs[index1], longs[index2]); break; - case 5: compare = Floats.compare(floats[index1], floats[index2]); break; - case 6: compare = Doubles.compare(doubles[index1], doubles[index2]); break; - case 7: compare = TextDatum.COMPARATOR.compare(bytes[index1], bytes[index2]); break; - default: - throw new IllegalArgumentException(); - } - return ascending ? 
compare : -compare; - } - - public static int getType(TajoDataTypes.Type type) { - switch (type) { - case BOOLEAN: return 0; - case BIT: case INT1: return 1; - case INT2: return 2; - case INT4: case DATE: case INET4: return 3; - case INT8: case TIME: case TIMESTAMP: case INTERVAL: return 4; - case FLOAT4: return 5; - case FLOAT8: return 6; - case TEXT: case CHAR: case BLOB: return 7; - } - // todo - throw new UnsupportedException(type.name()); - } + }; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java index 5d9d46d..946e0f3 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java @@ -61,7 +61,7 @@ public class TestExternalSortExec { private LogicalPlanner planner; private Path testDir; - private final int numTuple = 100000; + private final int numTuple = 3000000; private Random rnd = new Random(System.currentTimeMillis()); private TableDesc employee; @@ -161,7 +161,7 @@ public class TestExternalSortExec { if (preVal != null) { assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0); } - preVal = curVal; + preVal = new VTuple(curVal); cnt++; } long end = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java index fc43d42..9cc477a 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java @@ -93,7 +93,7 @@ public class TestTupleSorter { long start = System.currentTimeMillis(); VectorizedSorter sorter = new VectorizedSorter(target, sortKeys, keyIndices); - Iterator<Tuple> iterator = sorter.sort(); + Iterator<Tuple> iterator = sorter.sort().iterator(); String[] result1 = new String[SAMPLING]; for (int i = 0; i < result1.length; i++) { http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk1.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk1.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk1.sql index 5451b4a..111a371 100644 --- a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk1.sql +++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk1.sql @@ -1 +1 @@ -select region.*, customer.* from region, customer order by r_regionkey,r_name; \ No newline at end of file +select region.*, customer.* from region, customer order by r_regionkey,r_name,c_custkey; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk2.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk2.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk2.sql index e9dac51..ca1672e 100644 --- 
a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk2.sql +++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk2.sql @@ -1 +1 @@ -select region.*, customer.* from customer, region order by r_regionkey,r_name; \ No newline at end of file +select region.*, customer.* from customer, region order by r_regionkey,r_name,c_custkey; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk3.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk3.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk3.sql index c98e19f..fd44916 100644 --- a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk3.sql +++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk3.sql @@ -1 +1 @@ -select * from customer, region order by c_custkey,c_name; \ No newline at end of file +select * from customer, region order by c_custkey,c_name,r_regionkey; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk4.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk4.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk4.sql index 7130def..fc5b1c3 100644 --- a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk4.sql +++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk4.sql @@ -1 +1 @@ -select length(r_comment) as len, *, c_custkey*10 from customer, region order by len,r_regionkey,r_name; \ No newline at end of file 
+select length(r_comment) as len, *, c_custkey*10 from customer, region order by len,r_regionkey,r_name,c_custkey; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinCondition7.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinCondition7.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinCondition7.sql index ddd669c..d2114cf 100644 --- a/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinCondition7.sql +++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinCondition7.sql @@ -3,4 +3,4 @@ select n1.n_name, n2.n_name from nation n1 join (select * from nation union select * from nation) n2 on substr(n1.n_name, 1, 4) = substr(n2.n_name, 1, 4) -order by n1.n_nationkey; \ No newline at end of file +order by n1.n_nationkey,n2.n_name; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk1.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk1.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk1.sql index 5451b4a..111a371 100644 --- a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk1.sql +++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk1.sql @@ -1 +1 @@ -select region.*, customer.* from region, customer order by r_regionkey,r_name; \ No newline at end of file +select region.*, customer.* from region, customer order by r_regionkey,r_name,c_custkey; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk2.sql 
---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk2.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk2.sql index e9dac51..ca1672e 100644 --- a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk2.sql +++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk2.sql @@ -1 +1 @@ -select region.*, customer.* from customer, region order by r_regionkey,r_name; \ No newline at end of file +select region.*, customer.* from customer, region order by r_regionkey,r_name,c_custkey; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk3.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk3.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk3.sql index c98e19f..fd44916 100644 --- a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk3.sql +++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk3.sql @@ -1 +1 @@ -select * from customer, region order by c_custkey,c_name; \ No newline at end of file +select * from customer, region order by c_custkey,c_name,r_regionkey; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk4.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk4.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk4.sql index 7130def..fc5b1c3 100644 --- a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk4.sql +++ 
b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk4.sql @@ -1 +1 @@ -select length(r_comment) as len, *, c_custkey*10 from customer, region order by len,r_regionkey,r_name; \ No newline at end of file +select length(r_comment) as len, *, c_custkey*10 from customer, region order by len,r_regionkey,r_name,c_custkey; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithJson.json ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithJson.json b/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithJson.json index bfccc6c..f6b34cd 100644 --- a/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithJson.json +++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithJson.json @@ -63,6 +63,14 @@ }, "IsAsc": true, "IsNullFirst": false + }, + { + "SortKey": { + "ColumnName": "c_custkey", + "OpType": "Column" + }, + "IsAsc": true, + "IsNullFirst": false } ], "Expr": { http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinCondition7.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinCondition7.result b/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinCondition7.result index edd83cd..bed2968 100644 --- a/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinCondition7.result +++ b/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinCondition7.result @@ -47,10 +47,10 @@ n_nationkey,n_name,n_name 22,RUSSIA,RUSSIA 22,RUSSIA,RUSSIA 23,UNITED KINGDOM,UNITED KINGDOM -23,UNITED KINGDOM,UNITED STATES 23,UNITED KINGDOM,UNITED KINGDOM 23,UNITED KINGDOM,UNITED STATES +23,UNITED KINGDOM,UNITED STATES 24,UNITED STATES,UNITED KINGDOM 
-24,UNITED STATES,UNITED STATES 24,UNITED STATES,UNITED KINGDOM +24,UNITED STATES,UNITED STATES 24,UNITED STATES,UNITED STATES \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java new file mode 100644 index 0000000..3719412 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.statistics.TableStats; + +import java.io.IOException; + +// dummy scanner +public abstract class AbstractScanner implements Scanner { + + @Override + public void init() throws IOException { + + } + + @Override + public void reset() throws IOException { + } + + @Override + public void close() throws IOException { + } + + @Override + public boolean isProjectable() { + return false; + } + + @Override + public void setTarget(Column[] targets) { + } + + @Override + public boolean isSelectable() { + return false; + } + + @Override + public void setSearchCondition(Object expr) { + } + + @Override + public boolean isSplittable() { + return false; + } + + @Override + public float getProgress() { + return 0; + } + + @Override + public TableStats getInputStats() { + return null; + } + + @Override + public Schema getSchema() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java index f19b61f..16477cd 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java @@ -141,6 +141,10 @@ public class MemoryUtil { total += TEXT_DATUM + datum.size(); break; + case BLOB: + total += BLOB_DATUM + datum.size(); + break; + case DATE: total += DATE_DATUM; break;
