http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java new file mode 100644 index 0000000..3dc8c23 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java @@ -0,0 +1,342 @@
/***
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.tuple.memory;

import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import io.netty.util.internal.PlatformDependent;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.*;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.util.SizeOf;
import org.apache.tajo.util.StringUtils;
import org.apache.tajo.util.UnsafeUtil;
import org.apache.tajo.util.datetime.TimeMeta;
import sun.misc.Unsafe;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;

import static org.apache.tajo.common.TajoDataTypes.DataType;

/**
 * A read-only {@link Tuple} view over one serialized row that is reached through a raw
 * memory address.
 *
 * <p>The tuple does not own the memory: it only remembers the base address of a
 * {@link MemoryBlock}, the row's position relative to that base, and the row length.
 * All {@code put} operations are unsupported. Field values are read directly from memory
 * on every access, so the view is only meaningful while the backing block is alive.
 */
public class UnSafeTuple extends ZeroCopyTuple {
  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;

  /** Charset used by {@link #getUnicodeChars(int)}; resolved once instead of per call. */
  private static final Charset UTF_8 = Charset.forName("UTF-8");

  /** Base address of the backing memory block (not of the row itself; see {@link #address()}). */
  private long address;
  /** Column types of the row, indexed by field id. */
  private DataType[] types;

  /**
   * Points this tuple at a row inside {@code memoryBlock}.
   *
   * @param memoryBlock block holding the row; must expose a raw address
   * @param relativePos position of the row relative to the block's address
   * @param length      length of the row in bytes
   * @param types       column types of the row
   * @throws IllegalArgumentException if {@code memoryBlock.hasAddress()} is false
   */
  @Override
  public void set(MemoryBlock memoryBlock, int relativePos, int length, DataType[] types) {
    Preconditions.checkArgument(memoryBlock.hasAddress());

    this.address = memoryBlock.address();
    this.types = types;
    super.set(relativePos, length);
  }

  /**
   * Makes this tuple refer to the same row as {@code tuple}. No row data is copied.
   *
   * @param tuple the tuple whose address, types, position and length are taken over
   */
  public void set(UnSafeTuple tuple) {
    this.address = tuple.address;
    this.types = tuple.types;
    super.set(tuple.getRelativePos(), tuple.getLength());
  }

  /** @return the number of columns, i.e. the length of the type array */
  @Override
  public int size() {
    return types.length;
  }

  @Override
  public TajoDataTypes.Type type(int fieldId) {
    return types[fieldId].getType();
  }

  /**
   * Returns the int stored at the start of the field's data. For the length-prefixed
   * fields read by {@link #getBytes(int)} this is the payload length.
   *
   * @throws RuntimeException if the field is null (see {@link #getFieldAddr(int)})
   */
  @Override
  public int size(int fieldId) {
    return PlatformDependent.getInt(getFieldAddr(fieldId));
  }

  /**
   * Copies the whole row into {@code bb} at its current position and advances the position
   * by the row length. Direct buffers are written through their memory address, other
   * buffers through their backing array.
   *
   * @param bb destination buffer
   * @throws IndexOutOfBoundsException if {@code bb} has fewer remaining bytes than the row length
   */
  public void writeTo(ByteBuffer bb) {
    final int length = getLength();
    if (bb.remaining() < length) {
      throw new IndexOutOfBoundsException("remaining length: " + bb.remaining()
          + ", tuple length: " + length);
    }

    if (length > 0) {
      if (bb.isDirect()) {
        PlatformDependent.copyMemory(address(),
            PlatformDependent.directBufferAddress(bb) + bb.position(), length);
      } else {
        PlatformDependent.copyMemory(address(), bb.array(), bb.arrayOffset() + bb.position(), length);
      }
      // The position update is identical for both kinds of buffer.
      bb.position(bb.position() + length);
    }
  }

  /** @return the absolute address of the row: block base address plus relative position */
  public long address() {
    return address + getRelativePos();
  }

  /**
   * Copies the row bytes into a freshly allocated heap array and wraps them in a
   * {@link HeapTuple} that shares this tuple's type array.
   */
  public HeapTuple toHeapTuple() {
    HeapTuple heapTuple = new HeapTuple();
    byte[] bytes = new byte[getLength()];
    PlatformDependent.copyMemory(address(), bytes, 0, getLength());
    heapTuple.set(bytes, types);
    return heapTuple;
  }

