TAJO-1271: Improve memory usage of Hash-shuffle.
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7734e06e Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7734e06e Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7734e06e Branch: refs/heads/branch-0.11.1 Commit: 7734e06e54dfce2740da754996dd2981fbee31bf Parents: 7fb0f25 Author: Jinho Kim <[email protected]> Authored: Tue Nov 17 12:24:21 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Tue Nov 17 12:24:21 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/TajoTestingCluster.java | 5 +- .../main/java/org/apache/tajo/SessionVars.java | 4 +- .../java/org/apache/tajo/conf/TajoConf.java | 10 +- .../org/apache/tajo/storage/BufferPool.java | 6 +- .../org/apache/tajo/tuple/BaseTupleBuilder.java | 2 +- .../tuple/memory/CompactRowBlockWriter.java | 408 +++++++++++++++ .../tajo/tuple/memory/DirectBufTuple.java | 6 +- .../tajo/tuple/memory/HeapRowBlockReader.java | 5 +- .../org/apache/tajo/tuple/memory/HeapTuple.java | 11 +- .../tajo/tuple/memory/MemoryRowBlock.java | 81 ++- .../tuple/memory/OffHeapRowBlockReader.java | 8 +- .../tajo/tuple/memory/OffHeapRowBlockUtils.java | 133 +++-- .../tajo/tuple/memory/ResizableLimitSpec.java | 11 +- .../tajo/tuple/memory/ResizableMemoryBlock.java | 18 +- .../org/apache/tajo/tuple/memory/RowBlock.java | 20 + .../apache/tajo/tuple/memory/UnSafeTuple.java | 24 +- .../apache/tajo/tuple/memory/ZeroCopyTuple.java | 10 +- .../java/org/apache/tajo/util/BitArray.java | 5 +- .../planner/physical/TestExternalSortExec.java | 2 +- .../planner/physical/TestHashJoinExec.java | 1 + .../planner/physical/TestPhysicalPlanner.java | 3 + .../physical/TestProgressExternalSortExec.java | 41 +- .../tajo/querymaster/TestTaskStatusUpdate.java | 2 +- .../TestTajoCli/testHelpSessionVars.result | 3 +- .../engine/planner/PhysicalPlannerImpl.java | 4 +- .../planner/physical/ExternalSortExec.java | 117 +++-- 
.../physical/HashShuffleFileWriteExec.java | 201 +++++--- .../tajo/engine/planner/physical/SortExec.java | 3 +- .../engine/planner/physical/TupleSorter.java | 5 +- .../planner/physical/VectorizedSorter.java | 3 +- .../apache/tajo/querymaster/Repartitioner.java | 5 +- .../java/org/apache/tajo/worker/TajoWorker.java | 4 + .../java/org/apache/tajo/worker/TaskImpl.java | 15 +- .../tajo/plan/function/stream/BufferPool.java | 7 +- .../storage/HashShuffleAppenderManager.java | 167 ++++--- .../storage/HashShuffleAppenderWrapper.java | 117 +++-- .../java/org/apache/tajo/storage/RawFile.java | 499 ++++--------------- .../storage/rawfile/DirectRawFileScanner.java | 28 +- .../storage/rawfile/DirectRawFileWriter.java | 145 ++++-- 40 files changed, 1288 insertions(+), 853 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 48f9016..d561844 100644 --- a/CHANGES +++ b/CHANGES @@ -7,6 +7,8 @@ Release 0.11.1 - unreleased IMPROVEMENT + TAJO-1271: Improve memory usage of Hash-shuffle. (jinho) + TAJO-1966: Decrease memory usage of TajoTestingCluster. 
(jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java index 0871084..9b202d2 100644 --- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -158,8 +158,9 @@ public class TajoTestingCluster { // Python function path conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, getClass().getResource("/python").toString()); - // Query output file - conf.setVar(ConfVars.QUERY_OUTPUT_DEFAULT_FILE_FORMAT, BuiltinStorages.DRAW); + // Buffer size + conf.setInt(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE.varname, 1); + conf.setInt(ConfVars.$EXECUTOR_HASH_SHUFFLE_BUFFER_SIZE.varname, 1); /** decrease Hbase thread and memory cache for testing */ //server handler http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/SessionVars.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java index 46df687..08c12a0 100644 --- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java @@ -117,7 +117,9 @@ public enum SessionVars implements ConfigKey { // for physical Executors EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort buffer size for external sort (mb)", DEFAULT, - Long.class, Validators.min("0")), + Integer.class, Validators.min("0")), + HASH_SHUFFLE_BUFFER_SIZE(ConfVars.$EXECUTOR_HASH_SHUFFLE_BUFFER_SIZE, "hash-shuffle buffer size for local disk I/O (mb)" + , DEFAULT, Integer.class, 
Validators.min("1")), HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash join (mb)", DEFAULT, Long.class, Validators.min("0")), INNER_HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD, http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 9458732..097d689 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -213,12 +213,11 @@ public class TajoConf extends Configuration { SHUFFLE_FETCHER_CONNECT_TIMEOUT("tajo.shuffle.fetcher.connect.timeout-sec", 60, Validators.min("1")), SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 60, Validators.min("1")), SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 2, Validators.min("0")), - SHUFFLE_HASH_APPENDER_BUFFER_SIZE("tajo.shuffle.hash.appender.buffer.size", 10000), - SHUFFLE_HASH_APPENDER_PAGE_VOLUME("tajo.shuffle.hash.appender.page.volumn-mb", 30), - HASH_SHUFFLE_PARENT_DIRS("tajo.hash.shuffle.parent.dirs.count", 10), + SHUFFLE_HASH_APPENDER_PAGE_VOLUME("tajo.shuffle.hash.appender.page.volume-mb", 30), + SHUFFLE_HASH_PARENT_DIRS("tajo.shuffle.hash.parent.dirs.count", 64), // Query output Configuration -------------------------------------------------- - QUERY_OUTPUT_DEFAULT_FILE_FORMAT("tajo.query.output.file-format", BuiltinStorages.TEXT, Validators.javaString()), + QUERY_OUTPUT_DEFAULT_FILE_FORMAT("tajo.query.output.file-format", BuiltinStorages.DRAW, Validators.javaString()), // Storage Configuration -------------------------------------------------- ROWFILE_SYNC_INTERVAL("rowfile.sync.interval", 100), @@ -334,7 +333,7 @@ public class TajoConf extends 
Configuration { $QUERY_EXECUTE_PARALLEL_MAX("tajo.query.execute.parallel.max", 10), // for physical Executors - $EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L), + $EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200), $EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-mb", 64l, Validators.min("0")), $EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.inner.in-memory-hash-threshold-mb", 64l, Validators.min("0")), @@ -342,6 +341,7 @@ public class TajoConf extends Configuration { Validators.min("0")), $EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-mb", 64l, Validators.min("0")), + $EXECUTOR_HASH_SHUFFLE_BUFFER_SIZE("tajo.executor.hash-shuffle.buffer-mb", 100, Validators.min("1")), $MAX_OUTPUT_FILE_SIZE("tajo.query.max-outfile-size-mb", 0), // zero means infinite $CODEGEN("tajo.executor.codegen.enabled", false), // Runtime code generation (todo this is broken) $AGG_HASH_TABLE_SIZE("tajo.executor.aggregate.hash-table.size", 10000), http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java index 97ac7b4..4913d3b 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java @@ -42,10 +42,6 @@ public class BufferPool { } static { - /* TODO Enable thread cache - * Create a pooled ByteBuf allocator but disables the thread-local cache. 
- * Because the TaskRunner thread is newly created - * */ if (TajoConstants.IS_TEST_MODE) { /* Disable pooling buffers for memory usage */ @@ -55,7 +51,7 @@ public class BufferPool { ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); } else { TajoConf tajoConf = new TajoConf(); - ALLOCATOR = createPooledByteBufAllocator(true, tajoConf.getBoolean(ALLOW_CACHE, false), 0); + ALLOCATOR = createPooledByteBufAllocator(true, tajoConf.getBoolean(ALLOW_CACHE, true), 0); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java index cb417f3..ebdcc26 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java @@ -85,7 +85,7 @@ public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, public UnSafeTuple buildToZeroCopyTuple() { UnSafeTuple zcTuple = new UnSafeTuple(); - zcTuple.set(memoryBlock, memoryBlock.readerPosition(), memoryBlock.readableBytes(), dataTypes()); + zcTuple.set(memoryBlock, memoryBlock.readerPosition(), dataTypes()); return zcTuple; } http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java new file mode 100644 index 0000000..a88d2f1 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.memory; + +import io.netty.util.internal.PlatformDependent; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.datum.BooleanDatum; +import org.apache.tajo.datum.IntervalDatum; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.util.BitArray; +import org.apache.tajo.util.SizeOf; +import org.apache.tajo.util.UnsafeUtil; + +/** + * This class represent serialization of RawFile + * + * Row Record Structure + * + * | row length | null flags length | null flags | field 1 | field 2| ... 
| field N |; + * + * | (4 bytes) (2 bytes) (N bytes) | |; + * Header values + */ +public class CompactRowBlockWriter implements RowWriter { + private static final int RECORD_FIELD_SIZE = 4; + // Maximum variant int32 size is 5 + private static final short MAXIMUM_VARIANT_INT32 = 5; + // Maximum variant int64 size is 10 + private static final short MAXIMUM_VARIANT_INT64 = 10; + + private final RowBlock rowBlock; + private final BitArray nullFlags; + + /** record capacity + offset list */ + private final int headerSize; + + private final DataType[] dataTypes; + + private int curFieldIdx; + private int curOffset; + + + public CompactRowBlockWriter(RowBlock rowBlock) { + this.dataTypes = rowBlock.getDataTypes(); + this.rowBlock = rowBlock; + + // compute the number of bytes, representing the null flags + nullFlags = new BitArray(dataTypes.length); + headerSize = RECORD_FIELD_SIZE + SizeOf.SIZE_OF_SHORT + nullFlags.bytesLength(); + + if (!rowBlock.getMemory().hasAddress()) { + throw new TajoInternalError(rowBlock.getMemory().getClass().getSimpleName() + + " does not support to direct memory access"); + } + } + + + /** + * Encode a ZigZag-encoded 32-bit value. ZigZag encodes signed integers + * into values that can be efficiently encoded with varint. (Otherwise, + * negative values must be sign-extended to 64 bits to be varint encoded, + * thus always taking 10 bytes on the wire.) + * + * @param n A signed 32-bit integer. + * @return An unsigned 32-bit integer, stored in a signed int because + * Java has no explicit unsigned support. + */ + public static int encodeZigZag32(final int n) { + // Note: the right-shift must be arithmetic + return (n << 1) ^ (n >> 31); + } + + /** + * Encode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers + * into values that can be efficiently encoded with varint. (Otherwise, + * negative values must be sign-extended to 64 bits to be varint encoded, + * thus always taking 10 bytes on the wire.) 
+ * + * @param n A signed 64-bit integer. + * @return An unsigned 64-bit integer, stored in a signed int because + * Java has no explicit unsigned support. + */ + public static long encodeZigZag64(final long n) { + // Note: the right-shift must be arithmetic + return (n << 1) ^ (n >> 63); + } + + /** + * Encode and write a varint. {@code value} is treated as + * unsigned, so it won't be sign-extended if negative. + */ + public static short writeRawVarint32(long address, int value) { + short length = 0; + while (true) { + if ((value & ~0x7F) == 0) { + PlatformDependent.putByte(address + length, (byte) value); + length++; + return length; + } else { + PlatformDependent.putByte(address + length, (byte) ((value & 0x7F) | 0x80)); + value >>>= 7; + length++; + } + } + } + + /** + * Encode and write a varint64. + */ + public static short writeRawVarint64(long address, long value) { + short length = 0; + while (true) { + if ((value & ~0x7FL) == 0) { + PlatformDependent.putByte(address + length, (byte) value); + length++; + return length; + } else { + PlatformDependent.putByte(address + length, (byte) ((value & 0x7F) | 0x80)); + value >>>= 7; + length++; + } + } + } + + /** + * Compute the number of bytes that would be needed to encode a varint. + * {@code value} is treated as unsigned, so it won't be sign-extended if + * negative. 
+ */ + public static int computeRawVarint32Size(final int value) { + if ((value & (0xffffffff << 7)) == 0) return 1; + if ((value & (0xffffffff << 14)) == 0) return 2; + if ((value & (0xffffffff << 21)) == 0) return 3; + if ((value & (0xffffffff << 28)) == 0) return 4; + return 5; + } + + /** + * Current memory address of the buffer + * + * @return The memory address + */ + public long address() { + return rowBlock.getMemory().address(); + } + + /** + * Current position + * + * @return The position + */ + public int position() { + return rowBlock.getMemory().writerPosition(); + } + + + /** + * Forward the address; + * + * @param length Length to be forwarded + */ + public void forward(int length) { + rowBlock.getMemory().writerPosition(rowBlock.getMemory().writerPosition() + length); + } + + public void ensureSize(int size) { + rowBlock.getMemory().ensureSize(size); + } + + @Override + public DataType[] dataTypes() { + return rowBlock.getDataTypes(); + } + + /** + * Current memory address of the row + * + * @return The memory address + */ + public long recordStartAddr() { + return currentAddr() - curOffset; + } + + /** + * Memory address that point to the first byte of the buffer + * + * @return The memory address + */ + private long currentAddr() { + return address() + position(); + } + + public int offset() { + return position(); + } + + + @Override + public void clear() { + curOffset = 0; + curFieldIdx = 0; + nullFlags.clear(); + } + + @Override + public boolean startRow() { + ensureSize(headerSize); + nullFlags.clear(); + + curOffset = headerSize; + curFieldIdx = 0; + forward(headerSize); + return true; + } + + + public void endRow() { + long rowHeaderPos = recordStartAddr(); + // curOffset is equivalent to a byte length of this row. 
+ PlatformDependent.putInt(rowHeaderPos, curOffset); + rowHeaderPos += SizeOf.SIZE_OF_INT; + + //set null flags + byte [] flags = nullFlags.toArray(); + PlatformDependent.putShort(rowHeaderPos, (short) flags.length); + rowHeaderPos += SizeOf.SIZE_OF_SHORT; + PlatformDependent.copyMemory(flags, 0, rowHeaderPos, flags.length); + + rowBlock.setRows(rowBlock.rows() + 1); + } + + @Override + public void skipField() { + // set null flag + nullFlags.set(curFieldIdx); + curFieldIdx++; + } + + /** + * set current buffer position and forward field length + * @param fieldLength + */ + private void forwardField(int fieldLength) { + forward(fieldLength); + curOffset += fieldLength; + + } + + @Override + public void putByte(byte val) { + ensureSize(SizeOf.SIZE_OF_BYTE); + long addr = currentAddr(); + + PlatformDependent.putByte(addr, val); + curFieldIdx++; + forwardField(SizeOf.SIZE_OF_BYTE); + } + + @Override + public void putBool(boolean val) { + putByte(val ? BooleanDatum.TRUE_INT : BooleanDatum.FALSE_INT); + } + + @Override + public void putInt2(short val) { + ensureSize(SizeOf.SIZE_OF_SHORT); + long addr = currentAddr(); + + PlatformDependent.putShort(addr, val); + curFieldIdx++; + forwardField(SizeOf.SIZE_OF_SHORT); + } + + @Override + public void putInt4(int val) { + ensureSize(MAXIMUM_VARIANT_INT32); + + curFieldIdx++; + forwardField(writeRawVarint32(currentAddr(), encodeZigZag32(val))); + } + + @Override + public void putInt8(long val) { + ensureSize(MAXIMUM_VARIANT_INT64); + + curFieldIdx++; + forwardField(writeRawVarint64(currentAddr(), encodeZigZag64(val))); + } + + @Override + public void putFloat4(float val) { + ensureSize(SizeOf.SIZE_OF_FLOAT); + long addr = currentAddr(); + + UnsafeUtil.unsafe.putFloat(addr, val); + curFieldIdx++; + forwardField(SizeOf.SIZE_OF_FLOAT); + } + + @Override + public void putFloat8(double val) { + ensureSize(SizeOf.SIZE_OF_DOUBLE); + long addr = currentAddr(); + + UnsafeUtil.unsafe.putDouble(addr, val); + curFieldIdx++; + 
forwardField(SizeOf.SIZE_OF_DOUBLE); + } + + @Override + public void putText(String val) { + putText(val.getBytes(TextDatum.DEFAULT_CHARSET)); + } + + @Override + public void putText(byte[] val) { + putBlob(val); + } + + @Override + public void putBlob(byte[] val) { + int bytesLen = val.length; + + ensureSize(MAXIMUM_VARIANT_INT32 + bytesLen); + long addr = currentAddr(); + + short length = writeRawVarint32(addr, bytesLen); + PlatformDependent.copyMemory(val, 0, addr + length, bytesLen); + curFieldIdx++; + forwardField(length + bytesLen); + } + + @Override + public void putDate(int val) { + ensureSize(SizeOf.SIZE_OF_INT); + long addr = currentAddr(); + + PlatformDependent.putInt(addr, val); + curFieldIdx++; + forwardField(SizeOf.SIZE_OF_INT); + } + + @Override + public void putTime(long val) { + ensureSize(SizeOf.SIZE_OF_LONG); + long addr = currentAddr(); + + PlatformDependent.putLong(addr, val); + curFieldIdx++; + forwardField(SizeOf.SIZE_OF_LONG); + } + + @Override + public void putTimestamp(long val) { + putTime(val); + } + + @Override + public void putInterval(IntervalDatum val) { + ensureSize(MAXIMUM_VARIANT_INT32 + MAXIMUM_VARIANT_INT64); + long addr = currentAddr(); + + short length = writeRawVarint32(addr, encodeZigZag32(val.getMonths())); + length += writeRawVarint64(addr, encodeZigZag64(val.getMilliSeconds())); + + curFieldIdx++; + forwardField(length); + } + + @Override + public void putInet4(int val) { + ensureSize(SizeOf.SIZE_OF_INT); + long addr = currentAddr(); + + PlatformDependent.putInt(addr, val); + curFieldIdx++; + forwardField(SizeOf.SIZE_OF_INT); + } + + @Override + public void putProtoDatum(ProtobufDatum val) { + putBlob(val.asByteArray()); + } + + @Override + public void addTuple(Tuple tuple) { + OffHeapRowBlockUtils.convert(tuple, this); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java 
---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java index 10e493f..1852e6d 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java @@ -28,11 +28,11 @@ import static org.apache.tajo.common.TajoDataTypes.DataType; public class DirectBufTuple extends UnSafeTuple implements Deallocatable { private MemoryBlock memoryBlock; - public DirectBufTuple(int length, DataType[] types) { - ByteBuffer bb = ByteBuffer.allocateDirect(length).order(ByteOrder.LITTLE_ENDIAN); + public DirectBufTuple(DataType[] types) { + ByteBuffer bb = ByteBuffer.allocateDirect(getLength()).order(ByteOrder.LITTLE_ENDIAN); memoryBlock = new ResizableMemoryBlock(bb); - set(memoryBlock, 0, length, types); + set(memoryBlock, 0, types); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java index dd377cf..ec5033b 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java @@ -48,10 +48,9 @@ public class HeapRowBlockReader implements RowBlockReader<HeapTuple> { public boolean next(HeapTuple tuple) { if (curRowIdxForRead < rows) { - int recordLen = memoryBlock.getInt(curPosForRead); - tuple.set(memoryBlock, curPosForRead, recordLen, dataTypes); + tuple.set(memoryBlock, curPosForRead, dataTypes); - curPosForRead += recordLen; + curPosForRead += tuple.getLength(); 
curRowIdxForRead++; memoryBlock.readerPosition(curPosForRead); http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java index 5d2fdc9..c6401ec 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java @@ -41,16 +41,16 @@ public class HeapTuple extends ZeroCopyTuple implements Cloneable { private DataType[] types; @Override - public void set(MemoryBlock memoryBlock, int relativePos, int length, DataType[] types) { + public void set(MemoryBlock memoryBlock, int relativePos, DataType[] types) { this.buffer = memoryBlock.getBuffer(); this.types = types; - super.set(relativePos, length); + super.set(relativePos); } protected void set(final byte[] bytes, final DataType[] types) { this.buffer = Unpooled.wrappedBuffer(bytes).order(ByteOrder.LITTLE_ENDIAN); this.types = types; - super.set(0, bytes.length); + super.set(0); } @Override @@ -59,6 +59,11 @@ public class HeapTuple extends ZeroCopyTuple implements Cloneable { } @Override + public int getLength() { + return buffer.getInt(getRelativePos()); + } + + @Override public TajoDataTypes.Type type(int fieldId) { return types[fieldId].getType(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java index 922fc68..3d02f9a 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java +++ 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java @@ -19,6 +19,8 @@ package org.apache.tajo.tuple.memory; import io.netty.util.internal.PlatformDependent; +import org.apache.tajo.BuiltinStorages; +import org.apache.tajo.annotation.NotThreadSafe; import org.apache.tajo.exception.NotImplementedException; import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.tuple.RowBlockReader; @@ -32,10 +34,12 @@ import java.nio.channels.ScatteringByteChannel; import static org.apache.tajo.common.TajoDataTypes.DataType; +@NotThreadSafe public class MemoryRowBlock implements RowBlock, Deallocatable { public static final int NULL_FIELD_OFFSET = -1; - private DataType[] dataTypes; + private final DataType[] dataTypes; + private final String dataFormat; // Basic States private int maxRowNum = Integer.MAX_VALUE; // optional @@ -45,20 +49,20 @@ public class MemoryRowBlock implements RowBlock, Deallocatable { private MemoryBlock memory; public MemoryRowBlock(DataType[] dataTypes, ResizableLimitSpec limitSpec, boolean isDirect) { + this(dataTypes, limitSpec, isDirect, BuiltinStorages.DRAW); + } + + public MemoryRowBlock(DataType[] dataTypes, ResizableLimitSpec limitSpec, boolean isDirect, String dataFormat) { this.memory = new ResizableMemoryBlock(limitSpec, isDirect); this.dataTypes = dataTypes; + this.dataFormat = dataFormat; } public MemoryRowBlock(MemoryRowBlock rowBlock) { this.memory = TUtil.checkTypeAndGet(rowBlock.getMemory().duplicate(), ResizableMemoryBlock.class); this.rowNum = rowBlock.rowNum; this.dataTypes = rowBlock.dataTypes; - } - - public MemoryRowBlock(MemoryBlock memory, DataType[] dataTypes, int rowNum) { - this.memory = memory; - this.rowNum = rowNum; - this.dataTypes = dataTypes; + this.dataFormat = rowBlock.dataFormat; } public MemoryRowBlock(DataType[] dataTypes) { @@ -69,8 +73,13 @@ public class MemoryRowBlock implements RowBlock, Deallocatable { this(dataTypes, new ResizableLimitSpec(bytes), true); } - public 
MemoryRowBlock(DataType[] dataTypes, int bytes, boolean isDirect) { - this(dataTypes, new ResizableLimitSpec(bytes), isDirect); + public MemoryRowBlock(DataType[] dataTypes, int bytes, boolean isDirect, String dataFormat) { + this(dataTypes, new ResizableLimitSpec(bytes), isDirect, dataFormat); + } + + @Override + public String getDataFormat() { + return dataFormat; } @Override @@ -91,6 +100,20 @@ public class MemoryRowBlock implements RowBlock, Deallocatable { return memory.capacity(); } + @Override + public int usedMem() { + return memory.writerPosition(); + } + + @Override + public float usage() { + if (usedMem() > 0) { + return (usedMem() / (float) capacity()); + } else { + return 0.0f; + } + } + public int maxRowNum() { return maxRowNum; } @@ -112,6 +135,15 @@ public class MemoryRowBlock implements RowBlock, Deallocatable { @Override public boolean copyFromChannel(ScatteringByteChannel channel) throws IOException { + switch (dataFormat) { + case BuiltinStorages.DRAW: + return fillDrawBuffer(channel); + default: + throw new TajoInternalError(new NotImplementedException("Heap memory writer not implemented yet")); + } + } + + protected boolean fillDrawBuffer(ScatteringByteChannel channel) throws IOException { reset(); int readBytes = memory.writeBytes(channel); @@ -143,13 +175,23 @@ public class MemoryRowBlock implements RowBlock, Deallocatable { @Override public RowWriter getWriter() { + if (!getMemory().hasAddress()) { + throw new TajoInternalError(new NotImplementedException("Heap memory writer not implemented yet")); + } + if (builder == null) { - if (!getMemory().hasAddress()) { - throw new TajoInternalError(new NotImplementedException("Heap memory writer not implemented yet")); - } else { + switch (dataFormat) { + case BuiltinStorages.DRAW: this.builder = new OffHeapRowBlockWriter(this); + break; + case BuiltinStorages.RAW: + this.builder = new CompactRowBlockWriter(this); + break; + default: + throw new TajoInternalError(new 
NotImplementedException(dataFormat + " memory writer not implemented yet")); } } + return builder; } @@ -165,10 +207,17 @@ public class MemoryRowBlock implements RowBlock, Deallocatable { @Override public RowBlockReader getReader() { - if (!getMemory().hasAddress()) { - return new HeapRowBlockReader(this); - } else { - return new OffHeapRowBlockReader(this); + + switch (dataFormat) { + case BuiltinStorages.DRAW: { + if (!getMemory().hasAddress()) { + return new HeapRowBlockReader(this); + } else { + return new OffHeapRowBlockReader(this); + } + } + default: + throw new TajoInternalError(new NotImplementedException(dataFormat + " memory writer not implemented yet")); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java index ccaeffc..c5673e3 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java @@ -18,7 +18,6 @@ package org.apache.tajo.tuple.memory; -import io.netty.util.internal.PlatformDependent; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.tuple.RowBlockReader; @@ -54,11 +53,8 @@ public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> { public boolean next(ZeroCopyTuple tuple) { if (curRowIdxForRead < rows) { - long recordStartPtr = memoryBlock.address() + curPosForRead; - int recordLen = PlatformDependent.getInt(recordStartPtr); - tuple.set(memoryBlock, curPosForRead, recordLen, dataTypes); - - curPosForRead += recordLen; + tuple.set(memoryBlock, curPosForRead, dataTypes); + curPosForRead += 
tuple.getLength(); curRowIdxForRead++; memoryBlock.readerPosition(curPosForRead); http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java index e8f219c..1aca22f 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java @@ -32,6 +32,11 @@ import java.util.Comparator; import java.util.List; public class OffHeapRowBlockUtils { + private static TupleConverter tupleConverter; + + static { + tupleConverter = new TupleConverter(); + } public static List<Tuple> sort(MemoryRowBlock rowBlock, Comparator<Tuple> comparator) { List<Tuple> tupleList = Lists.newArrayList(); @@ -80,62 +85,86 @@ public class OffHeapRowBlockUtils { return tuples; } - public static void convert(Tuple tuple, RowWriter writer) { - writer.startRow(); + /** + * This class is tuple converter to the RowBlock + */ + public static class TupleConverter { - for (int i = 0; i < writer.dataTypes().length; i++) { - if (tuple.isBlankOrNull(i)) { - writer.skipField(); - continue; + public void convert(Tuple tuple, RowWriter writer) { + writer.startRow(); + + for (int i = 0; i < writer.dataTypes().length; i++) { + writeField(i, tuple, writer); } - switch (writer.dataTypes()[i].getType()) { - case BOOLEAN: - writer.putBool(tuple.getBool(i)); - break; - case BIT: - writer.putByte(tuple.getByte(i)); - break; - case INT1: - case INT2: - writer.putInt2(tuple.getInt2(i)); - break; - case INT4: - case DATE: - case INET4: - writer.putInt4(tuple.getInt4(i)); - break; - case INT8: - case TIMESTAMP: - case TIME: - writer.putInt8(tuple.getInt8(i)); - break; - case FLOAT4: - 
writer.putFloat4(tuple.getFloat4(i)); - break; - case FLOAT8: - writer.putFloat8(tuple.getFloat8(i)); - break; - case CHAR: - case TEXT: - writer.putText(tuple.getBytes(i)); - break; - case BLOB: - writer.putBlob(tuple.getBytes(i)); - break; - case INTERVAL: - writer.putInterval((IntervalDatum) tuple.getInterval(i)); - break; - case PROTOBUF: - writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i)); - break; - case NULL_TYPE: + + writer.endRow(); + } + + protected void writeField(int colIdx, Tuple tuple, RowWriter writer) { + + if (tuple.isBlankOrNull(colIdx)) { writer.skipField(); - break; - default: - throw new TajoRuntimeException( - new UnsupportedException("unknown data type '" + writer.dataTypes()[i].getType().name() + "'")); + } else { + switch (writer.dataTypes()[colIdx].getType()) { + case BOOLEAN: + writer.putBool(tuple.getBool(colIdx)); + break; + case BIT: + writer.putByte(tuple.getByte(colIdx)); + break; + case INT1: + case INT2: + writer.putInt2(tuple.getInt2(colIdx)); + break; + case INT4: + writer.putInt4(tuple.getInt4(colIdx)); + break; + case DATE: + writer.putDate(tuple.getInt4(colIdx)); + break; + case INT8: + writer.putInt8(tuple.getInt8(colIdx)); + break; + case TIMESTAMP: + writer.putTimestamp(tuple.getInt8(colIdx)); + break; + case TIME: + writer.putTime(tuple.getInt8(colIdx)); + break; + case FLOAT4: + writer.putFloat4(tuple.getFloat4(colIdx)); + break; + case FLOAT8: + writer.putFloat8(tuple.getFloat8(colIdx)); + break; + case CHAR: + case TEXT: + writer.putText(tuple.getBytes(colIdx)); + break; + case BLOB: + writer.putBlob(tuple.getBytes(colIdx)); + break; + case INTERVAL: + writer.putInterval((IntervalDatum) tuple.getInterval(colIdx)); + break; + case PROTOBUF: + writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(colIdx)); + break; + case INET4: + writer.putInet4(tuple.getInt4(colIdx)); + break; + case NULL_TYPE: + writer.skipField(); + break; + default: + throw new TajoRuntimeException( + new 
UnsupportedException("unknown data type '" + writer.dataTypes()[colIdx].getType().name() + "'")); + } } } - writer.endRow(); + } + + public static void convert(Tuple tuple, RowWriter writer) { + tupleConverter.convert(tuple, writer); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java index 614b3fb..ddf50ab 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java @@ -21,6 +21,8 @@ package org.apache.tajo.tuple.memory; import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tajo.exception.TajoRuntimeException; +import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.util.FileUtil; /** @@ -29,7 +31,7 @@ import org.apache.tajo.util.FileUtil; * due to ByteBuffer. 
*/ public class ResizableLimitSpec { - private final Log LOG = LogFactory.getLog(ResizableLimitSpec.class); + private static final Log LOG = LogFactory.getLog(ResizableLimitSpec.class); public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE; public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE); @@ -114,10 +116,11 @@ public class ResizableLimitSpec { return (int) initSize; } - if (currentSize > Integer.MAX_VALUE) { - LOG.warn("Current size already exceeds the maximum size (" + Integer.MAX_VALUE + " bytes)"); - return Integer.MAX_VALUE; + if (currentSize == Integer.MAX_VALUE) { + throw new TajoRuntimeException(new UnsupportedException( + "Current size already exceeds the maximum size (" + Integer.MAX_VALUE + " bytes)")); } + long nextSize = (long) (currentSize + ((float) currentSize * incRatio)); if (nextSize > limitBytes) { http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java index 22c2561..09faff9 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java @@ -39,10 +39,12 @@ public class ResizableMemoryBlock implements MemoryBlock { protected ByteBuf buffer; protected ResizableLimitSpec limitSpec; + private long memoryAddress; public ResizableMemoryBlock(ByteBuf buffer, ResizableLimitSpec limitSpec) { this.buffer = buffer.order(ByteOrder.LITTLE_ENDIAN); this.limitSpec = limitSpec; + this.memoryAddress = this.buffer.hasMemoryAddress() ? 
this.buffer.memoryAddress() : 0; } public ResizableMemoryBlock(ByteBuf buffer) { @@ -50,13 +52,13 @@ public class ResizableMemoryBlock implements MemoryBlock { } public ResizableMemoryBlock(ByteBuffer buffer) { - this.buffer = Unpooled.wrappedBuffer(buffer).order(ByteOrder.LITTLE_ENDIAN); - this.limitSpec = new ResizableLimitSpec(buffer.capacity()); + this(Unpooled.wrappedBuffer(buffer), new ResizableLimitSpec(buffer.capacity(), buffer.capacity())); } public ResizableMemoryBlock(ResizableLimitSpec limitSpec, boolean isDirect) { if (isDirect) { this.buffer = BufferPool.directBuffer((int) limitSpec.initialSize(), (int) limitSpec.limit()); + this.memoryAddress = buffer.memoryAddress(); } else { this.buffer = BufferPool.heapBuffer((int) limitSpec.initialSize(), (int) limitSpec.limit()); } @@ -65,7 +67,7 @@ public class ResizableMemoryBlock implements MemoryBlock { @Override public long address() { - return buffer.memoryAddress(); + return memoryAddress; } @Override @@ -123,15 +125,14 @@ public class ResizableMemoryBlock implements MemoryBlock { return buffer.writerIndex(); } - @Override public void ensureSize(int size) { if (!buffer.isWritable(size)) { - if (!limitSpec.canIncrease(buffer.capacity())) { + if (!limitSpec.canIncrease(size)) { throw new RuntimeException("Cannot increase RowBlock anymore."); } - int newBlockSize = limitSpec.increasedSize(buffer.capacity()); + int newBlockSize = limitSpec.increasedSize(size); resize(newBlockSize); LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false)); } @@ -144,17 +145,18 @@ public class ResizableMemoryBlock implements MemoryBlock { throw new RuntimeException("Resize cannot exceed the capacity limit"); } - if (newSize < buffer.capacity()) { + if (newSize < buffer.writableBytes()) { LOG.warn("The capacity reduction is ignored."); } int newBlockSize = UnsafeUtil.alignedSize(newSize); buffer = BufferPool.ensureWritable(buffer, newBlockSize); + memoryAddress = buffer.memoryAddress(); } 
@Override public void release() { - buffer.release(); + if(buffer.refCnt() > 0) buffer.release(); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java index 68902fb..1ab1042 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java @@ -25,11 +25,31 @@ import java.io.IOException; import java.nio.channels.ScatteringByteChannel; public interface RowBlock { + /** + * a data format for de/serialization + */ + String getDataFormat(); + /** + * reset the memory and writer + */ void clear(); + /** + * @return the number of bytes this memory block can contain. + */ int capacity(); + /** + * @return the number of written bytes in this memory block + */ + int usedMem(); + + /** + * @return the percentage of written bytes in this memory block + */ + float usage(); + void setRows(int rowNum); int rows(); http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java index 3dc8c23..62e29b8 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java @@ -42,22 +42,22 @@ import static org.apache.tajo.common.TajoDataTypes.DataType; public class UnSafeTuple extends ZeroCopyTuple { private static final Unsafe UNSAFE = UnsafeUtil.unsafe; - private long address; + private MemoryBlock 
memoryBlock; private DataType[] types; @Override - public void set(MemoryBlock memoryBlock, int relativePos, int length, DataType[] types) { + public void set(MemoryBlock memoryBlock, int relativePos, DataType[] types) { Preconditions.checkArgument(memoryBlock.hasAddress()); - this.address = memoryBlock.address(); + this.memoryBlock = memoryBlock; this.types = types; - super.set(relativePos, length); + super.set(relativePos); } public void set(UnSafeTuple tuple) { - this.address = tuple.address; + this.memoryBlock = tuple.memoryBlock; this.types = tuple.types; - super.set(tuple.getRelativePos(), tuple.getLength()); + super.set(tuple.getRelativePos()); } @Override @@ -66,6 +66,11 @@ public class UnSafeTuple extends ZeroCopyTuple { } @Override + public int getLength() { + return PlatformDependent.getInt(address()); + } + + @Override public TajoDataTypes.Type type(int fieldId) { return types[fieldId].getType(); } @@ -93,7 +98,7 @@ public class UnSafeTuple extends ZeroCopyTuple { } public long address() { - return address + getRelativePos(); + return memoryBlock.address() + getRelativePos(); } public HeapTuple toHeapTuple() { @@ -110,8 +115,9 @@ public class UnSafeTuple extends ZeroCopyTuple { public long getFieldAddr(int fieldId) { int fieldOffset = getFieldOffset(fieldId); - if (fieldOffset == -1) { - throw new RuntimeException("Invalid Field Access: " + fieldId); + if (fieldOffset < 0 || fieldOffset > getLength()) { + throw new RuntimeException("Invalid Access. 
Field : " + fieldId + + ", Offset:" + fieldOffset + ", Record length:" + getLength()); } return address() + fieldOffset; } http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java index 1f4f57e..e9108f2 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java @@ -24,22 +24,18 @@ import org.apache.tajo.storage.Tuple; public abstract class ZeroCopyTuple implements Tuple { protected int relativePos; - protected int length; - public abstract void set(MemoryBlock memoryBlock, int relativePos, int length, DataType[] types); + public abstract void set(MemoryBlock memoryBlock, int relativePos, DataType[] types); - void set(int relativePos, int length) { + void set(int relativePos) { this.relativePos = relativePos; - this.length = length; } public int getRelativePos() { return relativePos; } - public int getLength() { - return length; - } + public abstract int getLength(); @Override public Tuple clone() throws CloneNotSupportedException { http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java b/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java index e62496a..973266b 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java @@ -19,6 +19,7 @@ package org.apache.tajo.util; import java.nio.ByteBuffer; +import java.util.Arrays; public class BitArray { private byte [] 
data; @@ -60,9 +61,7 @@ public class BitArray { } public void clear() { - for (int i = 0; i < data.length; i++) { - data[i] = 0; - } + Arrays.fill(data, (byte) 0); } public int bytesLength() { http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java index 4ee2c9c..30c46e5 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java @@ -122,7 +122,7 @@ public class TestExternalSortExec { public final void testNext() throws IOException, TajoException { conf.setIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT, 2); QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf); - queryContext.setLong(SessionVars.EXTSORT_BUFFER_SIZE, 1024*1024); + queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, 1); FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(), new Path(employee.getUri()), Integer.MAX_VALUE); http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java index a4afa7f..2f4d66f 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java +++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java @@ -216,6 +216,7 @@ public class TestHashJoinExec { HashJoinExec joinExec = proj.getChild(); assertCheckInnerJoinRelatedFunctions(ctx, phyPlanner, joinNode, joinExec); + exec.close(); } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index ada1161..6145b14 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -573,6 +573,7 @@ public class TestPhysicalPlanner { LogicalNode rootNode = optimizer.optimize(plan); PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + exec.close(); assertTrue(exec instanceof SortBasedColPartitionStoreExec); } @@ -597,6 +598,7 @@ public class TestPhysicalPlanner { PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + exec.close(); assertTrue(exec instanceof HashBasedColPartitionStoreExec); } @@ -621,6 +623,7 @@ public class TestPhysicalPlanner { PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + exec.close(); assertTrue(exec instanceof SortBasedColPartitionStoreExec); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java ---------------------------------------------------------------------- diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java index 349aec0..b1f53da 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java @@ -20,10 +20,7 @@ package org.apache.tajo.engine.planner.physical; import org.apache.hadoop.fs.Path; -import org.apache.tajo.LocalTajoTestingUtility; -import org.apache.tajo.SessionVars; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.*; import org.apache.tajo.algebra.Expr; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.statistics.TableStats; @@ -42,6 +39,7 @@ import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.worker.TaskAttemptContext; import org.junit.After; @@ -65,7 +63,7 @@ public class TestProgressExternalSortExec { private LogicalPlanner planner; private Path testDir; - private final int numTuple = 5000; + private final int numTuple = 50000; private Random rnd = new Random(System.currentTimeMillis()); private TableDesc employee; @@ -87,8 +85,8 @@ public class TestProgressExternalSortExec { schema.addColumn("empid", TajoDataTypes.Type.INT4); schema.addColumn("deptname", TajoDataTypes.Type.TEXT); - TableMeta employeeMeta = CatalogUtil.newTableMeta("RAW"); - Path employeePath = new Path(testDir, "employee.csv"); + TableMeta employeeMeta = CatalogUtil.newTableMeta(BuiltinStorages.RAW); + Path employeePath = new Path(testDir, "employee.raw"); Appender appender = ((FileTablespace) 
TablespaceManager.getLocalFs()) .getAppender(employeeMeta, schema, employeePath); appender.enableStats(); @@ -126,18 +124,24 @@ public class TestProgressExternalSortExec { @Test public void testExternalSortExecProgressWithMemTableScanner() throws Exception { - testProgress(testDataStats.getNumBytes() * 20); //multiply 20 for memory fit + QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf); + int bufferSize = (int) (testDataStats.getNumBytes() * 20) / StorageUnit.MB; //multiply 20 for memory fit + queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, bufferSize); + + testProgress(queryContext); } @Test public void testExternalSortExecProgressWithPairWiseMerger() throws Exception { - testProgress(testDataStats.getNumBytes()); + QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf); + int bufferSize = (int) Math.max((testDataStats.getNumBytes() / StorageUnit.MB), 1); + queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, bufferSize); + + testProgress(queryContext); } - private void testProgress(long sortBufferBytesNum) throws Exception { + private void testProgress(QueryContext queryContext) throws Exception { conf.setIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT, 2); - QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf); - queryContext.setLong(SessionVars.EXTSORT_BUFFER_SIZE, sortBufferBytesNum); FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(), new Path(employee.getUri()), Integer.MAX_VALUE); @@ -191,9 +195,9 @@ public class TestProgressExternalSortExec { TableStats tableStats = exec.getInputStats(); assertNotNull(tableStats); assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getNumBytes().longValue()); - assertEquals(cnt, testDataStats.getNumRows().longValue()); - assertEquals(cnt, tableStats.getNumRows().longValue()); - assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getReadBytes().longValue()); + 
assertEquals(testDataStats.getNumRows().longValue(), cnt); + assertEquals(testDataStats.getNumRows().longValue(), tableStats.getNumRows().longValue()); + assertTrue(testDataStats.getNumBytes().longValue() <= tableStats.getReadBytes().longValue()); // for rescan test preVal = null; @@ -216,9 +220,10 @@ public class TestProgressExternalSortExec { tableStats = exec.getInputStats(); assertNotNull(tableStats); assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getNumBytes().longValue()); - assertEquals(cnt, testDataStats.getNumRows().longValue()); - assertEquals(cnt, tableStats.getNumRows().longValue()); - assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getReadBytes().longValue()); + assertEquals(testDataStats.getNumRows().longValue(), cnt); + assertEquals(testDataStats.getNumRows().longValue(), tableStats.getNumRows().longValue()); + //'ReadBytes' is actual read bytes + assertTrue(testDataStats.getNumBytes().longValue() <= tableStats.getReadBytes().longValue()); conf.setIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT, ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT.defaultIntVal); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java index f845bb3..9f5ecb5 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java @@ -106,7 +106,7 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase { res = executeQuery(); // in/out * stage(4) - long[] expectedNumRows = new long[]{5, 5, 2, 2, 7, 2, 2, 2}; + long[] expectedNumRows = new long[]{5, 5, 2, 2, 2, 2, 2, 2}; long[] expectedNumBytes = 
new long[]{20, 75, 8, 34, 109, 34, 34, 64}; long[] expectedReadBytes = new long[]{20, 20, 8, 8, 109, 0, 34, 0}; http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result index d65c346..ec8344f 100644 --- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result +++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result @@ -28,7 +28,8 @@ Available Session Variables: \set TABLE_PARTITION_PER_SHUFFLE_SIZE [int value] - shuffle output size for partition table write (mb) \set GROUPBY_MULTI_LEVEL_ENABLED [true or false] - Multiple level groupby enabled \set QUERY_EXECUTE_PARALLEL [int value] - Maximum parallel running of execution blocks for a query -\set EXTSORT_BUFFER_SIZE [long value] - sort buffer size for external sort (mb) +\set EXTSORT_BUFFER_SIZE [int value] - sort buffer size for external sort (mb) +\set HASH_SHUFFLE_BUFFER_SIZE [int value] - hash-shuffle buffer size for local disk I/O (mb) \set HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash join (mb) \set INNER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash inner join (mb) \set OUTER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash outer join (mb) http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index 2a1717e..b9e98ba 100644 --- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -904,14 +904,12 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node) throws IOException { // check if an input is sorted in the same order to the subsequence sort operator. - // TODO - it works only if input files are raw files. We should check the file format. - // Since the default intermediate file format is raw file, it is not problem right now. if (checkIfSortEquivalance(ctx, scanNode, node)) { if (ctx.getTable(scanNode.getCanonicalName()) == null) { return new SeqScanExec(ctx, scanNode, null); } FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName()); - return new ExternalSortExec(ctx, (SortNode) node.peek(), fragments); + return new ExternalSortExec(ctx, (SortNode) node.peek(), scanNode, fragments); } else { Enforcer enforcer = ctx.getEnforcer(); http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index b9ab344..8abafec 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.IOUtils; +import org.apache.tajo.BuiltinStorages; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.CatalogUtil; import 
org.apache.tajo.catalog.Schema; @@ -32,6 +33,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.PhysicalPlanningException; +import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.plan.logical.SortNode; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; @@ -49,7 +51,6 @@ import java.util.List; import java.util.concurrent.*; import static org.apache.tajo.storage.RawFile.RawFileAppender; -import static org.apache.tajo.storage.RawFile.RawFileScanner; /** * This external sort algorithm can be characterized by the followings: @@ -69,7 +70,8 @@ public class ExternalSortExec extends SortExec { private static final String INTERMEDIATE_FILE_PREFIX = "@interFile_"; private SortNode plan; - private final TableMeta meta; + /** the data format of intermediate file*/ + private TableMeta intermediateMeta; /** the defaultFanout of external sort */ private final int defaultFanout; /** It's the size of in-memory table. If memory consumption exceeds it, store the memory table into a disk. 
*/ @@ -87,9 +89,9 @@ public class ExternalSortExec extends SortExec { /** local file system */ private final RawLocalFileSystem localFS; /** final output files which are used for cleaning */ - private List<FileFragment> finalOutputFiles = null; + private List<Chunk> finalOutputFiles = null; /** for directly merging sorted inputs */ - private List<FileFragment> mergedInputFragments = null; + private List<Chunk> mergedInputFragments = null; /////////////////////////////////////////////////// // transient variables @@ -108,8 +110,6 @@ public class ExternalSortExec extends SortExec { super(context, plan.getInSchema(), plan.getOutSchema(), null, plan.getSortKeys()); this.plan = plan; - this.meta = CatalogUtil.newTableMeta("ROWFILE"); - this.defaultFanout = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT); if (defaultFanout < 2) { throw new PhysicalPlanningException(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT.varname + " cannot be lower than 2"); @@ -121,18 +121,19 @@ public class ExternalSortExec extends SortExec { this.inMemoryTable = new TupleList(context.getQueryContext().getInt(SessionVars.SORT_LIST_SIZE)); this.sortTmpDir = getExecutorTmpDir(); - localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); - localFS = new RawLocalFileSystem(); + this.localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); + this.localFS = new RawLocalFileSystem(); + this.intermediateMeta = CatalogUtil.newTableMeta(BuiltinStorages.RAW); //TODO change to SHUFFLE_FILE_FORMAT } - public ExternalSortExec(final TaskAttemptContext context,final SortNode plan, + public ExternalSortExec(final TaskAttemptContext context,final SortNode plan, final ScanNode scanNode, final CatalogProtos.FragmentProto[] fragments) throws PhysicalPlanningException { this(context, plan); mergedInputFragments = TUtil.newList(); for (CatalogProtos.FragmentProto proto : fragments) { FileFragment fragment = FragmentConvertor.convert(FileFragment.class, 
proto); - mergedInputFragments.add(fragment); + mergedInputFragments.add(new Chunk(fragment, scanNode.getTableDesc().getMeta())); } } @@ -154,9 +155,9 @@ public class ExternalSortExec extends SortExec { /** * Sort a tuple block and store them into a chunk file */ - private Path sortAndStoreChunk(int chunkId, TupleList tupleBlock) + private Chunk sortAndStoreChunk(int chunkId, TupleList tupleBlock) throws IOException { - TableMeta meta = CatalogUtil.newTableMeta("RAW"); + int rowNum = tupleBlock.size(); long sortStart = System.currentTimeMillis(); @@ -165,7 +166,9 @@ public class ExternalSortExec extends SortExec { long chunkWriteStart = System.currentTimeMillis(); Path outputPath = getChunkPathForWrite(0, chunkId); - final RawFileAppender appender = new RawFileAppender(context.getConf(), null, inSchema, meta, outputPath); + final RawFileAppender appender = + new RawFileAppender(context.getConf(), null, inSchema, intermediateMeta, outputPath); + appender.init(); for (Tuple t : sorted) { appender.addTuple(t); @@ -179,7 +182,10 @@ public class ExternalSortExec extends SortExec { FileUtil.humanReadableByteCount(appender.getOffset(), false) + " bytes, " + rowNum + " rows, " + "sort time: " + (sortEnd - sortStart) + " msec, " + "write time: " + (chunkWriteEnd - chunkWriteStart) + " msec)"); - return outputPath; + + FileFragment frag = new FileFragment("", outputPath, 0, + new File(localFS.makeQualified(outputPath).toUri()).length()); + return new Chunk(frag, intermediateMeta); } /** @@ -188,10 +194,10 @@ public class ExternalSortExec extends SortExec { * @return All paths of chunks * @throws java.io.IOException */ - private List<Path> sortAndStoreAllChunks() throws IOException { + private List<Chunk> sortAndStoreAllChunks() throws IOException { Tuple tuple; long memoryConsumption = 0; - List<Path> chunkPaths = TUtil.newList(); + List<Chunk> chunkPaths = TUtil.newList(); int chunkId = 0; long runStartTime = System.currentTimeMillis(); @@ -248,7 +254,8 @@ public class 
ExternalSortExec extends SortExec { * Get a local path from all temporal paths in round-robin manner. */ private synchronized Path getChunkPathForWrite(int level, int chunkId) throws IOException { - return localDirAllocator.getLocalPathForWrite(sortTmpDir + "/" + level +"_" + chunkId, context.getConf()); + return localFS.makeQualified(localDirAllocator.getLocalPathForWrite( + sortTmpDir + "/" + level + "_" + chunkId, context.getConf())); } @Override @@ -266,7 +273,7 @@ public class ExternalSortExec extends SortExec { } else { // Try to sort all data, and store them as multiple chunks if memory exceeds long startTimeOfChunkSplit = System.currentTimeMillis(); - List<Path> chunks = sortAndStoreAllChunks(); + List<Chunk> chunks = sortAndStoreAllChunks(); long endTimeOfChunkSplit = System.currentTimeMillis(); info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec"); @@ -276,14 +283,7 @@ public class ExternalSortExec extends SortExec { } else { // if input data exceeds main-memory at least once try { - List<FileFragment> fragments = TUtil.newList(); - for (Path chunk : chunks) { - FileFragment frag = new FileFragment("", chunk, 0, - new File(localFS.makeQualified(chunk).toUri()).length()); - fragments.add(frag); - } - - this.result = externalMergeAndSort(fragments); + this.result = externalMergeAndSort(chunks); } catch (Exception e) { throw new PhysicalPlanningException(e); } @@ -324,11 +324,11 @@ public class ExternalSortExec extends SortExec { return computedFanout; } - private Scanner externalMergeAndSort(List<FileFragment> chunks) + private Scanner externalMergeAndSort(List<Chunk> chunks) throws IOException, ExecutionException, InterruptedException { int level = 0; - final List<FileFragment> inputFiles = TUtil.newList(chunks); - final List<FileFragment> outputFiles = TUtil.newList(); + final List<Chunk> inputFiles = TUtil.newList(chunks); + final List<Chunk> outputFiles = TUtil.newList(); int remainRun = inputFiles.size(); int 
chunksSize = chunks.size(); @@ -341,7 +341,7 @@ public class ExternalSortExec extends SortExec { int remainInputRuns = inputFiles.size(); int outChunkId = 0; int outputFileNum = 0; - List<Future<FileFragment>> futures = TUtil.newList(); + List<Future<Chunk>> futures = TUtil.newList(); // the number of files being merged in threads. List<Integer> numberOfMergingFiles = TUtil.newList(); @@ -364,7 +364,7 @@ public class ExternalSortExec extends SortExec { info(LOG, "Unbalanced merge possibility detected: number of remain input (" + remainInputRuns + ") and output files (" + outputFileNum + ") <= " + defaultFanout); - List<FileFragment> switched = TUtil.newList(); + List<Chunk> switched = TUtil.newList(); // switch the remain inputs to the next outputs for (int j = startIdx; j < inputFiles.size(); j++) { switched.add(inputFiles.get(j)); @@ -379,7 +379,7 @@ public class ExternalSortExec extends SortExec { // wait for all sort runners int finishedMerger = 0; int index = 0; - for (Future<FileFragment> future : futures) { + for (Future<Chunk> future : futures) { outputFiles.add(future.get()); // Getting the number of merged files finishedMerger += numberOfMergingFiles.get(index++); @@ -405,12 +405,12 @@ public class ExternalSortExec extends SortExec { * deleted at this point. 
However, for the ease of future code maintenance, we delete only type-C fragments here */ int numDeletedFiles = 0; - for (FileFragment frag : inputFiles) { - if (frag.getTableName().contains(INTERMEDIATE_FILE_PREFIX)) { - localFS.delete(frag.getPath(), true); + for (Chunk chunk : inputFiles) { + if (chunk.getFragment().getTableName().contains(INTERMEDIATE_FILE_PREFIX)) { + localFS.delete(chunk.getFragment().getPath(), true); numDeletedFiles++; - if(LOG.isDebugEnabled()) LOG.debug("Delete merged intermediate file: " + frag); + if(LOG.isDebugEnabled()) LOG.debug("Delete merged intermediate file: " + chunk.getFragment()); } } info(LOG, numDeletedFiles + " merged intermediate files deleted"); @@ -436,15 +436,15 @@ public class ExternalSortExec extends SortExec { /** * Merge Thread */ - private class KWayMergerCaller implements Callable<FileFragment> { + private class KWayMergerCaller implements Callable<Chunk> { final int level; final int nextRunId; - final List<FileFragment> inputFiles; + final List<Chunk> inputFiles; final int startIdx; final int mergeFanout; final boolean updateInputStats; - public KWayMergerCaller(final int level, final int nextRunId, final List<FileFragment> inputFiles, + public KWayMergerCaller(final int level, final int nextRunId, final List<Chunk> inputFiles, final int startIdx, final int mergeFanout, final boolean updateInputStats) { this.level = level; this.nextRunId = nextRunId; @@ -455,11 +455,12 @@ public class ExternalSortExec extends SortExec { } @Override - public FileFragment call() throws Exception { + public Chunk call() throws Exception { final Path outputPath = getChunkPathForWrite(level + 1, nextRunId); info(LOG, mergeFanout + " files are being merged to an output file " + outputPath.getName()); long mergeStartTime = System.currentTimeMillis(); - final RawFileAppender output = new RawFileAppender(context.getConf(), null, inSchema, meta, outputPath); + final RawFileAppender output = + new RawFileAppender(context.getConf(), null, 
inSchema, intermediateMeta, outputPath); output.init(); final Scanner merger = createKWayMerger(inputFiles, startIdx, mergeFanout); merger.init(); @@ -475,7 +476,7 @@ public class ExternalSortExec extends SortExec { + " bytes, " + (mergeEndTime - mergeStartTime) + " msec)"); File f = new File(localFS.makeQualified(outputPath).toUri()); FileFragment frag = new FileFragment(INTERMEDIATE_FILE_PREFIX + outputPath.getName(), outputPath, 0, f.length()); - return frag; + return new Chunk(frag, intermediateMeta); } } @@ -489,7 +490,7 @@ public class ExternalSortExec extends SortExec { /** * Create a merged file scanner or k-way merge scanner. */ - private Scanner createFinalMerger(List<FileFragment> inputs) throws IOException { + private Scanner createFinalMerger(List<Chunk> inputs) throws IOException { if (inputs.size() == 1) { this.result = getFileScanner(inputs.get(0)); } else { @@ -498,11 +499,11 @@ public class ExternalSortExec extends SortExec { return result; } - private Scanner getFileScanner(FileFragment frag) throws IOException { - return new RawFileScanner(context.getConf(), plan.getInSchema(), meta, frag); + private Scanner getFileScanner(Chunk chunk) throws IOException { + return TablespaceManager.getLocalFs().getScanner(chunk.getMeta(), inSchema, chunk.getFragment(), outSchema); } - private Scanner createKWayMerger(List<FileFragment> inputs, final int startChunkId, final int num) throws IOException { + private Scanner createKWayMerger(List<Chunk> inputs, final int startChunkId, final int num) throws IOException { final Scanner [] sources = new Scanner[num]; for (int i = 0; i < num; i++) { sources[i] = getFileScanner(inputs.get(startChunkId + i)); @@ -773,6 +774,8 @@ public class ExternalSortExec extends SortExec { @Override public void close() throws IOException { + super.close(); + if (result != null) { result.close(); try { @@ -784,7 +787,8 @@ public class ExternalSortExec extends SortExec { } if (finalOutputFiles != null) { - for (FileFragment frag : 
finalOutputFiles) { + for (Chunk chunk : finalOutputFiles) { + FileFragment frag = chunk.getFragment(); File tmpFile = new File(localFS.makeQualified(frag.getPath()).toUri()); if (frag.getStartKey() == 0 && frag.getLength() == tmpFile.length()) { localFS.delete(frag.getPath(), true); @@ -804,7 +808,6 @@ public class ExternalSortExec extends SortExec { } plan = null; - super.close(); } @Override @@ -833,4 +836,22 @@ public class ExternalSortExec extends SortExec { return inputStats; } } + + private static class Chunk { + private FileFragment fragment; + private TableMeta meta; + + public Chunk(FileFragment fragment, TableMeta meta) { + this.fragment = fragment; + this.meta = meta; + } + + public FileFragment getFragment() { + return fragment; + } + + public TableMeta getMeta() { + return meta; + } + } }
