TAJO-1738: Improve off-heap RowBlock.
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/34062ea8 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/34062ea8 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/34062ea8 Branch: refs/heads/branch-0.11.0 Commit: 34062ea8580fec0dda4e0db2e6f16d46443426ac Parents: ade0665 Author: Jinho Kim <[email protected]> Authored: Thu Sep 3 21:47:39 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Thu Sep 3 21:47:39 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + tajo-cli/pom.xml | 4 + .../org/apache/tajo/storage/RowStoreUtil.java | 140 ++--- tajo-cluster-tests/pom.xml | 20 + .../org/apache/tajo/storage/BufferPool.java | 142 +++++ .../org/apache/tajo/tuple/BaseTupleBuilder.java | 84 +++ .../org/apache/tajo/tuple/RowBlockReader.java | 35 ++ .../org/apache/tajo/tuple/TupleBuilder.java | 26 + .../tajo/tuple/memory/DirectBufTuple.java | 42 ++ .../tajo/tuple/memory/FixedSizeLimitSpec.java | 32 + .../tajo/tuple/memory/HeapRowBlockReader.java | 70 +++ .../org/apache/tajo/tuple/memory/HeapTuple.java | 300 ++++++++++ .../apache/tajo/tuple/memory/MemoryBlock.java | 155 +++++ .../tajo/tuple/memory/MemoryRowBlock.java | 174 ++++++ .../tuple/memory/OffHeapRowBlockReader.java | 77 +++ .../tajo/tuple/memory/OffHeapRowBlockUtils.java | 141 +++++ .../tuple/memory/OffHeapRowBlockWriter.java | 63 ++ .../tajo/tuple/memory/OffHeapRowWriter.java | 305 ++++++++++ .../tajo/tuple/memory/ResizableLimitSpec.java | 142 +++++ .../tajo/tuple/memory/ResizableMemoryBlock.java | 231 +++++++ .../org/apache/tajo/tuple/memory/RowBlock.java | 48 ++ .../org/apache/tajo/tuple/memory/RowWriter.java | 80 +++ .../apache/tajo/tuple/memory/UnSafeTuple.java | 342 +++++++++++ .../memory/UnSafeTupleBytesComparator.java | 99 +++ .../apache/tajo/tuple/memory/ZeroCopyTuple.java | 48 ++ .../java/org/apache/tajo/util/UnsafeUtil.java | 10 +- .../apache/tajo/tuple/TestBaseTupleBuilder.java 
| 80 +++ .../apache/tajo/tuple/memory/TestHeapTuple.java | 82 +++ .../tajo/tuple/memory/TestMemoryRowBlock.java | 595 +++++++++++++++++++ .../tajo/tuple/memory/TestResizableSpec.java | 59 ++ tajo-core-tests/pom.xml | 16 + tajo-jdbc/pom.xml | 4 + tajo-storage/tajo-storage-common/pom.xml | 8 + .../org/apache/tajo/storage/BufferPool.java | 125 ---- .../org/apache/tajo/storage/RowStoreUtil.java | 54 -- .../org/apache/tajo/tuple/BaseTupleBuilder.java | 112 ---- .../org/apache/tajo/tuple/RowBlockReader.java | 33 - .../org/apache/tajo/tuple/TupleBuilder.java | 26 - .../tajo/tuple/offheap/DirectBufTuple.java | 41 -- .../tajo/tuple/offheap/FixedSizeLimitSpec.java | 32 - .../apache/tajo/tuple/offheap/HeapTuple.java | 292 --------- .../tajo/tuple/offheap/OffHeapMemory.java | 102 ---- .../tajo/tuple/offheap/OffHeapRowBlock.java | 213 ------- .../tuple/offheap/OffHeapRowBlockReader.java | 63 -- .../tuple/offheap/OffHeapRowBlockUtils.java | 54 -- .../tuple/offheap/OffHeapRowBlockWriter.java | 58 -- .../tajo/tuple/offheap/OffHeapRowWriter.java | 232 -------- .../tajo/tuple/offheap/ResizableLimitSpec.java | 142 ----- .../apache/tajo/tuple/offheap/RowWriter.java | 73 --- .../apache/tajo/tuple/offheap/UnSafeTuple.java | 331 ----------- .../offheap/UnSafeTupleBytesComparator.java | 99 --- .../tajo/tuple/offheap/ZeroCopyTuple.java | 35 -- .../src/main/resources/storage-default.xml | 20 +- .../apache/tajo/tuple/TestBaseTupleBuilder.java | 76 --- .../tajo/tuple/offheap/TestHeapTuple.java | 45 -- .../tajo/tuple/offheap/TestOffHeapRowBlock.java | 577 ------------------ .../tajo/tuple/offheap/TestResizableSpec.java | 59 -- .../src/test/resources/storage-default.xml | 22 +- tajo-storage/tajo-storage-hbase/pom.xml | 10 + .../storage/rawfile/DirectRawFileScanner.java | 75 ++- .../storage/rawfile/DirectRawFileWriter.java | 100 +--- .../tajo/storage/text/ByteBufLineReader.java | 4 +- .../org/apache/tajo/storage/TestStorages.java | 138 ++--- .../tajo/storage/raw/TestDirectRawFile.java | 21 +- 
.../src/test/resources/storage-default.xml | 33 +- 65 files changed, 3804 insertions(+), 3149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 5f7f9f8..8f9f941 100644 --- a/CHANGES +++ b/CHANGES @@ -32,6 +32,8 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1738: Improve off-heap RowBlock. (jinho) + TAJO-1810: Remove QueryMasterTask cache immediately, if it stored to persistent storage. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-cli/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-cli/pom.xml b/tajo-cli/pom.xml index c72ea9b..1d39370 100644 --- a/tajo-cli/pom.xml +++ b/tajo-cli/pom.xml @@ -168,6 +168,10 @@ <artifactId>hadoop-mapreduce-client-core</artifactId> <groupId>org.apache.hadoop</groupId> </exclusion> + <exclusion> + <artifactId>netty-all</artifactId> + <groupId>io.netty</groupId> + </exclusion> </exclusions> </dependency> http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java index 4c7fe0a..bc23af8 100644 --- a/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java +++ b/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java @@ -70,7 +70,7 @@ public class RowStoreUtil { } - public Tuple toTuple(byte [] bytes) { + public Tuple toTuple(byte[] bytes) { nullFlags.clear(); ByteBuffer bb = ByteBuffer.wrap(bytes); Tuple tuple = new VTuple(schema.size()); @@ -81,7 +81,7 @@ public class RowStoreUtil { nullFlags.fromByteBuffer(bb); bb.limit(bytes.length); - for (int i =0; 
i < schema.size(); i++) { + for (int i = 0; i < schema.size(); i++) { if (nullFlags.get(i)) { tuple.put(i, DatumFactory.createNullDatum()); continue; @@ -90,73 +90,75 @@ public class RowStoreUtil { col = schema.getColumn(i); type = col.getDataType(); switch (type.getType()) { - case BOOLEAN: tuple.put(i, DatumFactory.createBool(bb.get())); break; - case BIT: - byte b = bb.get(); - tuple.put(i, DatumFactory.createBit(b)); - break; - - case CHAR: - byte [] _str = new byte[type.getLength()]; - bb.get(_str); - tuple.put(i, DatumFactory.createChar(_str)); - break; - - case INT2: - short s = bb.getShort(); - tuple.put(i, DatumFactory.createInt2(s)); - break; - - case INT4: - case DATE: - int i_ = bb.getInt(); - tuple.put(i, DatumFactory.createFromInt4(type, i_)); - break; - - case INT8: - case TIME: - case TIMESTAMP: - long l = bb.getLong(); - tuple.put(i, DatumFactory.createFromInt8(type, l)); - break; - - case INTERVAL: - int month = bb.getInt(); - long milliseconds = bb.getLong(); - tuple.put(i, new IntervalDatum(month, milliseconds)); - break; - - case FLOAT4: - float f = bb.getFloat(); - tuple.put(i, DatumFactory.createFloat4(f)); - break; - - case FLOAT8: - double d = bb.getDouble(); - tuple.put(i, DatumFactory.createFloat8(d)); - break; - - case TEXT: - byte [] _string = new byte[bb.getInt()]; - bb.get(_string); - tuple.put(i, DatumFactory.createText(_string)); - break; - - case BLOB: - byte [] _bytes = new byte[bb.getInt()]; - bb.get(_bytes); - tuple.put(i, DatumFactory.createBlob(_bytes)); - break; - - case INET4: - byte [] _ipv4 = new byte[4]; - bb.get(_ipv4); - tuple.put(i, DatumFactory.createInet4(_ipv4)); - break; - - default: - throw new TajoRuntimeException( - new UnsupportedException("data type '" + col.getDataType().getType().name() + "'")); + case BOOLEAN: + tuple.put(i, DatumFactory.createBool(bb.get())); + break; + case BIT: + byte b = bb.get(); + tuple.put(i, DatumFactory.createBit(b)); + break; + + case CHAR: + byte[] _str = new 
byte[type.getLength()]; + bb.get(_str); + tuple.put(i, DatumFactory.createChar(_str)); + break; + + case INT2: + short s = bb.getShort(); + tuple.put(i, DatumFactory.createInt2(s)); + break; + + case INT4: + case DATE: + int i_ = bb.getInt(); + tuple.put(i, DatumFactory.createFromInt4(type, i_)); + break; + + case INT8: + case TIME: + case TIMESTAMP: + long l = bb.getLong(); + tuple.put(i, DatumFactory.createFromInt8(type, l)); + break; + + case INTERVAL: + int month = bb.getInt(); + long milliseconds = bb.getLong(); + tuple.put(i, new IntervalDatum(month, milliseconds)); + break; + + case FLOAT4: + float f = bb.getFloat(); + tuple.put(i, DatumFactory.createFloat4(f)); + break; + + case FLOAT8: + double d = bb.getDouble(); + tuple.put(i, DatumFactory.createFloat8(d)); + break; + + case TEXT: + byte[] _string = new byte[bb.getInt()]; + bb.get(_string); + tuple.put(i, DatumFactory.createText(_string)); + break; + + case BLOB: + byte[] _bytes = new byte[bb.getInt()]; + bb.get(_bytes); + tuple.put(i, DatumFactory.createBlob(_bytes)); + break; + + case INET4: + byte[] _ipv4 = new byte[4]; + bb.get(_ipv4); + tuple.put(i, DatumFactory.createInet4(_ipv4)); + break; + + default: + throw new TajoRuntimeException( + new UnsupportedException("data type '" + col.getDataType().getType().name() + "'")); } } return tuple; http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-cluster-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-cluster-tests/pom.xml b/tajo-cluster-tests/pom.xml index 9095c36..5514803 100644 --- a/tajo-cluster-tests/pom.xml +++ b/tajo-cluster-tests/pom.xml @@ -213,6 +213,10 @@ <groupId>com.sun.jersey.jersey-test-framework</groupId> <artifactId>jersey-test-framework-grizzly2</artifactId> </exclusion> + <exclusion> + <artifactId>netty-all</artifactId> + <groupId>io.netty</groupId> + </exclusion> </exclusions> </dependency> <dependency> @@ -240,6 +244,10 @@ 
<groupId>com.sun.jersey.jersey-test-framework</groupId> <artifactId>jersey-test-framework-grizzly2</artifactId> </exclusion> + <exclusion> + <artifactId>netty-all</artifactId> + <groupId>io.netty</groupId> + </exclusion> </exclusions> </dependency> <dependency> @@ -264,12 +272,24 @@ <version>${hbase.version}</version> <type>test-jar</type> <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>netty-all</artifactId> + <groupId>io.netty</groupId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>${hbase.version}</version> <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>netty-all</artifactId> + <groupId>io.netty</groupId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.hbase</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java new file mode 100644 index 0000000..3120083 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import io.netty.buffer.*; +import io.netty.util.ResourceLeakDetector; +import io.netty.util.internal.PlatformDependent; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.util.CommonTestingUtil; + +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/* this class is PooledBuffer holder */ +public class BufferPool { + + public static final String ALLOW_CACHE = "tajo.storage.buffer.thread-local.cache"; + private static final ByteBufAllocator ALLOCATOR; + + private BufferPool() { + } + + static { + /* TODO Enable thread cache + * Create a pooled ByteBuf allocator but disables the thread-local cache. 
+ * Because the TaskRunner thread is newly created + * */ + + if (System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { + /* Disable pooling buffers for memory usage */ + ALLOCATOR = UnpooledByteBufAllocator.DEFAULT; + + /* if you are finding memory leak, please enable this line */ + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); + } else { + TajoConf tajoConf = new TajoConf(); + ALLOCATOR = createPooledByteBufAllocator(true, tajoConf.getBoolean(ALLOW_CACHE, false), 0); + } + } + + /** + * borrowed from Spark + */ + public static PooledByteBufAllocator createPooledByteBufAllocator( + boolean allowDirectBufs, + boolean allowCache, + int numCores) { + if (numCores == 0) { + numCores = Runtime.getRuntime().availableProcessors(); + } + return new PooledByteBufAllocator( + allowDirectBufs && PlatformDependent.directBufferPreferred(), + Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores), + Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0), + getPrivateStaticField("DEFAULT_PAGE_SIZE"), + getPrivateStaticField("DEFAULT_MAX_ORDER"), + allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0, + allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0, + allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0 + ); + } + + /** Used to get defaults from Netty's private static fields. 
*/ + private static int getPrivateStaticField(String name) { + try { + Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name); + f.setAccessible(true); + return f.getInt(null); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static long maxDirectMemory() { + return PlatformDependent.maxDirectMemory(); + } + + + public static ByteBuf directBuffer(int size) { + return ALLOCATOR.directBuffer(size); + } + + /** + * + * @param size the initial capacity + * @param max the max capacity + * @return allocated ByteBuf from pool + */ + public static ByteBuf directBuffer(int size, int max) { + return ALLOCATOR.directBuffer(size, max).order(ByteOrder.LITTLE_ENDIAN); + } + + /** + * + * @param size the initial capacity + * @param max the max capacity + * @return heap ByteBuf + */ + public static ByteBuf heapBuffer(int size, int max) { + return Unpooled.buffer(size, max).order(ByteOrder.LITTLE_ENDIAN); + } + + @InterfaceStability.Unstable + public static void forceRelease(ByteBuf buf) { + buf.release(buf.refCnt()); + } + + /** + * the ByteBuf will increase to writable size + * @param buf + * @param minWritableBytes required minimum writable size + */ + public static ByteBuf ensureWritable(ByteBuf buf, int minWritableBytes) { + return buf.ensureWritable(minWritableBytes).order(ByteOrder.LITTLE_ENDIAN); + } + + /** + * deallocate the specified direct + * @param byteBuffer + */ + public static void free(ByteBuffer byteBuffer) { + PlatformDependent.freeDirectBuffer(byteBuffer); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java new file mode 100644 index 0000000..04a0267 --- /dev/null +++ 
b/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java @@ -0,0 +1,84 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple; + +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.memory.*; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.Deallocatable; + +public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable { + + private MemoryBlock memoryBlock; + + public BaseTupleBuilder(DataType[] schema) { + super(schema); + this.memoryBlock = new ResizableMemoryBlock(new ResizableLimitSpec(64 * StorageUnit.KB), true); + } + + @Override + public long address() { + return memoryBlock.address(); + } + + public void ensureSize(int size) { + memoryBlock.ensureSize(size); + } + + @Override + public int position() { + return memoryBlock.writerPosition(); + } + + @Override + public void forward(int length) { + memoryBlock.writerPosition(memoryBlock.writerPosition() + length); + } + + @Override + public boolean startRow() { + memoryBlock.writerPosition(0); + return super.startRow(); + } + + @Override + public void endRow() { + super.endRow(); + } + + 
@Override + public Tuple build() { + return buildToHeapTuple(); + } + + public HeapTuple buildToHeapTuple() { + return buildToZeroCopyTuple().toHeapTuple(); + } + + public UnSafeTuple buildToZeroCopyTuple() { + UnSafeTuple zcTuple = new UnSafeTuple(); + zcTuple.set(memoryBlock, memoryBlock.readerPosition(), memoryBlock.readableBytes(), dataTypes()); + return zcTuple; + } + + public void release() { + memoryBlock.release(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java new file mode 100644 index 0000000..59cdef5 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple; + +import org.apache.tajo.storage.Tuple; + +public interface RowBlockReader<T extends Tuple> { + + /** + * Return for each tuple + * + * @return True if tuple block is filled with tuples. 
Otherwise, It will return false. + */ + boolean next(T tuple); + + void reset(); + + long remainForRead(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java new file mode 100644 index 0000000..5b4bd80 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java @@ -0,0 +1,26 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple; + +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.memory.RowWriter; + +public interface TupleBuilder extends RowWriter { + public Tuple build(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java new file mode 100644 index 0000000..10e493f --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.memory; + +import org.apache.tajo.util.Deallocatable; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import static org.apache.tajo.common.TajoDataTypes.DataType; + +public class DirectBufTuple extends UnSafeTuple implements Deallocatable { + private MemoryBlock memoryBlock; + + public DirectBufTuple(int length, DataType[] types) { + ByteBuffer bb = ByteBuffer.allocateDirect(length).order(ByteOrder.LITTLE_ENDIAN); + memoryBlock = new ResizableMemoryBlock(bb); + + set(memoryBlock, 0, length, types); + } + + @Override + public void release() { + memoryBlock.release(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/memory/FixedSizeLimitSpec.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/FixedSizeLimitSpec.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/FixedSizeLimitSpec.java new file mode 100644 index 0000000..367d90d --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/FixedSizeLimitSpec.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.memory; + +/** + * Fixed size limit specification + */ +public class FixedSizeLimitSpec extends ResizableLimitSpec { + public FixedSizeLimitSpec(long size) { + super(size, size); + } + + public FixedSizeLimitSpec(long size, float allowedOverflowRatio) { + super(size, size, allowedOverflowRatio); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java new file mode 100644 index 0000000..dd377cf --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java @@ -0,0 +1,70 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.memory; + +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.tuple.RowBlockReader; + +public class HeapRowBlockReader implements RowBlockReader<HeapTuple> { + private final DataType[] dataTypes; + private final MemoryBlock memoryBlock; + private final int rows; + + // Read States + private int curRowIdxForRead; + private int curPosForRead; + + public HeapRowBlockReader(MemoryRowBlock rowBlock) { + this(rowBlock.getMemory(), rowBlock.getDataTypes(), rowBlock.rows()); + } + + public HeapRowBlockReader(MemoryBlock memoryBlock, DataType[] dataTypes, int rows) { + this.dataTypes = dataTypes; + this.rows = rows; + this.memoryBlock = memoryBlock.duplicate(); + } + + public long remainForRead() { + return memoryBlock.readableBytes(); + } + + @Override + public boolean next(HeapTuple tuple) { + if (curRowIdxForRead < rows) { + + int recordLen = memoryBlock.getInt(curPosForRead); + tuple.set(memoryBlock, curPosForRead, recordLen, dataTypes); + + curPosForRead += recordLen; + curRowIdxForRead++; + memoryBlock.readerPosition(curPosForRead); + + return true; + } else { + return false; + } + } + + @Override + public void reset() { + curPosForRead = 0; + curRowIdxForRead = 0; + memoryBlock.readerPosition(curPosForRead); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java new file mode 100644 index 0000000..5d2fdc9 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java @@ -0,0 +1,300 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.memory; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.*; +import org.apache.tajo.exception.TajoRuntimeException; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.SizeOf; +import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.datetime.TimeMeta; + +import java.nio.ByteOrder; + +import static org.apache.tajo.common.TajoDataTypes.DataType; + +public class HeapTuple extends ZeroCopyTuple implements Cloneable { + private ByteBuf buffer; + private DataType[] types; + + @Override + public void set(MemoryBlock memoryBlock, int relativePos, int length, DataType[] types) { + this.buffer = memoryBlock.getBuffer(); + this.types = types; + super.set(relativePos, length); + } + + protected void set(final byte[] bytes, final DataType[] types) { + this.buffer = Unpooled.wrappedBuffer(bytes).order(ByteOrder.LITTLE_ENDIAN); + this.types = types; + super.set(0, bytes.length); + } + + @Override + public int size() { + return types.length; + 
} + + @Override + public TajoDataTypes.Type type(int fieldId) { + return types[fieldId].getType(); + } + + @Override + public int size(int fieldId) { + return buffer.getInt(checkNullAndGetOffset(fieldId)); + } + + @Override + public void clearOffset() { + } + + private int getFieldOffset(int fieldId) { + return buffer.getInt(getRelativePos() + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)); + } + + private int checkNullAndGetOffset(int fieldId) { + int offset = getFieldOffset(fieldId); + if (offset == MemoryRowBlock.NULL_FIELD_OFFSET) { + throw new RuntimeException("Invalid Field Access: " + fieldId); + } + return offset + getRelativePos(); + } + + @Override + public boolean contains(int fieldid) { + return getFieldOffset(fieldid) > MemoryRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public boolean isBlank(int fieldid) { + return getFieldOffset(fieldid) == MemoryRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public boolean isBlankOrNull(int fieldid) { + return getFieldOffset(fieldid) == MemoryRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public void put(int fieldId, Tuple tuple) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override + public void clear() { + // nothing to do + } + + @Override + public void put(int fieldId, Datum value) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override + public void put(Datum[] values) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override + public Datum asDatum(int fieldId) { + if (isBlankOrNull(fieldId)) { + return NullDatum.get(); + } + + switch (types[fieldId].getType()) { + case BOOLEAN: + return DatumFactory.createBool(getBool(fieldId)); + case BIT: + return DatumFactory.createBit(getByte(fieldId)); + case INT1: + case INT2: + return DatumFactory.createInt2(getInt2(fieldId)); + case INT4: + return DatumFactory.createInt4(getInt4(fieldId)); + case INT8: + return DatumFactory.createInt8(getInt8(fieldId)); + case FLOAT4: + return 
DatumFactory.createFloat4(getFloat4(fieldId)); + case FLOAT8: + return DatumFactory.createFloat8(getFloat8(fieldId)); + case CHAR: + return DatumFactory.createChar(getBytes(fieldId)); + case TEXT: + return DatumFactory.createText(getBytes(fieldId)); + case BLOB : + return DatumFactory.createBlob(getBytes(fieldId)); + case TIMESTAMP: + return DatumFactory.createTimestamp(getInt8(fieldId)); + case DATE: + return DatumFactory.createDate(getInt4(fieldId)); + case TIME: + return DatumFactory.createTime(getInt8(fieldId)); + case INTERVAL: + return getInterval(fieldId); + case INET4: + return DatumFactory.createInet4(getInt4(fieldId)); + case PROTOBUF: + return getProtobufDatum(fieldId); + case NULL_TYPE: + return NullDatum.get(); + default: + throw new TajoRuntimeException(new UnsupportedException("data type '" + types[fieldId] + "'")); + } + } + + @Override + public void setOffset(long offset) { + } + + @Override + public long getOffset() { + return 0; + } + + @Override + public boolean getBool(int fieldId) { + return buffer.getByte(checkNullAndGetOffset(fieldId)) == 0x01; + } + + @Override + public byte getByte(int fieldId) { + return buffer.getByte(checkNullAndGetOffset(fieldId)); + } + + @Override + public char getChar(int fieldId) { + return buffer.getChar(checkNullAndGetOffset(fieldId)); + } + + @Override + public byte[] getBytes(int fieldId) { + int pos = checkNullAndGetOffset(fieldId); + int len = buffer.getInt(pos); + + byte [] bytes = new byte[len]; + buffer.getBytes(pos + SizeOf.SIZE_OF_INT, bytes); + return bytes; + } + + @Override + public byte[] getTextBytes(int fieldId) { + return asDatum(fieldId).asTextBytes(); + } + + @Override + public short getInt2(int fieldId) { + return buffer.getShort(checkNullAndGetOffset(fieldId)); + } + + @Override + public int getInt4(int fieldId) { + return buffer.getInt(checkNullAndGetOffset(fieldId)); + } + + @Override + public long getInt8(int fieldId) { + return buffer.getLong(checkNullAndGetOffset(fieldId)); + } + + 
@Override + public float getFloat4(int fieldId) { + return buffer.getFloat(checkNullAndGetOffset(fieldId)); + } + + @Override + public double getFloat8(int fieldId) { + return buffer.getDouble(checkNullAndGetOffset(fieldId)); + } + + @Override + public String getText(int fieldId) { + return new String(getBytes(fieldId), TextDatum.DEFAULT_CHARSET); + } + + @Override + public TimeMeta getTimeDate(int fieldId) { + return asDatum(fieldId).asTimeMeta(); + } + + public IntervalDatum getInterval(int fieldId) { + int pos = checkNullAndGetOffset(fieldId); + int months = buffer.getInt(pos); + long millisecs = buffer.getLong(pos + SizeOf.SIZE_OF_INT); + return new IntervalDatum(months, millisecs); + } + + @Override + public Datum getProtobufDatum(int fieldId) { + byte [] bytes = getBytes(fieldId); + + ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode()); + Message.Builder builder = factory.newBuilder(); + try { + builder.mergeFrom(bytes); + } catch (InvalidProtocolBufferException e) { + return NullDatum.get(); + } + + return new ProtobufDatum(builder.build()); + } + + @Override + public char[] getUnicodeChars(int fieldId) { + int pos = checkNullAndGetOffset(fieldId); + int len = buffer.getInt(pos); + + byte [] bytes = new byte[len]; + buffer.getBytes(pos + SizeOf.SIZE_OF_INT, bytes); + return StringUtils.convertBytesToChars(bytes, TextDatum.DEFAULT_CHARSET); + } + + @Override + public Datum[] getValues() { + Datum [] datums = new Datum[size()]; + for (int i = 0; i < size(); i++) { + if (contains(i)) { + datums[i] = asDatum(i); + } else { + datums[i] = NullDatum.get(); + } + } + return datums; + } + + @Override + public String toString() { + return VTuple.toDisplayString(getValues()); + } + + @Override + public Tuple clone() throws CloneNotSupportedException { + HeapTuple heapTuple = (HeapTuple) super.clone(); + heapTuple.buffer = buffer.copy(getRelativePos(), getLength()); + heapTuple.relativePos = 0; + return heapTuple; + } +} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryBlock.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryBlock.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryBlock.java new file mode 100644 index 0000000..cc4536d --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryBlock.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.memory; + +import io.netty.buffer.ByteBuf; +import org.apache.tajo.util.Deallocatable; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; + +/** + * This interface provides a memory spec for off-heap or heap + */ +public interface MemoryBlock extends Deallocatable { + + /** + * Current memory address of this buffer + * + * @return the memory address + */ + long address(); + + /** + * @return if true, the buffer has a reference to the low-level memory address + */ + boolean hasAddress(); + + /** + * @return the number of bytes this buffer can contain. + */ + int capacity(); + + /** + * reset the buffer + */ + void clear(); + + /** + * @return true if this buffer has an remaining bytes. + */ + boolean isReadable(); + + /** + * @return the number of readable bytes in this buffer + */ + int readableBytes(); + + /** + * @return the current reader position of this buffer + */ + int readerPosition(); + + /** + * Sets the reader position of this buffer + */ + void readerPosition(int pos); + + /** + * @return true if this buffer is not full filled + */ + boolean isWritable(); + + /** + * @return the number of writable bytes in this buffer + */ + int writableBytes(); + + /** + * @return the current writer position of this buffer + */ + int writerPosition(); + + /** + * Sets the writer position of this buffer + */ + void writerPosition(int pos); + + /** + * Ensure that this buffer has enough remaining space to add the capacity. 
+ * Creates and copies to a new buffer if necessary + * + * @param size Size to add + */ + void ensureSize(int size); + + /** + * Transfers the content of the channel to this buffer + * @param in the input channel + * @return the actual number of bytes read in channel + */ + int writeBytes(ScatteringByteChannel in) throws IOException; + + /** + * Transfers the content of this buffer to the byte array + * @param dst the destination byte array + * @param dstIndex the first index of the destination + * @param length the number of bytes to transfer + * @return the actual number of bytes transfers to the destination byte array + */ + int getBytes(byte[] dst, int dstIndex, int length) throws IOException; + + /** + * This method does not modify {@code readerPosition} or {@code writerPosition} of this buffer. + * @return a 32-bit integer in this buffer + */ + int getInt(int index); + + /** + * Transfers the content of this buffer to the channel + * @param out the output channel + * @param length the maximum number of bytes to transfer + * @return the actual number of bytes transfers to the channel + */ + int writeTo(GatheringByteChannel out, int length) throws IOException; + + int writeTo(GatheringByteChannel out) throws IOException; + + /** + * Transfers the content of this buffer to the stream + * @param out the output stream + * @param length the maximum number of bytes to transfer + * @return the actual number of bytes transfers to the stream + */ + int writeTo(OutputStream out, int length) throws IOException; + + int writeTo(OutputStream out) throws IOException; + + /** + * @return a MemoryBlock which shares the whole region of this. 
+ */ + MemoryBlock duplicate(); + + /** + * @return a internal buffer + */ + ByteBuf getBuffer(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java new file mode 100644 index 0000000..922fc68 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java @@ -0,0 +1,174 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.memory; + +import io.netty.util.internal.PlatformDependent; +import org.apache.tajo.exception.NotImplementedException; +import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.tuple.RowBlockReader; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.Deallocatable; +import org.apache.tajo.util.SizeOf; +import org.apache.tajo.util.TUtil; + +import java.io.IOException; +import java.nio.channels.ScatteringByteChannel; + +import static org.apache.tajo.common.TajoDataTypes.DataType; + +public class MemoryRowBlock implements RowBlock, Deallocatable { + public static final int NULL_FIELD_OFFSET = -1; + + private DataType[] dataTypes; + + // Basic States + private int maxRowNum = Integer.MAX_VALUE; // optional + private int rowNum; + + private RowWriter builder; + private MemoryBlock memory; + + public MemoryRowBlock(DataType[] dataTypes, ResizableLimitSpec limitSpec, boolean isDirect) { + this.memory = new ResizableMemoryBlock(limitSpec, isDirect); + this.dataTypes = dataTypes; + } + + public MemoryRowBlock(MemoryRowBlock rowBlock) { + this.memory = TUtil.checkTypeAndGet(rowBlock.getMemory().duplicate(), ResizableMemoryBlock.class); + this.rowNum = rowBlock.rowNum; + this.dataTypes = rowBlock.dataTypes; + } + + public MemoryRowBlock(MemoryBlock memory, DataType[] dataTypes, int rowNum) { + this.memory = memory; + this.rowNum = rowNum; + this.dataTypes = dataTypes; + } + + public MemoryRowBlock(DataType[] dataTypes) { + this(dataTypes, new ResizableLimitSpec(64 * StorageUnit.KB), true); + } + + public MemoryRowBlock(DataType[] dataTypes, int bytes) { + this(dataTypes, new ResizableLimitSpec(bytes), true); + } + + public MemoryRowBlock(DataType[] dataTypes, int bytes, boolean isDirect) { + this(dataTypes, new ResizableLimitSpec(bytes), isDirect); + } + + @Override + public void clear() { + reset(); + memory.clear(); + } + + private void reset() { + rowNum = 0; + if (builder != null) { + 
builder.clear(); + } + } + + @Override + public int capacity() { + return memory.capacity(); + } + + public int maxRowNum() { + return maxRowNum; + } + + @Override + public int rows() { + return rowNum; + } + + @Override + public void setRows(int rowNum) { + this.rowNum = rowNum; + } + + @Override + public DataType[] getDataTypes() { + return dataTypes; + } + + @Override + public boolean copyFromChannel(ScatteringByteChannel channel) throws IOException { + reset(); + + int readBytes = memory.writeBytes(channel); + + if (readBytes > 0) { + // get row capacity in buffer + while (memory.isReadable()) { + if (memory.readableBytes() < SizeOf.SIZE_OF_INT) { + return true; + } + + int recordSize = PlatformDependent.getInt(memory.address() + memory.readerPosition()); + assert recordSize > 0; + if (memory.readableBytes() < recordSize) { + return true; + } else { + memory.readerPosition(memory.readerPosition() + recordSize); + } + + rowNum++; + } + + return true; + } else { + return false; + } + } + + @Override + public RowWriter getWriter() { + + if (builder == null) { + if (!getMemory().hasAddress()) { + throw new TajoInternalError(new NotImplementedException("Heap memory writer not implemented yet")); + } else { + this.builder = new OffHeapRowBlockWriter(this); + } + } + return builder; + } + + @Override + public MemoryBlock getMemory() { + return memory; + } + + @Override + public void release() { + memory.release(); + } + + @Override + public RowBlockReader getReader() { + if (!getMemory().hasAddress()) { + return new HeapRowBlockReader(this); + } else { + return new OffHeapRowBlockReader(this); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java 
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java
new file mode 100644
index 0000000..ccaeffc
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java
@@ -0,0 +1,77 @@
/***
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.tuple.memory;

import io.netty.util.internal.PlatformDependent;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.tuple.RowBlockReader;

/**
 * Sequential reader over length-prefixed records stored in an addressable {@link MemoryBlock}.
 * Tuples handed out by {@link #next} are pointed at the block's memory via
 * {@link ZeroCopyTuple#set}.
 */
public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> {
  private final DataType[] dataTypes;
  private final MemoryBlock memoryBlock;
  private final int rows;

  // Read States
  private int curRowIdxForRead;
  private int curPosForRead;

  /** Creates a reader over the memory, types and current row count of {@code rowBlock}. */
  public OffHeapRowBlockReader(MemoryRowBlock rowBlock) {
    this(rowBlock.getMemory(), rowBlock.getDataTypes(), rowBlock.rows());
  }

  /**
   * @param memoryBlock block holding the records; must expose a low-level address
   * @param dataTypes column types passed to every tuple
   * @param rows number of records to read
   * @throws TajoInternalError if {@code memoryBlock} has no low-level address
   */
  public OffHeapRowBlockReader(MemoryBlock memoryBlock, DataType[] dataTypes, int rows) {
    if (!memoryBlock.hasAddress()) {
      throw new TajoInternalError(memoryBlock.getClass().getSimpleName()
          + " does not support to direct memory access");
    }
    this.memoryBlock = memoryBlock;
    this.dataTypes = dataTypes;
    this.rows = rows;
  }

  /** @return the number of readable bytes left in the memory block */
  public long remainForRead() {
    return memoryBlock.readableBytes();
  }

  /**
   * Points {@code tuple} at the next record and advances past it.
   *
   * @return {@code false} once all {@code rows} records have been consumed
   */
  @Override
  public boolean next(ZeroCopyTuple tuple) {
    if (curRowIdxForRead >= rows) {
      return false;
    }

    final int recordPos = curPosForRead;
    // the first 4 bytes of a record hold its total length
    final int recordLen = PlatformDependent.getInt(memoryBlock.address() + recordPos);
    tuple.set(memoryBlock, recordPos, recordLen, dataTypes);

    curPosForRead = recordPos + recordLen;
    curRowIdxForRead++;
    memoryBlock.readerPosition(curPosForRead);
    return true;
  }

  /** Rewinds the reader, and the block's reader position, to the first record. */
  @Override
  public void reset() {
    curRowIdxForRead = 0;
    curPosForRead = 0;
    memoryBlock.readerPosition(curPosForRead);
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java new file mode 100644 index 0000000..e8f219c --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.memory; + +import com.google.common.collect.Lists; +import org.apache.tajo.datum.IntervalDatum; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.exception.TajoRuntimeException; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.RowBlockReader; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class OffHeapRowBlockUtils { + + public static List<Tuple> sort(MemoryRowBlock rowBlock, Comparator<Tuple> comparator) { + List<Tuple> tupleList = Lists.newArrayList(); + + ZeroCopyTuple zcTuple; + if(rowBlock.getMemory().hasAddress()) { + zcTuple = new UnSafeTuple(); + } else { + zcTuple = new HeapTuple(); + } + + RowBlockReader reader = rowBlock.getReader(); + while(reader.next(zcTuple)) { + tupleList.add(zcTuple); + + if(rowBlock.getMemory().hasAddress()) { + zcTuple = new UnSafeTuple(); + } else { + zcTuple = new HeapTuple(); + } + } + Collections.sort(tupleList, comparator); + return tupleList; + } + + public static Tuple[] sortToArray(MemoryRowBlock rowBlock, Comparator<Tuple> comparator) { + Tuple[] tuples = new Tuple[rowBlock.rows()]; + + ZeroCopyTuple zcTuple; + if(rowBlock.getMemory().hasAddress()) { + zcTuple = new UnSafeTuple(); + } else { + zcTuple = new HeapTuple(); + } + + RowBlockReader reader = rowBlock.getReader(); + for (int i = 0; i < rowBlock.rows() && reader.next(zcTuple); i++) { + tuples[i] = zcTuple; + if(rowBlock.getMemory().hasAddress()) { + zcTuple = new UnSafeTuple(); + } else { + zcTuple = new HeapTuple(); + } + } + Arrays.sort(tuples, comparator); + return tuples; + } + + public static void convert(Tuple tuple, RowWriter writer) { + writer.startRow(); + + for (int i = 0; i < writer.dataTypes().length; i++) { + if (tuple.isBlankOrNull(i)) { + writer.skipField(); + continue; + } + switch (writer.dataTypes()[i].getType()) { + case BOOLEAN: + 
writer.putBool(tuple.getBool(i)); + break; + case BIT: + writer.putByte(tuple.getByte(i)); + break; + case INT1: + case INT2: + writer.putInt2(tuple.getInt2(i)); + break; + case INT4: + case DATE: + case INET4: + writer.putInt4(tuple.getInt4(i)); + break; + case INT8: + case TIMESTAMP: + case TIME: + writer.putInt8(tuple.getInt8(i)); + break; + case FLOAT4: + writer.putFloat4(tuple.getFloat4(i)); + break; + case FLOAT8: + writer.putFloat8(tuple.getFloat8(i)); + break; + case CHAR: + case TEXT: + writer.putText(tuple.getBytes(i)); + break; + case BLOB: + writer.putBlob(tuple.getBytes(i)); + break; + case INTERVAL: + writer.putInterval((IntervalDatum) tuple.getInterval(i)); + break; + case PROTOBUF: + writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i)); + break; + case NULL_TYPE: + writer.skipField(); + break; + default: + throw new TajoRuntimeException( + new UnsupportedException("unknown data type '" + writer.dataTypes()[i].getType().name() + "'")); + } + } + writer.endRow(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java new file mode 100644 index 0000000..6832730 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.tuple.memory;

import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.exception.TajoInternalError;

/**
 * {@link OffHeapRowWriter} that appends rows to the memory of a {@link RowBlock} and bumps
 * the block's row count each time a row is ended.
 */
public class OffHeapRowBlockWriter extends OffHeapRowWriter {
  private RowBlock rowBlock;

  /**
   * @param rowBlock the block to write into; its memory must expose a low-level address
   * @throws TajoInternalError if the block's memory has no low-level address
   */
  OffHeapRowBlockWriter(RowBlock rowBlock) {
    super(rowBlock.getDataTypes());
    this.rowBlock = rowBlock;

    final MemoryBlock memory = rowBlock.getMemory();
    if (!memory.hasAddress()) {
      throw new TajoInternalError(memory.getClass().getSimpleName()
          + " does not support to direct memory access");
    }
  }

  /** @return the base address of the block's memory */
  @Override
  public long address() {
    return rowBlock.getMemory().address();
  }

  /** @return the writer position of the block's memory */
  @Override
  public int position() {
    return rowBlock.getMemory().writerPosition();
  }

  /** Moves the writer position of the block's memory forward by {@code length} bytes. */
  @Override
  public void forward(int length) {
    final MemoryBlock memory = rowBlock.getMemory();
    memory.writerPosition(memory.writerPosition() + length);
  }

  /** Delegates to {@link MemoryBlock#ensureSize(int)} of the block's memory. */
  @Override
  public void ensureSize(int size) {
    rowBlock.getMemory().ensureSize(size);
  }

  /** Finishes the row header, then increments the block's row count. */
  @Override
  public void endRow() {
    super.endRow();
    rowBlock.setRows(rowBlock.rows() + 1);
  }

  /** @return the data types of the underlying row block */
  @Override
  public TajoDataTypes.DataType[] dataTypes() {
    return rowBlock.getDataTypes();
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java
new file mode 100644
index 0000000..c9b233f
--- /dev/null
+++
b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java @@ -0,0 +1,305 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.memory; + +import io.netty.util.internal.PlatformDependent; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.datum.IntervalDatum; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.util.SizeOf; +import org.apache.tajo.util.TUtil; + +/** + * + * Row Record Structure + * + * | row length (4 bytes) | field 1 offset | field 2 offset | ... | field N offset| field 1 | field 2| ... 
| field N | + * 4 bytes 4 bytes 4 bytes + * + */ +public abstract class OffHeapRowWriter implements RowWriter { + /** record capacity + offset list */ + private final int headerSize; + + private final DataType[] dataTypes; + + private int curFieldIdx; + private int curFieldOffset; + private int curOffset; + + public OffHeapRowWriter(final DataType[] dataTypes) { + this.dataTypes = dataTypes; + this.headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1); + this.curFieldOffset = SizeOf.SIZE_OF_INT; + } + + /** + * Current memory address of the row + * + * @return The memory address + */ + public long recordStartAddr() { + return currentAddr() - curOffset; + } + + /** + * Memory address that point to the first byte of the buffer + * + * @return The memory address + */ + private long currentAddr() { + return address() + position(); + } + + /** + * Current memory address of the buffer + * + * @return The memory address + */ + public abstract long address(); + + public abstract void ensureSize(int size); + + public int offset() { + return position(); + } + + /** + * Current position + * + * @return The position + */ + public abstract int position(); + + /** + * Forward the address; + * + * @param length Length to be forwarded + */ + public abstract void forward(int length); + + + @Override + public void clear() { + curOffset = 0; + curFieldIdx = 0; + curFieldOffset = SizeOf.SIZE_OF_INT; + } + + @Override + public DataType[] dataTypes() { + return dataTypes; + } + + @Override + public boolean startRow() { + ensureSize(headerSize); + curOffset = headerSize; + curFieldOffset = SizeOf.SIZE_OF_INT; + curFieldIdx = 0; + forward(headerSize); + return true; + } + + @Override + public void endRow() { + long rowHeaderPos = recordStartAddr(); + // curOffset is equivalent to a byte length of this row. 
+ PlatformDependent.putInt(rowHeaderPos, curOffset); + + //forward (record offset + fields offset) + rowHeaderPos += SizeOf.SIZE_OF_INT + curFieldOffset; + // set remain header field length + for (int i = curFieldIdx; i < dataTypes.length; i++) { + PlatformDependent.putInt(rowHeaderPos, MemoryRowBlock.NULL_FIELD_OFFSET); + rowHeaderPos += SizeOf.SIZE_OF_INT; + } + } + + @Override + public void skipField() { + // set header field length + putFieldHeader(currentAddr(), MemoryRowBlock.NULL_FIELD_OFFSET); + } + + /** + * set current buffer position and forward field length + * @param fieldLength + */ + private void forwardField(int fieldLength) { + forward(fieldLength); + curOffset += fieldLength; + + } + + private void putFieldHeader(long currentAddr, int length) { + long currentHeaderAddr = currentAddr - curOffset + curFieldOffset; + + // set header field length + PlatformDependent.putInt(currentHeaderAddr, length); + curFieldOffset += SizeOf.SIZE_OF_INT; + curFieldIdx++; + } + + @Override + public void putByte(byte val) { + ensureSize(SizeOf.SIZE_OF_BYTE); + long addr = currentAddr(); + + PlatformDependent.putByte(addr, val); + putFieldHeader(addr, curOffset); + forwardField(SizeOf.SIZE_OF_BYTE); + } + + @Override + public void putBool(boolean val) { + ensureSize(SizeOf.SIZE_OF_BOOL); + long addr = currentAddr(); + + PlatformDependent.putByte(addr, (byte) (val ? 
0x01 : 0x00)); + putFieldHeader(addr, curOffset); + forwardField(SizeOf.SIZE_OF_BOOL); + } + + @Override + public void putInt2(short val) { + ensureSize(SizeOf.SIZE_OF_SHORT); + long addr = currentAddr(); + + PlatformDependent.putShort(addr, val); + putFieldHeader(addr, curOffset); + forwardField(SizeOf.SIZE_OF_SHORT); + } + + @Override + public void putInt4(int val) { + ensureSize(SizeOf.SIZE_OF_INT); + long addr = currentAddr(); + + PlatformDependent.putInt(addr, val); + putFieldHeader(addr, curOffset); + forwardField(SizeOf.SIZE_OF_INT); + } + + @Override + public void putInt8(long val) { + ensureSize(SizeOf.SIZE_OF_LONG); + long addr = currentAddr(); + + PlatformDependent.putLong(addr, val); + putFieldHeader(addr, curOffset); + forwardField(SizeOf.SIZE_OF_LONG); + } + + @Override + public void putFloat4(float val) { + ensureSize(SizeOf.SIZE_OF_INT); + long addr = currentAddr(); + + PlatformDependent.putInt(addr, Float.floatToRawIntBits(val)); + putFieldHeader(addr, curOffset); + forwardField(SizeOf.SIZE_OF_INT); + } + + @Override + public void putFloat8(double val) { + ensureSize(SizeOf.SIZE_OF_LONG); + long addr = currentAddr(); + + PlatformDependent.putLong(addr, Double.doubleToRawLongBits(val)); + putFieldHeader(addr, curOffset); + forwardField(SizeOf.SIZE_OF_LONG); + } + + @Override + public void putText(String val) { + byte[] bytes = val.getBytes(TextDatum.DEFAULT_CHARSET); + putText(bytes); + } + + @Override + public void putText(byte[] val) { + putBlob(val); + } + + @Override + public void putBlob(byte[] val) { + int bytesLen = val.length; + int fieldLen = SizeOf.SIZE_OF_INT + bytesLen; + + ensureSize(fieldLen); + long addr = currentAddr(); + + PlatformDependent.putInt(addr, bytesLen); + PlatformDependent.copyMemory(val, 0, addr + SizeOf.SIZE_OF_INT, bytesLen); + putFieldHeader(addr, curOffset); + forwardField(fieldLen); + } + + @Override + public void putTimestamp(long val) { + putInt8(val); + } + + @Override + public void putDate(int val) { + 
putInt4(val); + } + + @Override + public void putTime(long val) { + putInt8(val); + } + + @Override + public void putInterval(IntervalDatum val) { + ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG); + long addr = currentAddr(); + + PlatformDependent.putInt(addr, val.getMonths()); + PlatformDependent.putLong(addr + SizeOf.SIZE_OF_INT, val.getMilliSeconds()); + putFieldHeader(addr, curOffset); + forwardField(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG); + } + + @Override + public void putInet4(int val) { + putInt4(val); + } + + @Override + public void putProtoDatum(ProtobufDatum val) { + putBlob(val.asByteArray()); + } + + @Override + public void addTuple(Tuple tuple) { + if (tuple instanceof UnSafeTuple) { + UnSafeTuple unSafeTuple = TUtil.checkTypeAndGet(tuple, UnSafeTuple.class); + int length = unSafeTuple.getLength(); + ensureSize(length); + PlatformDependent.copyMemory(unSafeTuple.address(), address() + position(), length); + forward(length); + } else { + OffHeapRowBlockUtils.convert(tuple, this); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java new file mode 100644 index 0000000..614b3fb --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.memory; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.util.FileUtil; + +/** + * It specifies the maximum size or increasing ratio. In addition, + * it guarantees that all numbers are less than or equal to Integer.MAX_VALUE 2^31 + * due to ByteBuffer. + */ +public class ResizableLimitSpec { + private final Log LOG = LogFactory.getLog(ResizableLimitSpec.class); + + public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE; + public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE); + + private final long initSize; + private final long limitBytes; + private final float incRatio; + private final float allowedOVerflowRatio; + private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f; + private final static float DEFAULT_INCREASE_RATIO = 1.0f; + + public ResizableLimitSpec(long initSize) { + this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO); + } + + public ResizableLimitSpec(long initSize, long limitBytes) { + this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO); + } + + public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) { + this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO); + } + + public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) { + Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes."); + 
Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB."); + Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes."); + Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB."); + Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0."); + + if (initSize == limitBytes) { + long overflowedSize = (long) (initSize + (initSize * allowedOverflowRatio)); + + if (overflowedSize > Integer.MAX_VALUE) { + overflowedSize = Integer.MAX_VALUE; + } + + this.initSize = overflowedSize; + this.limitBytes = overflowedSize; + } else { + this.initSize = initSize; + limitBytes = (long) (limitBytes + (limitBytes * allowedOverflowRatio)); + + if (limitBytes > Integer.MAX_VALUE) { + this.limitBytes = Integer.MAX_VALUE; + } else { + this.limitBytes = limitBytes; + } + } + + this.allowedOVerflowRatio = allowedOverflowRatio; + this.incRatio = incRatio; + } + + public long initialSize() { + return initSize; + } + + public long limit() { + return limitBytes; + } + + public float remainRatio(long currentSize) { + Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes."); + if (currentSize > Integer.MAX_VALUE) { + currentSize = Integer.MAX_VALUE; + } + return (float)currentSize / (float)limitBytes; + } + + public boolean canIncrease(long currentSize) { + return remain(currentSize) > 0; + } + + public long remain(long currentSize) { + Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes."); + return limitBytes > Integer.MAX_VALUE ? 
Integer.MAX_VALUE - currentSize : limitBytes - currentSize; + } + + public int increasedSize(int currentSize) { + if (currentSize < initSize) { + return (int) initSize; + } + + if (currentSize > Integer.MAX_VALUE) { + LOG.warn("Current size already exceeds the maximum size (" + Integer.MAX_VALUE + " bytes)"); + return Integer.MAX_VALUE; + } + long nextSize = (long) (currentSize + ((float) currentSize * incRatio)); + + if (nextSize > limitBytes) { + LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")"); + nextSize = limitBytes; + } + + if (nextSize > Integer.MAX_VALUE) { + LOG.info("Increasing reaches maximum size (" + FileUtil.humanReadableByteCount(Integer.MAX_VALUE, false) + ")"); + nextSize = Integer.MAX_VALUE; + } + + return (int) nextSize; + } + + @Override + public String toString() { + return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit=" + + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOVerflowRatio + + ",inc_ratio=" + incRatio; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java new file mode 100644 index 0000000..22c2561 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.memory; + +import com.google.common.base.Preconditions; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.storage.BufferPool; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.UnsafeUtil; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; + +public class ResizableMemoryBlock implements MemoryBlock { + private static final Log LOG = LogFactory.getLog(ResizableMemoryBlock.class); + + protected ByteBuf buffer; + protected ResizableLimitSpec limitSpec; + + public ResizableMemoryBlock(ByteBuf buffer, ResizableLimitSpec limitSpec) { + this.buffer = buffer.order(ByteOrder.LITTLE_ENDIAN); + this.limitSpec = limitSpec; + } + + public ResizableMemoryBlock(ByteBuf buffer) { + this(buffer, new ResizableLimitSpec(buffer.capacity())); + } + + public ResizableMemoryBlock(ByteBuffer buffer) { + this.buffer = Unpooled.wrappedBuffer(buffer).order(ByteOrder.LITTLE_ENDIAN); + this.limitSpec = new ResizableLimitSpec(buffer.capacity()); + } + + public ResizableMemoryBlock(ResizableLimitSpec limitSpec, boolean isDirect) { + if (isDirect) { + this.buffer = 
BufferPool.directBuffer((int) limitSpec.initialSize(), (int) limitSpec.limit()); + } else { + this.buffer = BufferPool.heapBuffer((int) limitSpec.initialSize(), (int) limitSpec.limit()); + } + this.limitSpec = limitSpec; + } + + @Override + public long address() { + return buffer.memoryAddress(); + } + + @Override + public boolean hasAddress() { + return buffer.hasMemoryAddress(); + } + + @Override + public int capacity() { + return buffer.capacity(); + } + + @Override + public void clear() { + buffer.clear(); + } + + @Override + public boolean isReadable() { + return buffer.isReadable(); + } + + @Override + public int readableBytes() { + return buffer.readableBytes(); + } + + @Override + public int readerPosition() { + return buffer.readerIndex(); + } + + @Override + public void readerPosition(int pos) { + buffer.readerIndex(pos); + } + + @Override + public boolean isWritable() { + return buffer.isWritable(); + } + + @Override + public int writableBytes() { + return buffer.writableBytes(); + } + + @Override + public void writerPosition(int pos) { + buffer.writerIndex(pos); + } + + @Override + public int writerPosition() { + return buffer.writerIndex(); + } + + + @Override + public void ensureSize(int size) { + if (!buffer.isWritable(size)) { + if (!limitSpec.canIncrease(buffer.capacity())) { + throw new RuntimeException("Cannot increase RowBlock anymore."); + } + + int newBlockSize = limitSpec.increasedSize(buffer.capacity()); + resize(newBlockSize); + LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false)); + } + } + + private void resize(int newSize) { + Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes"); + + if (newSize > limitSpec.limit()) { + throw new RuntimeException("Resize cannot exceed the capacity limit"); + } + + if (newSize < buffer.capacity()) { + LOG.warn("The capacity reduction is ignored."); + } + + int newBlockSize = UnsafeUtil.alignedSize(newSize); + buffer = 
BufferPool.ensureWritable(buffer, newBlockSize); + } + + @Override + public void release() { + buffer.release(); + } + + @Override + public MemoryBlock duplicate() { + return new ResizableMemoryBlock(buffer.duplicate().readerIndex(0), limitSpec); + } + + @Override + public ByteBuf getBuffer() { + return buffer; + } + + @Override + public int writeBytes(ScatteringByteChannel channel) throws IOException { + + if (buffer.readableBytes() > 0) { + this.buffer.markReaderIndex(); + this.buffer.discardReadBytes(); // compact the buffer + } else { + buffer.clear(); + } + + int readBytes = 0; + while (buffer.writableBytes() > 0) { + int localReadBytes = buffer.writeBytes(channel, buffer.writableBytes()); + if (localReadBytes < 0) { + break; + } + readBytes += localReadBytes; + } + + return readBytes; + } + + @Override + public int getBytes(byte[] bytes, int dstIndex, int length) throws IOException { + int readableBytes = buffer.readableBytes(); + buffer.readBytes(bytes, dstIndex, length); + return readableBytes - buffer.readableBytes(); + } + + @Override + public int getInt(int index) { + return buffer.getInt(index); + } + + @Override + public int writeTo(GatheringByteChannel channel, int length) throws IOException { + return buffer.readBytes(channel, length); + } + + @Override + public int writeTo(GatheringByteChannel channel) throws IOException { + return buffer.readBytes(channel, buffer.readableBytes()); + } + + @Override + public int writeTo(OutputStream outputStream, int length) throws IOException { + buffer.readBytes(outputStream, length); + return length; + } + + @Override + public int writeTo(OutputStream outputStream) throws IOException { + int readableBytes = buffer.readableBytes(); + buffer.readBytes(outputStream, readableBytes); + return readableBytes - buffer.readableBytes(); + } + + @Override + public String toString() { + return "memory=" + FileUtil.humanReadableByteCount(capacity(), false) + "," + limitSpec; + } +} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java new file mode 100644 index 0000000..68902fb --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.memory; + +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.tuple.RowBlockReader; + +import java.io.IOException; +import java.nio.channels.ScatteringByteChannel; + +public interface RowBlock { + + void clear(); + + int capacity(); + + void setRows(int rowNum); + + int rows(); + + TajoDataTypes.DataType[] getDataTypes(); + + RowBlockReader getReader(); + + RowWriter getWriter(); + + MemoryBlock getMemory(); + + void release(); + + boolean copyFromChannel(ScatteringByteChannel channel) throws IOException; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java new file mode 100644 index 0000000..0393714 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java @@ -0,0 +1,80 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.memory; + +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.IntervalDatum; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.storage.Tuple; + +/** + * The call sequence should be as follows: + * + * <pre> + * startRow() --> skipField() or putXXX --> endRow() + * </pre> + * + * The total number of skipField and putXXX invocations must be equivalent to the number of fields. + */ +public interface RowWriter { + + TajoDataTypes.DataType [] dataTypes(); + + boolean startRow(); + + void endRow(); + + void skipField(); + + void clear(); + + void putByte(byte val); + + void putBool(boolean val); + + void putInt2(short val); + + void putInt4(int val); + + void putInt8(long val); + + void putFloat4(float val); + + void putFloat8(double val); + + void putText(String val); + + void putText(byte[] val); + + void putBlob(byte[] val); + + void putTimestamp(long val); + + void putTime(long val); + + void putDate(int val); + + void putInterval(IntervalDatum val); + + void putInet4(int val); + + void putProtoDatum(ProtobufDatum datum); + + void addTuple(Tuple tuple); +}