  /**
   * Reads the offset entry of a field. The entries are ints that start one int past the
   * row start, one entry per field.
   */
  private int getFieldOffset(int fieldId) {
    return PlatformDependent.getInt(address() + (long) (SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)));
  }

  /**
   * Returns the absolute address of a field's data.
   *
   * @throws RuntimeException if the field's offset entry marks it as null
   */
  public long getFieldAddr(int fieldId) {
    int fieldOffset = getFieldOffset(fieldId);
    // Use the shared marker constant (instead of a literal) so that this check cannot
    // drift apart from contains()/isBlank()/isBlankOrNull().
    if (fieldOffset == MemoryRowBlock.NULL_FIELD_OFFSET) {
      throw new RuntimeException("Invalid Field Access: " + fieldId);
    }
    return address() + fieldOffset;
  }

  @Override
  public boolean contains(int fieldid) {
    return getFieldOffset(fieldid) > MemoryRowBlock.NULL_FIELD_OFFSET;
  }

  @Override
  public boolean isBlank(int fieldid) {
    return getFieldOffset(fieldid) == MemoryRowBlock.NULL_FIELD_OFFSET;
  }

  @Override
  public boolean isBlankOrNull(int fieldid) {
    return getFieldOffset(fieldid) == MemoryRowBlock.NULL_FIELD_OFFSET;
  }

  @Override
  public void clear() {
    // nothing to do
  }

  /** Unsupported: this tuple is a read-only view. */
  @Override
  public void put(int fieldId, Datum value) {
    throw new TajoRuntimeException(new UnsupportedException());
  }

  /** Unsupported: this tuple is a read-only view. */
  @Override
  public void put(int fieldId, Tuple tuple) {
    throw new TajoRuntimeException(new UnsupportedException());
  }

  /** Unsupported: this tuple is a read-only view. */
  @Override
  public void put(Datum[] values) {
    throw new TajoRuntimeException(new UnsupportedException());
  }

  /**
   * Materializes a field as a {@link Datum} according to its declared type.
   *
   * @return {@link NullDatum} for null fields and for {@code NULL_TYPE} columns
   * @throws TajoRuntimeException wrapping {@link UnsupportedException} for unhandled types
   */
  @Override
  public Datum asDatum(int fieldId) {
    if (isBlankOrNull(fieldId)) {
      return NullDatum.get();
    }

    switch (types[fieldId].getType()) {
    case BOOLEAN:
      return DatumFactory.createBool(getBool(fieldId));
    case BIT:
      return DatumFactory.createBit(getByte(fieldId));
    case INT1:
    case INT2:
      // INT1 deliberately shares the INT2 branch (read as a short).
      return DatumFactory.createInt2(getInt2(fieldId));
    case INT4:
      return DatumFactory.createInt4(getInt4(fieldId));
    case INT8:
      return DatumFactory.createInt8(getInt8(fieldId));
    case FLOAT4:
      return DatumFactory.createFloat4(getFloat4(fieldId));
    case FLOAT8:
      return DatumFactory.createFloat8(getFloat8(fieldId));
    case CHAR:
      return DatumFactory.createChar(getBytes(fieldId));
    case TEXT:
      return DatumFactory.createText(getBytes(fieldId));
    case BLOB:
      return DatumFactory.createBlob(getBytes(fieldId));
    case TIMESTAMP:
      return DatumFactory.createTimestamp(getInt8(fieldId));
    case DATE:
      return DatumFactory.createDate(getInt4(fieldId));
    case TIME:
      return DatumFactory.createTime(getInt8(fieldId));
    case INTERVAL:
      return getInterval(fieldId);
    case INET4:
      return DatumFactory.createInet4(getInt4(fieldId));
    case PROTOBUF:
      return getProtobufDatum(fieldId);
    case NULL_TYPE:
      return NullDatum.get();
    default:
      throw new TajoRuntimeException(new UnsupportedException("data type '" + types[fieldId] + "'"));
    }
  }

  @Override
  public void clearOffset() {
  }

  @Override
  public void setOffset(long offset) {
  }

  /** @return always 0; this tuple does not track an offset */
  @Override
  public long getOffset() {
    return 0;
  }

  /** @return {@code true} only when the stored byte equals {@code 0x01} */
  @Override
  public boolean getBool(int fieldId) {
    return PlatformDependent.getByte(getFieldAddr(fieldId)) == 0x01;
  }

  @Override
  public byte getByte(int fieldId) {
    return PlatformDependent.getByte(getFieldAddr(fieldId));
  }

  @Override
  public char getChar(int fieldId) {
    return UNSAFE.getChar(getFieldAddr(fieldId));
  }

  /**
   * Copies a length-prefixed field into a new array: a 4-byte length at the field address,
   * followed by that many payload bytes.
   */
  @Override
  public byte[] getBytes(int fieldId) {
    long pos = getFieldAddr(fieldId);
    int len = PlatformDependent.getInt(pos);
    pos += SizeOf.SIZE_OF_INT;

    byte[] bytes = new byte[len];
    PlatformDependent.copyMemory(pos, bytes, 0, len);
    return bytes;
  }

  @Override
  public byte[] getTextBytes(int fieldId) {
    return asDatum(fieldId).asTextBytes();
  }

  @Override
  public short getInt2(int fieldId) {
    long addr = getFieldAddr(fieldId);
    return PlatformDependent.getShort(addr);
  }

  @Override
  public int getInt4(int fieldId) {
    return PlatformDependent.getInt(getFieldAddr(fieldId));
  }

  @Override
  public long getInt8(int fieldId) {
    return PlatformDependent.getLong(getFieldAddr(fieldId));
  }

  @Override
  public float getFloat4(int fieldId) {
    return Float.intBitsToFloat(PlatformDependent.getInt(getFieldAddr(fieldId)));
  }

  @Override
  public double getFloat8(int fieldId) {
    return Double.longBitsToDouble(PlatformDependent.getLong(getFieldAddr(fieldId)));
  }

  @Override
  public String getText(int fieldId) {
    return new String(getBytes(fieldId), TextDatum.DEFAULT_CHARSET);
  }

  /** Reads an interval stored as a 4-byte month count followed by an 8-byte millisecond count. */
  @Override
  public IntervalDatum getInterval(int fieldId) {
    long pos = getFieldAddr(fieldId);
    int months = PlatformDependent.getInt(pos);
    pos += SizeOf.SIZE_OF_INT;
    long millisecs = PlatformDependent.getLong(pos);
    return new IntervalDatum(months, millisecs);
  }

  /**
   * Parses the field's bytes with the protobuf builder registered for the column type.
   *
   * @return the parsed datum, or {@link NullDatum} when the bytes are not a valid message
   */
  @Override
  public Datum getProtobufDatum(int fieldId) {
    byte[] bytes = getBytes(fieldId);

    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId]);
    Message.Builder builder = factory.newBuilder();
    try {
      builder.mergeFrom(bytes);
    } catch (InvalidProtocolBufferException e) {
      // NOTE(review): a malformed payload is reported as NULL instead of being raised.
      // Kept unchanged because callers may depend on this best-effort behavior.
      return NullDatum.get();
    }

    return new ProtobufDatum(builder.build());
  }

  /** Decodes the length-prefixed field bytes as UTF-8 characters. */
  @Override
  public char[] getUnicodeChars(int fieldId) {
    return StringUtils.convertBytesToChars(getBytes(fieldId), UTF_8);
  }

  @Override
  public TimeMeta getTimeDate(int fieldId) {
    return asDatum(fieldId).asTimeMeta();
  }

  /** @return a heap copy of the row (see {@link #toHeapTuple()}), not another view */
  @Override
  public Tuple clone() throws CloneNotSupportedException {
    return toHeapTuple();
  }

  /** @return one datum per column; fields that are not contained become {@link NullDatum} */
  @Override
  public Datum[] getValues() {
    final int fieldCount = size();
    Datum[] datums = new Datum[fieldCount];
    for (int i = 0; i < fieldCount; i++) {
      if (contains(i)) {
        datums[i] = asDatum(i);
      } else {
        datums[i] = NullDatum.get();
      }
    }
    return datums;
  }

  @Override
  public String toString() {
    return VTuple.toDisplayString(getValues());
  }

  /** No-op: the tuple does not own the memory it points at. */
  public void release() {

  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleBytesComparator.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleBytesComparator.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleBytesComparator.java new file mode 100644 index 0000000..53a78a8 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleBytesComparator.java @@ -0,0 +1,99 @@
/***
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.tuple.memory;

import com.google.common.primitives.Longs;
import com.google.common.primitives.UnsignedLongs;
import org.apache.tajo.util.SizeOf;
import org.apache.tajo.util.UnsafeUtil;
import sun.misc.Unsafe;

import java.nio.ByteOrder;

/**
 * Compares two length-prefixed byte sequences directly in memory, without copying them.
 * It is used by the compiled TupleComparator to compare the UTF bytes held in an UnSafeTuple.
 */
public class UnSafeTupleBytesComparator {
  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;

  static final boolean littleEndian =
      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);

  /**
   * Lexicographically compares two byte sequences, treating every byte as unsigned.
   *
   * <p>Each pointer must address a 4-byte length followed by that many bytes. The common
   * prefix is compared first; if it is identical, the shorter sequence orders first.
   *
   * @param ptr1 address of the left length-prefixed sequence
   * @param ptr2 address of the right length-prefixed sequence
   * @return a negative value, zero, or a positive value if the left sequence is less than,
   *         equal to, or greater than the right one
   */
  public static int compare(long ptr1, long ptr2) {
    int lstrLen = UNSAFE.getInt(ptr1);
    int rstrLen = UNSAFE.getInt(ptr2);

    ptr1 += SizeOf.SIZE_OF_INT;
    ptr2 += SizeOf.SIZE_OF_INT;

    int minLength = Math.min(lstrLen, rstrLen);
    int minWords = minLength / Longs.BYTES;

    /*
     * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
     * time is no slower than comparing 4 bytes at a time even on 32-bit.
     * On the other hand, it is substantially faster on 64-bit.
     */
    for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
      long lw = UNSAFE.getLong(ptr1);
      long rw = UNSAFE.getLong(ptr2);
      long diff = lw ^ rw;

      if (diff != 0) {
        if (!littleEndian) {
          // On big-endian the first byte in memory is the most significant one, so an
          // unsigned comparison of the words already is the lexicographic order.
          return UnsignedLongs.compare(lw, rw);
        }

        // Little-endian: the first differing byte in memory is the lowest-order non-zero
        // byte of diff. Binary search for its bit position n (a multiple of 8).
        int n = 0;
        int y;
        int x = (int) diff;
        if (x == 0) {
          x = (int) (diff >>> 32);
          n = 32;
        }

        y = x << 16;
        if (y == 0) {
          n += 16;
        } else {
          x = y;
        }

        y = x << 8;
        if (y == 0) {
          n += 8;
        }
        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
      }

      ptr1 += SizeOf.SIZE_OF_LONG;
      ptr2 += SizeOf.SIZE_OF_LONG;
    }

    // The epilogue to cover the last (minLength % 8) elements.
    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
      // Mask to unsigned so that bytes >= 0x80 (e.g. every byte of a multi-byte UTF-8
      // character) order the same way here as in the word loop above. A signed
      // subtraction would sort them before ASCII bytes.
      int result = (UNSAFE.getByte(ptr1++) & 0xFF) - (UNSAFE.getByte(ptr2++) & 0xFF);
      if (result != 0) {
        return result;
      }
    }
    return lstrLen - rstrLen;
  }
}
 http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java new file mode 100644 index 0000000..1f4f57e --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.tajo.tuple.memory;

import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.storage.Tuple;

/**
 * Base class for tuples that describe a row by its position and length inside a
 * {@link MemoryBlock} rather than by holding the field values themselves.
 */
public abstract class ZeroCopyTuple implements Tuple {

  /** Position of the row, relative to the start of its memory block. */
  protected int relativePos;
  /** Length of the row in bytes. */
  protected int length;

  /**
   * Points this tuple at a row inside the given memory block.
   *
   * @param memoryBlock block that holds the row
   * @param relativePos position of the row relative to the block
   * @param length      length of the row
   * @param types       column types of the row
   */
  public abstract void set(MemoryBlock memoryBlock, int relativePos, int length, DataType[] types);

  /** Records the row's position and length; intended to be called by subclasses in this package. */
  void set(int relativePos, int length) {
    this.length = length;
    this.relativePos = relativePos;
  }

  /** @return the row length recorded by the last {@code set} call */
  public int getLength() {
    return this.length;
  }

  /** @return the row position recorded by the last {@code set} call */
  public int getRelativePos() {
    return this.relativePos;
  }

  /**
   * Delegates to {@link Object#clone()}.
   *
   * @throws CloneNotSupportedException propagated from {@link Object#clone()}
   */
  @Override
  public Tuple clone() throws CloneNotSupportedException {
    Object copy = super.clone();
    return (Tuple) copy;
  }
}
 http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java index ff6072e..575e628 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java @@ -18,7 +18,7 @@ package org.apache.tajo.util; import com.google.common.base.Preconditions; -import sun.misc.Cleaner; +import io.netty.util.internal.PlatformDependent; import sun.misc.Unsafe; import sun.nio.ch.DirectBuffer; @@ -132,12 +132,6 @@ public class UnsafeUtil { } public static void free(ByteBuffer bb) { - Preconditions.checkNotNull(bb); - Preconditions.checkState(bb.isDirect()); - - Cleaner cleaner = ((DirectBuffer) bb).cleaner(); - if (cleaner != null) { - cleaner.clean(); - } + PlatformDependent.freeDirectBuffer(bb); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java ---------------------------------------------------------------------- diff --git 
a/tajo-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java b/tajo-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java new file mode 100644 index 0000000..6ce1a6f --- /dev/null +++ b/tajo-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple; + +import org.apache.tajo.tuple.memory.*; +import org.junit.Test; + +public class TestBaseTupleBuilder { + + @Test + public void testBuild() { + BaseTupleBuilder builder = new BaseTupleBuilder(TestMemoryRowBlock.schema); + + MemoryRowBlock rowBlock = TestMemoryRowBlock.createRowBlock(10248); + RowBlockReader reader = rowBlock.getReader(); + + ZeroCopyTuple inputTuple = new UnSafeTuple(); + + HeapTuple heapTuple; + ZeroCopyTuple zcTuple; + int i = 0; + while(reader.next(inputTuple)) { + OffHeapRowBlockUtils.convert(inputTuple, builder); + + zcTuple = builder.buildToZeroCopyTuple(); + TestMemoryRowBlock.validateTupleResult(i, zcTuple); + + heapTuple = builder.buildToHeapTuple(); + TestMemoryRowBlock.validateTupleResult(i, heapTuple); + + i++; + } + builder.release(); + rowBlock.release(); + } + + @Test + public void testBuildWithNull() { + BaseTupleBuilder builder = new BaseTupleBuilder(TestMemoryRowBlock.schema); + + MemoryRowBlock rowBlock = TestMemoryRowBlock.createRowBlockWithNull(10248); + RowBlockReader reader = rowBlock.getReader(); + + ZeroCopyTuple inputTuple = new UnSafeTuple(); + + HeapTuple heapTuple; + ZeroCopyTuple zcTuple; + int i = 0; + while(reader.next(inputTuple)) { + OffHeapRowBlockUtils.convert(inputTuple, builder); + + heapTuple = builder.buildToHeapTuple(); + TestMemoryRowBlock.validateNullity(i, heapTuple); + + zcTuple = builder.buildToZeroCopyTuple(); + TestMemoryRowBlock.validateNullity(i, zcTuple); + + i++; + } + + builder.release(); + rowBlock.release(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestHeapTuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestHeapTuple.java b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestHeapTuple.java new file mode 100644 index 0000000..2b45428 --- 
/dev/null +++ b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestHeapTuple.java @@ -0,0 +1,82 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.memory; + +import io.netty.buffer.ByteBuf; +import org.apache.tajo.storage.BufferPool; +import org.apache.tajo.tuple.RowBlockReader; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestHeapTuple { + + @Test + public void testHeapTupleFromOffheap() { + MemoryRowBlock rowBlock = TestMemoryRowBlock.createRowBlock(1024); + assertTrue(rowBlock.getMemory().getBuffer().isDirect()); + assertTrue(rowBlock.getMemory().hasAddress()); + + RowBlockReader reader = rowBlock.getReader(); + assertEquals(OffHeapRowBlockReader.class, reader.getClass()); + + UnSafeTuple zcTuple = new UnSafeTuple(); + int i = 0; + while (reader.next(zcTuple)) { + + HeapTuple heapTuple = zcTuple.toHeapTuple(); + TestMemoryRowBlock.validateTupleResult(i, heapTuple); + TestMemoryRowBlock.validateTupleResult(i, zcTuple); + TestMemoryRowBlock.validateTupleResult(i, zcTuple.toHeapTuple()); + i++; + } + + assertEquals(rowBlock.rows(), i); + rowBlock.release(); + } + + @Test + public void testHeapTupleFromHeap() throws 
CloneNotSupportedException { + MemoryRowBlock rowBlock = TestMemoryRowBlock.createRowBlock(1024); + int length = rowBlock.getMemory().writerPosition(); + //write rows to heap + ByteBuf heapBuffer = BufferPool.heapBuffer(length, length); + heapBuffer.writeBytes(rowBlock.getMemory().getBuffer()); + assertFalse(heapBuffer.isDirect()); + + ResizableMemoryBlock memoryBlock = + new ResizableMemoryBlock(heapBuffer); + assertFalse(memoryBlock.hasAddress()); + + + RowBlockReader reader = new HeapRowBlockReader(memoryBlock, rowBlock.getDataTypes(), rowBlock.rows()); + assertEquals(HeapRowBlockReader.class, reader.getClass()); + HeapTuple heapTuple = new HeapTuple(); + int i = 0; + while (reader.next(heapTuple)) { + + TestMemoryRowBlock.validateTupleResult(i, heapTuple); + TestMemoryRowBlock.validateTupleResult(i, heapTuple.clone()); + i++; + } + assertEquals(rowBlock.rows(), i); + rowBlock.release(); + memoryBlock.release(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java new file mode 100644 index 0000000..a6003c7 --- /dev/null +++ b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java @@ -0,0 +1,595 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.memory; + +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.RowBlockReader; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.NumberUtil; +import org.apache.tajo.util.ProtoUtil; +import org.junit.Test; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +import static org.apache.tajo.common.TajoDataTypes.Type; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestMemoryRowBlock { + private static final Log LOG = LogFactory.getLog(TestMemoryRowBlock.class); + public static String UNICODE_FIELD_PREFIX = "abc_ê°ëë¤_"; + public static DataType[] schema; + + static { + schema = new DataType[] { + DataType.newBuilder().setType(Type.BOOLEAN).build(), + DataType.newBuilder().setType(Type.INT2).build(), + DataType.newBuilder().setType(Type.INT4).build(), + DataType.newBuilder().setType(Type.INT8).build(), + DataType.newBuilder().setType(Type.FLOAT4).build(), + DataType.newBuilder().setType(Type.FLOAT8).build(), + 
DataType.newBuilder().setType(Type.TEXT).build(), + DataType.newBuilder().setType(Type.TIMESTAMP).build(), + DataType.newBuilder().setType(Type.DATE).build(), + DataType.newBuilder().setType(Type.TIME).build(), + DataType.newBuilder().setType(Type.INTERVAL).build(), + DataType.newBuilder().setType(Type.INET4).build(), + DataType.newBuilder().setType(Type.PROTOBUF).setCode(PrimitiveProtos.StringProto.class.getName()).build() + }; + } + + private void explainRowBlockAllocation(MemoryRowBlock rowBlock, long startTime, long endTime) { + LOG.info(FileUtil.humanReadableByteCount(rowBlock.capacity(), true) + " bytes allocated " + + (endTime - startTime) + " msec"); + } + + @Test + public void testPutAndReadValidation() { + int rowNum = 1000; + + long allocStart = System.currentTimeMillis(); + MemoryRowBlock rowBlock = new MemoryRowBlock(schema, 1024); + long allocEnd = System.currentTimeMillis(); + explainRowBlockAllocation(rowBlock, allocStart, allocEnd); + + RowBlockReader reader = null; + + ZeroCopyTuple tuple = new UnSafeTuple(); + long writeStart = System.currentTimeMillis(); + for (int i = 0; i < rowNum; i++) { + fillRow(i, rowBlock.getWriter()); + + reader = rowBlock.getReader(); + int j = 0; + while(reader.next(tuple)) { + validateTupleResult(j, tuple); + j++; + } + } + + assertNotNull(reader); + long writeEnd = System.currentTimeMillis(); + LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec"); + + long readStart = System.currentTimeMillis(); + tuple = new UnSafeTuple(); + int j = 0; + reader.reset(); + while(reader.next(tuple)) { + validateTupleResult(j, tuple); + j++; + } + assertEquals(rowNum, j); + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + + rowBlock.release(); + } + + @Test + public void testNullityValidation() { + int rowNum = 1000; + + long allocStart = System.currentTimeMillis(); + MemoryRowBlock rowBlock = new MemoryRowBlock(schema, 1024); + long allocEnd = 
System.currentTimeMillis(); + explainRowBlockAllocation(rowBlock, allocStart, allocEnd); + + RowBlockReader reader = null; + ZeroCopyTuple tuple = new UnSafeTuple(); + long writeStart = System.currentTimeMillis(); + for (int i = 0; i < rowNum; i++) { + + fillRowBlockWithNull(i, rowBlock.getWriter()); + + reader = rowBlock.getReader(); + int j = 0; + while(reader.next(tuple)) { + validateNullity(j, tuple); + + j++; + } + } + + assertNotNull(reader); + long writeEnd = System.currentTimeMillis(); + LOG.info("writing and nullity validating take " + (writeEnd - writeStart) + " msec"); + + long readStart = System.currentTimeMillis(); + tuple = new UnSafeTuple(); + int j = 0; + reader.reset(); + while(reader.next(tuple)) { + validateNullity(j, tuple); + + j++; + } + assertEquals(rowNum, j); + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + + rowBlock.release(); + } + + @Test + public void testEmptyRow() { + int rowNum = 1000; + + long allocStart = System.currentTimeMillis(); + MemoryRowBlock rowBlock = new MemoryRowBlock(schema, StorageUnit.MB * 10); + long allocEnd = System.currentTimeMillis(); + explainRowBlockAllocation(rowBlock, allocStart, allocEnd); + + long writeStart = System.currentTimeMillis(); + for (int i = 0; i < rowNum; i++) { + rowBlock.getWriter().startRow(); + // empty columns + rowBlock.getWriter().endRow(); + } + long writeEnd = System.currentTimeMillis(); + LOG.info("writing tooks " + (writeEnd - writeStart) + " msec"); + + RowBlockReader reader = rowBlock.getReader(); + + long readStart = System.currentTimeMillis(); + ZeroCopyTuple tuple = new UnSafeTuple(); + int j = 0; + reader.reset(); + while(reader.next(tuple)) { + j++; + } + + assertEquals(rowNum, j); + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + rowBlock.release(); + + assertEquals(rowNum, j); + assertEquals(rowNum, rowBlock.rows()); + } + + @Test + public void 
testSortBenchmark() { + int rowNum = 1000; + + MemoryRowBlock rowBlock = createRowBlock(rowNum); + List<ZeroCopyTuple> unSafeTuples = Lists.newArrayList(); + + long readStart = System.currentTimeMillis(); + ZeroCopyTuple tuple = new UnSafeTuple(); + + RowBlockReader reader = rowBlock.getReader(); + while(reader.next(tuple)) { + unSafeTuples.add(tuple); + tuple = new UnSafeTuple(); + } + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + + long sortStart = System.currentTimeMillis(); + Collections.sort(unSafeTuples, new Comparator<ZeroCopyTuple>() { + @Override + public int compare(ZeroCopyTuple t1, ZeroCopyTuple t2) { + return NumberUtil.compare(t1.getInt4(2), t2.getInt4(2)); + } + }); + long sortEnd = System.currentTimeMillis(); + LOG.info("sorting took " + (sortEnd - sortStart) + " msec"); + rowBlock.release(); + } + + @Test + public void testVTuplePutAndGetBenchmark() { + int rowNum = 1000; + + List<VTuple> rowBlock = Lists.newArrayList(); + long writeStart = System.currentTimeMillis(); + VTuple tuple; + for (int i = 0; i < rowNum; i++) { + tuple = new VTuple(schema.length); + fillVTuple(i, tuple); + rowBlock.add(tuple); + } + long writeEnd = System.currentTimeMillis(); + LOG.info("Writing takes " + (writeEnd - writeStart) + " msec"); + + long readStart = System.currentTimeMillis(); + int j = 0; + for (VTuple t : rowBlock) { + validateTupleResult(j, t); + j++; + } + + assertEquals(rowNum, j); + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + + int count = 0; + for (int l = 0; l < rowBlock.size(); l++) { + for(int m = 0; m < schema.length; m++ ) { + if (rowBlock.get(l).contains(m) && rowBlock.get(l).get(m).type() == Type.INT4) { + count ++; + } + } + } + // For preventing unnecessary code elimination optimization. 
+ LOG.info("The number of INT4 values is " + count + "."); + } + + @Test + public void testVTuplePutAndGetBenchmarkViaDirectRowEncoder() { + int rowNum = 1000; + + MemoryRowBlock rowBlock = new MemoryRowBlock(schema, StorageUnit.MB * 100); + + long writeStart = System.currentTimeMillis(); + VTuple tuple = new VTuple(schema.length); + for (int i = 0; i < rowNum; i++) { + fillVTuple(i, tuple); + rowBlock.getWriter().addTuple(tuple); + } + long writeEnd = System.currentTimeMillis(); + LOG.info("Writing takes " + (writeEnd - writeStart) + " msec"); + + validateResults(rowBlock); + rowBlock.release(); + } + + @Test + public void testSerDerOfRowBlock() { + int rowNum = 1000; + + MemoryRowBlock rowBlock = createRowBlock(rowNum); + + MemoryRowBlock restoredRowBlock = new MemoryRowBlock(rowBlock); + validateResults(restoredRowBlock); + rowBlock.release(); + } + + @Test + public void testSerDerOfZeroCopyTuple() { + int rowNum = 1000; + + MemoryRowBlock rowBlock = createRowBlock(rowNum); + + MemoryRowBlock restoredRowBlock = new MemoryRowBlock(rowBlock); + RowBlockReader reader = restoredRowBlock.getReader(); + + long readStart = System.currentTimeMillis(); + UnSafeTuple tuple = new UnSafeTuple(); + + int j = 0; + List<ZeroCopyTuple> copyTuples = Lists.newArrayList(); + + while (reader.next(tuple)) { + validateTupleResult(j, tuple); + + UnSafeTuple copyTuple = new UnSafeTuple(); + copyTuple.set(tuple); + copyTuples.add(copyTuple); + + j++; + } + + assertEquals(rowNum, j); + + for (int i = 0; i < j; i++) { + validateTupleResult(i, copyTuples.get(i)); + } + + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + + rowBlock.release(); + } + + public static MemoryRowBlock createRowBlock(int rowNum) { + long allocateStart = System.currentTimeMillis(); + MemoryRowBlock rowBlock = new MemoryRowBlock(schema, StorageUnit.MB * 8); + long allocatedEnd = System.currentTimeMillis(); + 
LOG.info(FileUtil.humanReadableByteCount(rowBlock.capacity(), true) + " bytes allocated " + + (allocatedEnd - allocateStart) + " msec"); + + long writeStart = System.currentTimeMillis(); + for (int i = 0; i < rowNum; i++) { + fillRow(i, rowBlock.getWriter()); + } + long writeEnd = System.currentTimeMillis(); + LOG.info("writing takes " + (writeEnd - writeStart) + " msec"); + + return rowBlock; + } + + public static MemoryRowBlock createRowBlockWithNull(int rowNum) { + long allocateStart = System.currentTimeMillis(); + MemoryRowBlock rowBlock = new MemoryRowBlock(schema, StorageUnit.MB * 8); + long allocatedEnd = System.currentTimeMillis(); + LOG.info(FileUtil.humanReadableByteCount(rowBlock.capacity(), true) + " bytes allocated " + + (allocatedEnd - allocateStart) + " msec"); + + long writeStart = System.currentTimeMillis(); + for (int i = 0; i < rowNum; i++) { + fillRowBlockWithNull(i, rowBlock.getWriter()); + } + long writeEnd = System.currentTimeMillis(); + LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec"); + + return rowBlock; + } + + public static void fillRow(int i, RowWriter builder) { + builder.startRow(); + builder.putBool(i % 1 == 0 ? 
true : false); // 0 + builder.putInt2((short) 1); // 1 + builder.putInt4(i); // 2 + builder.putInt8(i); // 3 + builder.putFloat4(i); // 4 + builder.putFloat8(i); // 5 + builder.putText(UNICODE_FIELD_PREFIX + i); // 6 + builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 + builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 + builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 + builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 + builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 + builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 + builder.endRow(); + } + + public static void fillRowBlockWithNull(int i, RowWriter writer) { + writer.startRow(); + + if (i == 0) { + writer.skipField(); + } else { + writer.putBool(i % 1 == 0 ? true : false); // 0 + } + if (i % 1 == 0) { + writer.skipField(); + } else { + writer.putInt2((short) 1); // 1 + } + + if (i % 2 == 0) { + writer.skipField(); + } else { + writer.putInt4(i); // 2 + } + + if (i % 3 == 0) { + writer.skipField(); + } else { + writer.putInt8(i); // 3 + } + + if (i % 4 == 0) { + writer.skipField(); + } else { + writer.putFloat4(i); // 4 + } + + if (i % 5 == 0) { + writer.skipField(); + } else { + writer.putFloat8(i); // 5 + } + + if (i % 6 == 0) { + writer.skipField(); + } else { + writer.putText(UNICODE_FIELD_PREFIX + i); // 6 + } + + if (i % 7 == 0) { + writer.skipField(); + } else { + writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 + } + + if (i % 8 == 0) { + writer.skipField(); + } else { + writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 + } + + if (i % 9 == 0) { + writer.skipField(); + } else { + writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 + } + + if (i % 10 == 0) { + writer.skipField(); + } else { + writer.putInterval(DatumFactory.createInterval((i + 1) 
+ " hours")); // 10 + } + + if (i % 11 == 0) { + writer.skipField(); + } else { + writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 + } + + if (i % 12 == 0) { + writer.skipField(); + } else { + writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 + } + + writer.endRow(); + } + + public static void fillVTuple(int i, VTuple tuple) { + tuple.put(0, DatumFactory.createBool(i % 1 == 0)); + tuple.put(1, DatumFactory.createInt2((short) 1)); + tuple.put(2, DatumFactory.createInt4(i)); + tuple.put(3, DatumFactory.createInt8(i)); + tuple.put(4, DatumFactory.createFloat4(i)); + tuple.put(5, DatumFactory.createFloat8(i)); + tuple.put(6, DatumFactory.createText(UNICODE_FIELD_PREFIX + i)); + tuple.put(7, DatumFactory.createTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i)); // 7 + tuple.put(8, DatumFactory.createDate(DatumFactory.createDate("2014-04-16").asInt4() + i)); // 8 + tuple.put(9, DatumFactory.createTime(DatumFactory.createTime("08:48:00").asInt8() + i)); // 9 + tuple.put(10, DatumFactory.createInterval((i + 1) + " hours")); // 10 + tuple.put(11, DatumFactory.createInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i)); // 11 + tuple.put(12, new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12; + } + + public static void validateResults(MemoryRowBlock rowBlock) { + long readStart = System.currentTimeMillis(); + ZeroCopyTuple tuple = new UnSafeTuple(); + int j = 0; + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); + reader.reset(); + while(reader.next(tuple)) { + validateTupleResult(j, tuple); + j++; + } + assertEquals(rowBlock.rows(), j); + long readEnd = System.currentTimeMillis(); + LOG.info("Reading takes " + (readEnd - readStart) + " msec"); + } + + public static void validateTupleResult(int j, Tuple t) { + assertTrue((j % 1 == 0) == t.getBool(0)); + assertTrue(1 == t.getInt2(1)); + assertEquals(j, t.getInt4(2)); + assertEquals(j, t.getInt8(3)); + 
assertTrue(j == t.getFloat4(4)); + assertTrue(j == t.getFloat8(5)); + assertEquals(UNICODE_FIELD_PREFIX + j, t.getText(6)); + assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(7)); + assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(8)); + assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(9)); + assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(10)); + assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(11)); + assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(12)); + } + + public static void validateNullity(int j, Tuple tuple) { + if (j == 0) { + tuple.isBlankOrNull(0); + } else { + assertTrue((j % 1 == 0) == tuple.getBool(0)); + } + + if (j % 1 == 0) { + tuple.isBlankOrNull(1); + } else { + assertTrue(1 == tuple.getInt2(1)); + } + + if (j % 2 == 0) { + tuple.isBlankOrNull(2); + } else { + assertEquals(j, tuple.getInt4(2)); + } + + if (j % 3 == 0) { + tuple.isBlankOrNull(3); + } else { + assertEquals(j, tuple.getInt8(3)); + } + + if (j % 4 == 0) { + tuple.isBlankOrNull(4); + } else { + assertTrue(j == tuple.getFloat4(4)); + } + + if (j % 5 == 0) { + tuple.isBlankOrNull(5); + } else { + assertTrue(j == tuple.getFloat8(5)); + } + + if (j % 6 == 0) { + tuple.isBlankOrNull(6); + } else { + assertEquals(UNICODE_FIELD_PREFIX + j, tuple.getText(6)); + } + + if (j % 7 == 0) { + tuple.isBlankOrNull(7); + } else { + assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7)); + } + + if (j % 8 == 0) { + tuple.isBlankOrNull(8); + } else { + assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8)); + } + + if (j % 9 == 0) { + tuple.isBlankOrNull(9); + } else { + assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9)); + } + + if (j % 10 == 0) { + tuple.isBlankOrNull(10); + } else { + 
assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10)); + } + + if (j % 11 == 0) { + tuple.isBlankOrNull(11); + } else { + assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11)); + } + + if (j % 12 == 0) { + tuple.isBlankOrNull(12); + } else { + assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestResizableSpec.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestResizableSpec.java b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestResizableSpec.java new file mode 100644 index 0000000..483dad5 --- /dev/null +++ b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestResizableSpec.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.memory; + +import org.apache.tajo.unit.StorageUnit; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestResizableSpec { + + @Test + public void testResizableLimit() { + ResizableLimitSpec limit = new ResizableLimitSpec(10 * StorageUnit.MB, 1000 * StorageUnit.MB, 0.1f, 1.0f); + + long expectedMaxSize = (long) (1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f)); + + assertTrue(limit.limit() == 1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f)); + + assertEquals(20971520, limit.increasedSize(10 * StorageUnit.MB)); + + assertEquals(expectedMaxSize, limit.increasedSize(1600 * StorageUnit.MB)); + + assertEquals(0.98f, limit.remainRatio(980 * StorageUnit.MB), 0.1); + + assertFalse(limit.canIncrease(limit.limit())); + } + + @Test + public void testFixedLimit() { + FixedSizeLimitSpec limit = new FixedSizeLimitSpec(100 * StorageUnit.MB, 0.0f); + + assertEquals(limit.limit(), 100 * StorageUnit.MB); + + assertEquals(100 * StorageUnit.MB, limit.increasedSize(1000)); + + assertEquals(100 * StorageUnit.MB, limit.increasedSize(1600 * StorageUnit.MB)); + + assertTrue(0.98f == limit.remainRatio(98 * StorageUnit.MB)); + + assertFalse(limit.canIncrease(limit.limit())); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-core-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-core-tests/pom.xml b/tajo-core-tests/pom.xml index a23e420..8199f46 100644 --- a/tajo-core-tests/pom.xml +++ b/tajo-core-tests/pom.xml @@ -225,6 +225,10 @@ <groupId>com.sun.jersey.jersey-test-framework</groupId> <artifactId>jersey-test-framework-grizzly2</artifactId> </exclusion> + <exclusion> + <artifactId>netty-all</artifactId> + <groupId>io.netty</groupId> + </exclusion> </exclusions> </dependency> <dependency> @@ -280,12 +284,24 @@ <version>${hbase.version}</version> <type>test-jar</type> <scope>test</scope> + <exclusions> + <exclusion> + 
<artifactId>netty-all</artifactId> + <groupId>io.netty</groupId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>${hbase.version}</version> <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>netty-all</artifactId> + <groupId>io.netty</groupId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.hbase</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-jdbc/pom.xml b/tajo-jdbc/pom.xml index 27cc471..09fabdd 100644 --- a/tajo-jdbc/pom.xml +++ b/tajo-jdbc/pom.xml @@ -147,6 +147,10 @@ <groupId>com.sun.jersey.jersey-test-framework</groupId> <artifactId>jersey-test-framework-grizzly2</artifactId> </exclusion> + <exclusion> + <artifactId>netty-all</artifactId> + <groupId>io.netty</groupId> + </exclusion> </exclusions> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/pom.xml b/tajo-storage/tajo-storage-common/pom.xml index afbeead..4321a13 100644 --- a/tajo-storage/tajo-storage-common/pom.xml +++ b/tajo-storage/tajo-storage-common/pom.xml @@ -222,6 +222,10 @@ limitations under the License. <groupId>com.sun.jersey.jersey-test-framework</groupId> <artifactId>jersey-test-framework-grizzly2</artifactId> </exclusion> + <exclusion> + <artifactId>netty-all</artifactId> + <groupId>io.netty</groupId> + </exclusion> </exclusions> </dependency> <dependency> @@ -273,6 +277,10 @@ limitations under the License. 
<artifactId>hadoop-mapreduce-client-core</artifactId> <groupId>org.apache.hadoop</groupId> </exclusion> + <exclusion> + <artifactId>netty-all</artifactId> + <groupId>io.netty</groupId> + </exclusion> </exclusions> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java deleted file mode 100644 index d611ee3..0000000 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; -import io.netty.util.ResourceLeakDetector; -import io.netty.util.internal.PlatformDependent; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.util.CommonTestingUtil; - -import java.lang.reflect.Field; - -/* this class is PooledBuffer holder */ -public class BufferPool { - - public static final String ALLOW_CACHE = "tajo.storage.buffer.thread-local.cache"; - private static final ByteBufAllocator ALLOCATOR; - - private BufferPool() { - } - - static { - /* TODO Enable thread cache - * Create a pooled ByteBuf allocator but disables the thread-local cache. - * Because the TaskRunner thread is newly created - * */ - - if (System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { - /* Disable pooling buffers for memory usage */ - ALLOCATOR = UnpooledByteBufAllocator.DEFAULT; - - /* if you are finding memory leak, please enable this line */ - ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); - } else { - TajoConf tajoConf = new TajoConf(); - ALLOCATOR = createPooledByteBufAllocator(true, tajoConf.getBoolean(ALLOW_CACHE, false), 0); - } - } - - /** - * borrowed from Spark - */ - public static PooledByteBufAllocator createPooledByteBufAllocator( - boolean allowDirectBufs, - boolean allowCache, - int numCores) { - if (numCores == 0) { - numCores = Runtime.getRuntime().availableProcessors(); - } - return new PooledByteBufAllocator( - allowDirectBufs && PlatformDependent.directBufferPreferred(), - Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores), - Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? 
numCores : 0), - getPrivateStaticField("DEFAULT_PAGE_SIZE"), - getPrivateStaticField("DEFAULT_MAX_ORDER"), - allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0, - allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0, - allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0 - ); - } - - /** Used to get defaults from Netty's private static fields. */ - private static int getPrivateStaticField(String name) { - try { - Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name); - f.setAccessible(true); - return f.getInt(null); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static long maxDirectMemory() { - return PlatformDependent.maxDirectMemory(); - } - - - public static ByteBuf directBuffer(int size) { - return ALLOCATOR.directBuffer(size); - } - - /** - * - * @param size the initial capacity - * @param max the max capacity - * @return allocated ByteBuf from pool - */ - public static ByteBuf directBuffer(int size, int max) { - return ALLOCATOR.directBuffer(size, max); - } - - @InterfaceStability.Unstable - public static void forceRelease(ByteBuf buf) { - buf.release(buf.refCnt()); - } - - /** - * the ByteBuf will increase to writable size - * @param buf - * @param minWritableBytes required minimum writable size - */ - public static void ensureWritable(ByteBuf buf, int minWritableBytes) { - buf.ensureWritable(minWritableBytes); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java index 7708d52..8ca55cc 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java +++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java @@ -23,11 +23,9 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.IntervalDatum; -import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.exception.ValueTooLongForTypeCharactersException; -import org.apache.tajo.tuple.offheap.RowWriter; import org.apache.tajo.util.BitArray; import java.nio.ByteBuffer; @@ -335,56 +333,4 @@ public class RowStoreUtil { return schema; } } - - public static void convert(Tuple tuple, RowWriter writer) { - writer.startRow(); - - for (int i = 0; i < writer.dataTypes().length; i++) { - if (tuple.isBlankOrNull(i)) { - writer.skipField(); - continue; - } - switch (writer.dataTypes()[i].getType()) { - case BOOLEAN: - writer.putBool(tuple.getBool(i)); - break; - case INT1: - case INT2: - writer.putInt2(tuple.getInt2(i)); - break; - case INT4: - case DATE: - case INET4: - writer.putInt4(tuple.getInt4(i)); - break; - case INT8: - case TIMESTAMP: - case TIME: - writer.putInt8(tuple.getInt8(i)); - break; - case FLOAT4: - writer.putFloat4(tuple.getFloat4(i)); - break; - case FLOAT8: - writer.putFloat8(tuple.getFloat8(i)); - break; - case TEXT: - writer.putText(tuple.getBytes(i)); - break; - case INTERVAL: - writer.putInterval((IntervalDatum) tuple.getInterval(i)); - break; - case PROTOBUF: - writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i)); - break; - case NULL_TYPE: - writer.skipField(); - break; - default: - throw new TajoRuntimeException( - new UnsupportedException("unknown data type '" + writer.dataTypes()[i].getType().name() + "'")); - } - } - writer.endRow(); - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java 
---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java deleted file mode 100644 index c1835df..0000000 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java +++ /dev/null @@ -1,112 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.tuple; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SchemaUtil; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.tuple.offheap.HeapTuple; -import org.apache.tajo.tuple.offheap.OffHeapRowWriter; -import org.apache.tajo.tuple.offheap.ZeroCopyTuple; -import org.apache.tajo.unit.StorageUnit; -import org.apache.tajo.util.Deallocatable; -import org.apache.tajo.util.FileUtil; -import org.apache.tajo.util.UnsafeUtil; -import sun.misc.Unsafe; -import sun.nio.ch.DirectBuffer; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable { - private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class); - - private static final Unsafe UNSAFE = UnsafeUtil.unsafe; - - // buffer - private ByteBuffer buffer; - private long address; - - public BaseTupleBuilder(Schema schema) { - super(SchemaUtil.toDataTypes(schema)); - buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder()); - address = UnsafeUtil.getAddress(buffer); - } - - @Override - public long address() { - return address; - } - - public void ensureSize(int size) { - if (buffer.remaining() - size < 0) { // check the remain size - // enlarge new buffer and copy writing data - int newBlockSize = UnsafeUtil.alignedSize(buffer.capacity() * 2); - ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize); - long newAddress = ((DirectBuffer)newByteBuf).address(); - UNSAFE.copyMemory(this.address, newAddress, buffer.limit()); - LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false)); - - // release existing buffer and replace variables - UnsafeUtil.free(buffer); - buffer = newByteBuf; - address = newAddress; - } - } - - @Override - public int position() { - return 0; - } - - @Override - public 
void forward(int length) { - } - - @Override - public void endRow() { - super.endRow(); - buffer.position(0).limit(offset()); - } - - @Override - public Tuple build() { - return buildToHeapTuple(); - } - - public HeapTuple buildToHeapTuple() { - byte [] bytes = new byte[buffer.limit()]; - UNSAFE.copyMemory(null, address, bytes, UnsafeUtil.ARRAY_BOOLEAN_BASE_OFFSET, buffer.limit()); - return new HeapTuple(bytes, dataTypes()); - } - - public ZeroCopyTuple buildToZeroCopyTuple() { - ZeroCopyTuple zcTuple = new ZeroCopyTuple(); - zcTuple.set(buffer, 0, buffer.limit(), dataTypes()); - return zcTuple; - } - - public void release() { - UnsafeUtil.free(buffer); - buffer = null; - address = 0; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java deleted file mode 100644 index be734e1..0000000 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple; - -import org.apache.tajo.storage.Tuple; - -public interface RowBlockReader<T extends Tuple> { - - /** - * Return for each tuple - * - * @return True if tuple block is filled with tuples. Otherwise, It will return false. - */ - public boolean next(T tuple); - - public void reset(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java deleted file mode 100644 index c43c018..0000000 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java +++ /dev/null @@ -1,26 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.tuple; - -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.tuple.offheap.RowWriter; - -public interface TupleBuilder extends RowWriter { - public Tuple build(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java deleted file mode 100644 index 9662d5a..0000000 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.tuple.offheap; - -import org.apache.tajo.util.Deallocatable; -import org.apache.tajo.util.UnsafeUtil; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -import static org.apache.tajo.common.TajoDataTypes.DataType; - -public class DirectBufTuple extends UnSafeTuple implements Deallocatable { - private ByteBuffer bb; - - public DirectBufTuple(int length, DataType[] types) { - bb = ByteBuffer.allocateDirect(length).order(ByteOrder.nativeOrder()); - set(bb, 0, length, types); - } - - @Override - public void release() { - UnsafeUtil.free(bb); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java deleted file mode 100644 index a327123..0000000 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -/** - * Fixed size limit specification - */ -public class FixedSizeLimitSpec extends ResizableLimitSpec { - public FixedSizeLimitSpec(long size) { - super(size, size); - } - - public FixedSizeLimitSpec(long size, float allowedOverflowRatio) { - super(size, size, allowedOverflowRatio); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java deleted file mode 100644 index 9b69536..0000000 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java +++ /dev/null @@ -1,292 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.tuple.offheap; - -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.datum.*; -import org.apache.tajo.exception.TajoRuntimeException; -import org.apache.tajo.exception.UnsupportedException; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; -import org.apache.tajo.util.SizeOf; -import org.apache.tajo.util.StringUtils; -import org.apache.tajo.util.UnsafeUtil; -import org.apache.tajo.util.datetime.TimeMeta; -import sun.misc.Unsafe; - -import java.nio.ByteBuffer; -import java.nio.charset.Charset; - -import static org.apache.tajo.common.TajoDataTypes.DataType; - -public class HeapTuple implements Tuple { - private static final Unsafe UNSAFE = UnsafeUtil.unsafe; - private static final long BASE_OFFSET = UnsafeUtil.ARRAY_BYTE_BASE_OFFSET; - - private final byte [] data; - private final DataType [] types; - - public HeapTuple(final byte [] bytes, final DataType [] types) { - this.data = bytes; - this.types = types; - } - - @Override - public int size() { - return data.length; - } - - @Override - public TajoDataTypes.Type type(int fieldId) { - return types[fieldId].getType(); - } - - @Override - public int size(int fieldId) { - return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); - } - - @Override - public void clearOffset() { - } - - public ByteBuffer nioBuffer() { - return ByteBuffer.wrap(data); - } - - private int getFieldOffset(int fieldId) { - return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)); - } - - private int checkNullAndGetOffset(int fieldId) { - int offset = getFieldOffset(fieldId); - if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) { - throw new RuntimeException("Invalid Field Access: " + fieldId); - } - return offset; - } - - @Override - public boolean contains(int fieldid) { - return getFieldOffset(fieldid) > 
OffHeapRowBlock.NULL_FIELD_OFFSET; - } - - @Override - public boolean isBlank(int fieldid) { - return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET; - } - - @Override - public boolean isBlankOrNull(int fieldid) { - return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET; - } - - @Override - public void put(int fieldId, Tuple tuple) { - throw new TajoRuntimeException(new UnsupportedException()); - } - - @Override - public void clear() { - // nothing to do - } - - @Override - public void put(int fieldId, Datum value) { - throw new TajoRuntimeException(new UnsupportedException()); - } - - @Override - public void put(Datum[] values) { - throw new TajoRuntimeException(new UnsupportedException()); - } - - @Override - public Datum asDatum(int fieldId) { - if (isBlankOrNull(fieldId)) { - return NullDatum.get(); - } - - switch (types[fieldId].getType()) { - case BOOLEAN: - return DatumFactory.createBool(getBool(fieldId)); - case INT1: - case INT2: - return DatumFactory.createInt2(getInt2(fieldId)); - case INT4: - return DatumFactory.createInt4(getInt4(fieldId)); - case INT8: - return DatumFactory.createInt8(getInt4(fieldId)); - case FLOAT4: - return DatumFactory.createFloat4(getFloat4(fieldId)); - case FLOAT8: - return DatumFactory.createFloat8(getFloat8(fieldId)); - case TEXT: - return DatumFactory.createText(getText(fieldId)); - case TIMESTAMP: - return DatumFactory.createTimestamp(getInt8(fieldId)); - case DATE: - return DatumFactory.createDate(getInt4(fieldId)); - case TIME: - return DatumFactory.createTime(getInt8(fieldId)); - case INTERVAL: - return getInterval(fieldId); - case INET4: - return DatumFactory.createInet4(getInt4(fieldId)); - case PROTOBUF: - return getProtobufDatum(fieldId); - default: - throw new TajoRuntimeException(new UnsupportedException("data type '" + types[fieldId] + "'")); - } - } - - @Override - public void setOffset(long offset) { - } - - @Override - public long getOffset() { - return 0; - } - - @Override - public 
boolean getBool(int fieldId) { - return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01; - } - - @Override - public byte getByte(int fieldId) { - return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); - } - - @Override - public char getChar(int fieldId) { - return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); - } - - @Override - public byte[] getBytes(int fieldId) { - long pos = checkNullAndGetOffset(fieldId); - int len = UNSAFE.getInt(data, BASE_OFFSET + pos); - pos += SizeOf.SIZE_OF_INT; - - byte [] bytes = new byte[len]; - UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); - return bytes; - } - - @Override - public byte[] getTextBytes(int fieldId) { - return getText(fieldId).getBytes(); - } - - @Override - public short getInt2(int fieldId) { - return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); - } - - @Override - public int getInt4(int fieldId) { - return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); - } - - @Override - public long getInt8(int fieldId) { - return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); - } - - @Override - public float getFloat4(int fieldId) { - return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); - } - - @Override - public double getFloat8(int fieldId) { - return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); - } - - @Override - public String getText(int fieldId) { - return new String(getBytes(fieldId)); - } - - @Override - public TimeMeta getTimeDate(int fieldId) { - return asDatum(fieldId).asTimeMeta(); - } - - public IntervalDatum getInterval(int fieldId) { - long pos = checkNullAndGetOffset(fieldId); - int months = UNSAFE.getInt(data, BASE_OFFSET + pos); - pos += SizeOf.SIZE_OF_INT; - long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos); - return new IntervalDatum(months, millisecs); - } - - @Override - public 
Datum getProtobufDatum(int fieldId) { - byte [] bytes = getBytes(fieldId); - - ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode()); - Message.Builder builder = factory.newBuilder(); - try { - builder.mergeFrom(bytes); - } catch (InvalidProtocolBufferException e) { - return NullDatum.get(); - } - - return new ProtobufDatum(builder.build()); - } - - @Override - public char[] getUnicodeChars(int fieldId) { - long pos = checkNullAndGetOffset(fieldId); - int len = UNSAFE.getInt(data, BASE_OFFSET + pos); - pos += SizeOf.SIZE_OF_INT; - - byte [] bytes = new byte[len]; - UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); - return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8")); - } - - @Override - public Tuple clone() throws CloneNotSupportedException { - return this; - } - - @Override - public Datum[] getValues() { - Datum [] datums = new Datum[size()]; - for (int i = 0; i < size(); i++) { - if (contains(i)) { - datums[i] = asDatum(i); - } else { - datums[i] = NullDatum.get(); - } - } - return datums; - } - - @Override - public String toString() { - return VTuple.toDisplayString(getValues()); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java deleted file mode 100644 index 2f8e349..0000000 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.util.Deallocatable; -import org.apache.tajo.util.FileUtil; -import org.apache.tajo.util.UnsafeUtil; -import sun.misc.Unsafe; -import sun.nio.ch.DirectBuffer; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -public class OffHeapMemory implements Deallocatable { - private static final Log LOG = LogFactory.getLog(OffHeapMemory.class); - - protected static final Unsafe UNSAFE = UnsafeUtil.unsafe; - - protected ByteBuffer buffer; - protected int memorySize; - protected ResizableLimitSpec limitSpec; - protected long address; - - @VisibleForTesting - protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) { - this.buffer = buffer; - this.address = ((DirectBuffer) buffer).address(); - this.memorySize = buffer.limit(); - this.limitSpec = limitSpec; - } - - public OffHeapMemory(ResizableLimitSpec limitSpec) { - this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec); - } - - public long address() { - return address; - } - - public long size() { - return 
memorySize; - } - - public void resize(int newSize) { - Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes"); - - if (newSize > limitSpec.limit()) { - throw new RuntimeException("Resize cannot exceed the size limit"); - } - - if (newSize < memorySize) { - LOG.warn("The size reduction is ignored."); - } - - int newBlockSize = UnsafeUtil.alignedSize(newSize); - ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize); - long newAddress = ((DirectBuffer)newByteBuf).address(); - - UNSAFE.copyMemory(this.address, newAddress, memorySize); - - UnsafeUtil.free(buffer); - this.memorySize = newSize; - this.buffer = newByteBuf; - this.address = newAddress; - } - - public java.nio.Buffer nioBuffer() { - return (ByteBuffer) buffer.position(0).limit(memorySize); - } - - @Override - public void release() { - UnsafeUtil.free(this.buffer); - this.buffer = null; - this.address = 0; - this.memorySize = 0; - } - - public String toString() { - return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java deleted file mode 100644 index 90d4791..0000000 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java +++ /dev/null @@ -1,213 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SchemaUtil; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.storage.SeekableInputChannel; -import org.apache.tajo.util.Deallocatable; -import org.apache.tajo.util.FileUtil; -import org.apache.tajo.util.SizeOf; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; - -import static org.apache.tajo.common.TajoDataTypes.DataType; - -public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable { - private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class); - - public static final int NULL_FIELD_OFFSET = -1; - - DataType [] dataTypes; - - // Basic States - private int maxRowNum = Integer.MAX_VALUE; // optional - private int rowNum; - protected int position = 0; - - private OffHeapRowBlockWriter builder; - - private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) { - super(buffer, limitSpec); - initialize(schema); - } - - public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) { - super(limitSpec); - initialize(schema); - } - - private void initialize(Schema schema) { - dataTypes = 
SchemaUtil.toDataTypes(schema); - - this.builder = new OffHeapRowBlockWriter(this); - } - - @VisibleForTesting - public OffHeapRowBlock(Schema schema, int bytes) { - this(schema, new ResizableLimitSpec(bytes)); - } - - @VisibleForTesting - public OffHeapRowBlock(Schema schema, ByteBuffer buffer) { - this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT); - } - - public void position(int pos) { - this.position = pos; - } - - public void clear() { - this.position = 0; - this.rowNum = 0; - - builder.clear(); - } - - @Override - public ByteBuffer nioBuffer() { - return (ByteBuffer) buffer.position(0).limit(position); - } - - public int position() { - return position; - } - - public long usedMem() { - return position; - } - - /** - * Ensure that this buffer has enough remaining space to add the size. - * Creates and copies to a new buffer if necessary - * - * @param size Size to add - */ - public void ensureSize(int size) { - if (remain() - size < 0) { - if (!limitSpec.canIncrease(memorySize)) { - throw new RuntimeException("Cannot increase RowBlock anymore."); - } - - int newBlockSize = limitSpec.increasedSize(memorySize); - resize(newBlockSize); - LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false)); - } - } - - public long remain() { - return memorySize - position - builder.offset(); - } - - public int maxRowNum() { - return maxRowNum; - } - public int rows() { - return rowNum; - } - - public void setRows(int rowNum) { - this.rowNum = rowNum; - } - - - public boolean copyFromChannel(SeekableInputChannel channel, TableStats stats) throws IOException { - if (channel.position() < channel.size()) { - clear(); - - buffer.clear(); - channel.read(buffer); - memorySize = buffer.position(); - - while (position < memorySize) { - long recordPtr = address + position; - - if (remain() < SizeOf.SIZE_OF_INT) { - channel.seek(channel.position() - remain()); - memorySize = (int) (memorySize - remain()); - return true; - } - - int recordSize = 
UNSAFE.getInt(recordPtr); - - if (remain() < recordSize) { - channel.seek(channel.position() - remain()); - memorySize = (int) (memorySize - remain()); - return true; - } - - position += recordSize; - rowNum++; - } - - return true; - } else { - return false; - } - } - - public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException { - if (channel.position() < channel.size()) { - clear(); - - buffer.clear(); - channel.read(buffer); - memorySize = buffer.position(); - - while (position < memorySize) { - long recordPtr = address + position; - - if (remain() < SizeOf.SIZE_OF_INT) { - channel.position(channel.position() - remain()); - memorySize = (int) (memorySize - remain()); - return true; - } - - int recordSize = UNSAFE.getInt(recordPtr); - - if (remain() < recordSize) { - channel.position(channel.position() - remain()); - memorySize = (int) (memorySize - remain()); - return true; - } - - position += recordSize; - rowNum++; - } - - return true; - } else { - return false; - } - } - - public RowWriter getWriter() { - return builder; - } - - public OffHeapRowBlockReader getReader() { - return new OffHeapRowBlockReader(this); - } -}
