http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java new file mode 100644 index 0000000..c1835df --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java @@ -0,0 +1,112 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.offheap.HeapTuple; +import org.apache.tajo.tuple.offheap.OffHeapRowWriter; +import org.apache.tajo.tuple.offheap.ZeroCopyTuple; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.Deallocatable; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.UnsafeUtil; +import sun.misc.Unsafe; +import sun.nio.ch.DirectBuffer; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable { + private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class); + + private static final Unsafe UNSAFE = UnsafeUtil.unsafe; + + // buffer + private ByteBuffer buffer; + private long address; + + public BaseTupleBuilder(Schema schema) { + super(SchemaUtil.toDataTypes(schema)); + buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder()); + address = UnsafeUtil.getAddress(buffer); + } + + @Override + public long address() { + return address; + } + + public void ensureSize(int size) { + if (buffer.remaining() - size < 0) { // check the remain size + // enlarge new buffer and copy writing data + int newBlockSize = UnsafeUtil.alignedSize(buffer.capacity() * 2); + ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize); + long newAddress = ((DirectBuffer)newByteBuf).address(); + UNSAFE.copyMemory(this.address, newAddress, buffer.limit()); + LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false)); + + // release existing buffer and replace variables + UnsafeUtil.free(buffer); + buffer = newByteBuf; + address = newAddress; + } + } + + @Override + public int position() { + return 0; + } + + @Override + public 
void forward(int length) { + } + + @Override + public void endRow() { + super.endRow(); + buffer.position(0).limit(offset()); + } + + @Override + public Tuple build() { + return buildToHeapTuple(); + } + + public HeapTuple buildToHeapTuple() { + byte [] bytes = new byte[buffer.limit()]; + UNSAFE.copyMemory(null, address, bytes, UnsafeUtil.ARRAY_BOOLEAN_BASE_OFFSET, buffer.limit()); + return new HeapTuple(bytes, dataTypes()); + } + + public ZeroCopyTuple buildToZeroCopyTuple() { + ZeroCopyTuple zcTuple = new ZeroCopyTuple(); + zcTuple.set(buffer, 0, buffer.limit(), dataTypes()); + return zcTuple; + } + + public void release() { + UnsafeUtil.free(buffer); + buffer = null; + address = 0; + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java new file mode 100644 index 0000000..be734e1 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple; + +import org.apache.tajo.storage.Tuple; + +public interface RowBlockReader<T extends Tuple> { + + /** + * Return for each tuple + * + * @return True if tuple block is filled with tuples. Otherwise, It will return false. 
+ */ + public boolean next(T tuple); + + public void reset(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java new file mode 100644 index 0000000..c43c018 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java @@ -0,0 +1,26 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple; + +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.offheap.RowWriter; + +public interface TupleBuilder extends RowWriter { + public Tuple build(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java new file mode 100644 index 0000000..9662d5a --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.offheap; + +import org.apache.tajo.util.Deallocatable; +import org.apache.tajo.util.UnsafeUtil; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import static org.apache.tajo.common.TajoDataTypes.DataType; + +public class DirectBufTuple extends UnSafeTuple implements Deallocatable { + private ByteBuffer bb; + + public DirectBufTuple(int length, DataType[] types) { + bb = ByteBuffer.allocateDirect(length).order(ByteOrder.nativeOrder()); + set(bb, 0, length, types); + } + + @Override + public void release() { + UnsafeUtil.free(bb); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java new file mode 100644 index 0000000..a327123 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +/** + * Fixed size limit specification + */ +public class FixedSizeLimitSpec extends ResizableLimitSpec { + public FixedSizeLimitSpec(long size) { + super(size, size); + } + + public FixedSizeLimitSpec(long size, float allowedOverflowRatio) { + super(size, size, allowedOverflowRatio); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java new file mode 100644 index 0000000..33f9f1c --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java @@ -0,0 +1,272 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.offheap; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; + +import org.apache.tajo.datum.*; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.SizeOf; +import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.UnsafeUtil; + +import sun.misc.Unsafe; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; + +import static org.apache.tajo.common.TajoDataTypes.DataType; + +public class HeapTuple implements Tuple { + private static final Unsafe UNSAFE = UnsafeUtil.unsafe; + private static final long BASE_OFFSET = UnsafeUtil.ARRAY_BYTE_BASE_OFFSET; + + private final byte [] data; + private final DataType [] types; + + public HeapTuple(final byte [] bytes, final DataType [] types) { + this.data = bytes; + this.types = types; + } + + @Override + public int size() { + return data.length; + } + + public ByteBuffer nioBuffer() { + return ByteBuffer.wrap(data); + } + + private int getFieldOffset(int fieldId) { + return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)); + } + + private int checkNullAndGetOffset(int fieldId) { + int offset = getFieldOffset(fieldId); + if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) { + throw new RuntimeException("Invalid Field Access: " + fieldId); + } + return offset; + } + + @Override + public boolean contains(int fieldid) { + return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public boolean isNull(int fieldid) { + return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public boolean isNotNull(int fieldid) { + return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public void clear() { + // nothing to do + } + + @Override + public void put(int fieldId, Datum value) { + throw new 
UnsupportedException("UnSafeTuple does not support put(int, Datum)."); + } + + @Override + public void put(int fieldId, Datum[] values) { + throw new UnsupportedException("UnSafeTuple does not support put(int, Datum [])."); + } + + @Override + public void put(int fieldId, Tuple tuple) { + throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple)."); + } + + @Override + public void put(Datum[] values) { + throw new UnsupportedException("UnSafeTuple does not support put(Datum [])."); + } + + @Override + public Datum get(int fieldId) { + if (isNull(fieldId)) { + return NullDatum.get(); + } + + switch (types[fieldId].getType()) { + case BOOLEAN: + return DatumFactory.createBool(getBool(fieldId)); + case INT1: + case INT2: + return DatumFactory.createInt2(getInt2(fieldId)); + case INT4: + return DatumFactory.createInt4(getInt4(fieldId)); + case INT8: + return DatumFactory.createInt8(getInt4(fieldId)); + case FLOAT4: + return DatumFactory.createFloat4(getFloat4(fieldId)); + case FLOAT8: + return DatumFactory.createFloat8(getFloat8(fieldId)); + case TEXT: + return DatumFactory.createText(getText(fieldId)); + case TIMESTAMP: + return DatumFactory.createTimestamp(getInt8(fieldId)); + case DATE: + return DatumFactory.createDate(getInt4(fieldId)); + case TIME: + return DatumFactory.createTime(getInt8(fieldId)); + case INTERVAL: + return getInterval(fieldId); + case INET4: + return DatumFactory.createInet4(getInt4(fieldId)); + case PROTOBUF: + return getProtobufDatum(fieldId); + default: + throw new UnsupportedException("Unknown type: " + types[fieldId]); + } + } + + @Override + public void setOffset(long offset) { + } + + @Override + public long getOffset() { + return 0; + } + + @Override + public boolean getBool(int fieldId) { + return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01; + } + + @Override + public byte getByte(int fieldId) { + return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + 
@Override + public char getChar(int fieldId) { + return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public byte[] getBytes(int fieldId) { + long pos = checkNullAndGetOffset(fieldId); + int len = UNSAFE.getInt(data, BASE_OFFSET + pos); + pos += SizeOf.SIZE_OF_INT; + + byte [] bytes = new byte[len]; + UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); + return bytes; + } + + @Override + public short getInt2(int fieldId) { + return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public int getInt4(int fieldId) { + return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public long getInt8(int fieldId) { + return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public float getFloat4(int fieldId) { + return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public double getFloat8(int fieldId) { + return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public String getText(int fieldId) { + return new String(getBytes(fieldId)); + } + + public IntervalDatum getInterval(int fieldId) { + long pos = checkNullAndGetOffset(fieldId); + int months = UNSAFE.getInt(data, BASE_OFFSET + pos); + pos += SizeOf.SIZE_OF_INT; + long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos); + return new IntervalDatum(months, millisecs); + } + + @Override + public Datum getProtobufDatum(int fieldId) { + byte [] bytes = getBytes(fieldId); + + ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode()); + Message.Builder builder = factory.newBuilder(); + try { + builder.mergeFrom(bytes); + } catch (InvalidProtocolBufferException e) { + return NullDatum.get(); + } + + return new ProtobufDatum(builder.build()); + } + + @Override + public char[] getUnicodeChars(int fieldId) { + long pos = 
checkNullAndGetOffset(fieldId); + int len = UNSAFE.getInt(data, BASE_OFFSET + pos); + pos += SizeOf.SIZE_OF_INT; + + byte [] bytes = new byte[len]; + UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); + return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8")); + } + + @Override + public Tuple clone() throws CloneNotSupportedException { + return this; + } + + @Override + public Datum[] getValues() { + Datum [] datums = new Datum[size()]; + for (int i = 0; i < size(); i++) { + if (contains(i)) { + datums[i] = get(i); + } else { + datums[i] = NullDatum.get(); + } + } + return datums; + } + + @Override + public String toString() { + return VTuple.toDisplayString(getValues()); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java new file mode 100644 index 0000000..2f8e349 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.util.Deallocatable; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.UnsafeUtil; +import sun.misc.Unsafe; +import sun.nio.ch.DirectBuffer; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class OffHeapMemory implements Deallocatable { + private static final Log LOG = LogFactory.getLog(OffHeapMemory.class); + + protected static final Unsafe UNSAFE = UnsafeUtil.unsafe; + + protected ByteBuffer buffer; + protected int memorySize; + protected ResizableLimitSpec limitSpec; + protected long address; + + @VisibleForTesting + protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) { + this.buffer = buffer; + this.address = ((DirectBuffer) buffer).address(); + this.memorySize = buffer.limit(); + this.limitSpec = limitSpec; + } + + public OffHeapMemory(ResizableLimitSpec limitSpec) { + this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec); + } + + public long address() { + return address; + } + + public long size() { + return memorySize; + } + + public void resize(int newSize) { + Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes"); + + if (newSize > limitSpec.limit()) { + throw new RuntimeException("Resize cannot exceed the size limit"); + } + + if (newSize < memorySize) { 
+ LOG.warn("The size reduction is ignored."); + } + + int newBlockSize = UnsafeUtil.alignedSize(newSize); + ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize); + long newAddress = ((DirectBuffer)newByteBuf).address(); + + UNSAFE.copyMemory(this.address, newAddress, memorySize); + + UnsafeUtil.free(buffer); + this.memorySize = newSize; + this.buffer = newByteBuf; + this.address = newAddress; + } + + public java.nio.Buffer nioBuffer() { + return (ByteBuffer) buffer.position(0).limit(memorySize); + } + + @Override + public void release() { + UnsafeUtil.free(this.buffer); + this.buffer = null; + this.address = 0; + this.memorySize = 0; + } + + public String toString() { + return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java new file mode 100644 index 0000000..689efb7 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java @@ -0,0 +1,176 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.util.Deallocatable; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.SizeOf; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +import static org.apache.tajo.common.TajoDataTypes.DataType; + +public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable { + private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class); + + public static final int NULL_FIELD_OFFSET = -1; + + DataType [] dataTypes; + + // Basic States + private int maxRowNum = Integer.MAX_VALUE; // optional + private int rowNum; + protected int position = 0; + + private OffHeapRowBlockWriter builder; + + private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) { + super(buffer, limitSpec); + initialize(schema); + } + + public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) { + super(limitSpec); + initialize(schema); + } + + private void initialize(Schema schema) { + dataTypes = SchemaUtil.toDataTypes(schema); + + this.builder = new OffHeapRowBlockWriter(this); + } + + @VisibleForTesting + public OffHeapRowBlock(Schema schema, int bytes) { + this(schema, new ResizableLimitSpec(bytes)); + } + + 
@VisibleForTesting + public OffHeapRowBlock(Schema schema, ByteBuffer buffer) { + this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT); + } + + public void position(int pos) { + this.position = pos; + } + + public void clear() { + this.position = 0; + this.rowNum = 0; + + builder.clear(); + } + + @Override + public ByteBuffer nioBuffer() { + return (ByteBuffer) buffer.position(0).limit(position); + } + + public int position() { + return position; + } + + public long usedMem() { + return position; + } + + /** + * Ensure that this buffer has enough remaining space to add the size. + * Creates and copies to a new buffer if necessary + * + * @param size Size to add + */ + public void ensureSize(int size) { + if (remain() - size < 0) { + if (!limitSpec.canIncrease(memorySize)) { + throw new RuntimeException("Cannot increase RowBlock anymore."); + } + + int newBlockSize = limitSpec.increasedSize(memorySize); + resize(newBlockSize); + LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false)); + } + } + + public long remain() { + return memorySize - position - builder.offset(); + } + + public int maxRowNum() { + return maxRowNum; + } + public int rows() { + return rowNum; + } + + public void setRows(int rowNum) { + this.rowNum = rowNum; + } + + public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException { + if (channel.position() < channel.size()) { + clear(); + + buffer.clear(); + channel.read(buffer); + memorySize = buffer.position(); + + while (position < memorySize) { + long recordPtr = address + position; + + if (remain() < SizeOf.SIZE_OF_INT) { + channel.position(channel.position() - remain()); + memorySize = (int) (memorySize - remain()); + return true; + } + + int recordSize = UNSAFE.getInt(recordPtr); + + if (remain() < recordSize) { + channel.position(channel.position() - remain()); + memorySize = (int) (memorySize - remain()); + return true; + } + + position += recordSize; + rowNum++; + } + + 
return true; + } else { + return false; + } + } + + public RowWriter getWriter() { + return builder; + } + + public OffHeapRowBlockReader getReader() { + return new OffHeapRowBlockReader(this); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java new file mode 100644 index 0000000..4a9313f --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java @@ -0,0 +1,63 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.offheap; + +import org.apache.tajo.tuple.RowBlockReader; +import org.apache.tajo.util.UnsafeUtil; +import sun.misc.Unsafe; + +public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> { + private static final Unsafe UNSAFE = UnsafeUtil.unsafe; + OffHeapRowBlock rowBlock; + + // Read States + private int curRowIdxForRead; + private int curPosForRead; + + public OffHeapRowBlockReader(OffHeapRowBlock rowBlock) { + this.rowBlock = rowBlock; + } + + public long remainForRead() { + return rowBlock.memorySize - curPosForRead; + } + + @Override + public boolean next(ZeroCopyTuple tuple) { + if (curRowIdxForRead < rowBlock.rows()) { + + long recordStartPtr = rowBlock.address() + curPosForRead; + int recordLen = UNSAFE.getInt(recordStartPtr); + tuple.set(rowBlock.buffer, curPosForRead, recordLen, rowBlock.dataTypes); + + curPosForRead += recordLen; + curRowIdxForRead++; + + return true; + } else { + return false; + } + } + + @Override + public void reset() { + curPosForRead = 0; + curRowIdxForRead = 0; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java new file mode 100644 index 0000000..dbc3188 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.collect.Lists;
+import org.apache.tajo.storage.Tuple;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/** Sorting helpers for {@link OffHeapRowBlock}. */
+public class OffHeapRowBlockUtils {
+
+  /**
+   * Wraps every row of {@code rowBlock} in its own {@link ZeroCopyTuple} and sorts them.
+   * The tuples point into the block's buffer; no row bytes are copied.
+   *
+   * @return a new list of the block's rows ordered by {@code comparator}
+   */
+  public static List<Tuple> sort(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
+    List<Tuple> sorted = Lists.newArrayList();
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+    for (ZeroCopyTuple row = new ZeroCopyTuple(); reader.next(row); row = new ZeroCopyTuple()) {
+      sorted.add(row);
+    }
+    Collections.sort(sorted, comparator);
+    return sorted;
+  }
+
+  /**
+   * Array variant of {@link #sort}: the result has {@code rowBlock.rows()} slots.
+   *
+   * @return the block's rows ordered by {@code comparator}
+   */
+  public static Tuple[] sortToArray(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
+    Tuple[] sorted = new Tuple[rowBlock.rows()];
+    ZeroCopyTuple row = new ZeroCopyTuple();
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+    int filled = 0;
+    while (filled < rowBlock.rows() && reader.next(row)) {
+      sorted[filled++] = row;
+      row = new ZeroCopyTuple();
+    }
+    Arrays.sort(sorted, comparator);
+    return sorted;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
new file mode 100644
index 0000000..d177e0c
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+
+/**
+ * {@link OffHeapRowWriter} that appends rows into an {@link OffHeapRowBlock}. Address, write
+ * position, growth and the row count are all delegated to the block.
+ */
+public class OffHeapRowBlockWriter extends OffHeapRowWriter {
+  OffHeapRowBlock rowBlock;
+
+  OffHeapRowBlockWriter(OffHeapRowBlock rowBlock) {
+    super(rowBlock.dataTypes);
+    this.rowBlock = rowBlock;
+  }
+
+  @Override
+  public long address() {
+    return rowBlock.address();
+  }
+
+  @Override
+  public int position() {
+    return rowBlock.position();
+  }
+
+  /** Moves the block's write position forward by {@code length} bytes. */
+  @Override
+  public void forward(int length) {
+    int newPosition = position() + length;
+    rowBlock.position(newPosition);
+  }
+
+  @Override
+  public void ensureSize(int size) {
+    rowBlock.ensureSize(size);
+  }
+
+  /** Finishes the row, then increments the block's row count. */
+  @Override
+  public void endRow() {
+    super.endRow();
+    int rowCount = rowBlock.rows() + 1;
+    rowBlock.setRows(rowCount);
+  }
+
+  @Override
+  public TajoDataTypes.DataType[] dataTypes() {
+    return rowBlock.dataTypes;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
new file mode 100644
index 0000000..85c7e0b
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+
+/**
+ * Base writer for the off-heap row format. A row is laid out as:
+ *
+ * <pre>
+ * | row length | field 1 offset | ... | field N offset | field 1 | ... | field N |
+ *   4 bytes      4 bytes                4 bytes
+ * </pre>
+ *
+ * Field offsets are relative to the start of the row. A field that is skipped or never written
+ * stores {@link OffHeapRowBlock#NULL_FIELD_OFFSET}. Subclasses supply the backing memory through
+ * {@link #address()}, {@link #position()}, {@link #forward(int)} and {@link #ensureSize(int)}.
+ */
+public abstract class OffHeapRowWriter implements RowWriter {
+  /** Size of the row header: the row-length word plus one offset word per field. */
+  private final int headerSize;
+  /** Row-relative offsets of the fields written so far in the current row. */
+  private final int [] fieldOffsets;
+  private final TajoDataTypes.DataType [] dataTypes;
+
+  /** Index of the next field to be written. */
+  private int curFieldIdx;
+  /** Bytes used by the current row so far, header included. */
+  private int curOffset;
+
+  public OffHeapRowWriter(final TajoDataTypes.DataType [] dataTypes) {
+    this.dataTypes = dataTypes;
+    this.fieldOffsets = new int[dataTypes.length];
+    // one int for the row length plus one int per field offset
+    this.headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1);
+  }
+
+  /** Resets the per-row write state. */
+  public void clear() {
+    curFieldIdx = 0;
+    curOffset = 0;
+  }
+
+  /** @return the absolute address at which the current row starts */
+  public long recordStartAddr() {
+    return address() + position();
+  }
+
+  /** @return the base address of the memory being written */
+  public abstract long address();
+
+  /**
+   * Called with a value's byte size right before that value is written, so the implementation
+   * can grow its memory (as BaseTupleBuilder does).
+   * NOTE(review): only the value size is passed, not {@code offset() + size}; confirm every
+   * implementation accounts for the bytes of the row that is still being written.
+   *
+   * @param size number of bytes about to be written
+   */
+  public abstract void ensureSize(int size);
+
+  /** @return the number of bytes the current row occupies so far */
+  public int offset() {
+    return curOffset;
+  }
+
+  /**
+   * Current position
+   *
+   * @return offset from {@link #address()} at which the current row starts
+   */
+  public abstract int position();
+
+  /**
+   * Advances the write position.
+   *
+   * @param length number of bytes to move forward
+   */
+  public abstract void forward(int length);
+
+  @Override
+  public TajoDataTypes.DataType[] dataTypes() {
+    return dataTypes;
+  }
+
+  /** Begins a new row; values are placed right after the row header. */
+  public boolean startRow() {
+    curFieldIdx = 0;
+    curOffset = headerSize;
+    return true;
+  }
+
+  /**
+   * Writes the row header (row length and every field offset) and moves past the finished row.
+   * Fields that were never written are recorded as {@link OffHeapRowBlock#NULL_FIELD_OFFSET}.
+   */
+  public void endRow() {
+    long headerAddr = address() + position();
+    OffHeapMemory.UNSAFE.putInt(headerAddr, curOffset);
+
+    for (int idx = 0; idx < dataTypes.length; idx++) {
+      int fieldOffset = idx < curFieldIdx ? fieldOffsets[idx] : OffHeapRowBlock.NULL_FIELD_OFFSET;
+      OffHeapMemory.UNSAFE.putInt(headerAddr + (long) SizeOf.SIZE_OF_INT * (idx + 1), fieldOffset);
+    }
+
+    // curOffset is now the byte length of the finished row.
+    forward(curOffset);
+  }
+
+  /** Marks the next field as null. */
+  public void skipField() {
+    fieldOffsets[curFieldIdx++] = OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  /** Records the current row offset as the start of the next field. */
+  private void forwardField() {
+    fieldOffsets[curFieldIdx++] = curOffset;
+  }
+
+  /**
+   * Calls {@link #ensureSize(int)} for a value of {@code size} bytes, records the field offset,
+   * and returns the address where the value goes. Callers advance {@code curOffset} afterwards.
+   */
+  private long beginField(int size) {
+    ensureSize(size);
+    forwardField();
+    return recordStartAddr() + curOffset;
+  }
+
+  /** Writes {@code val} as a 4-byte length followed by the bytes themselves. */
+  private void putLengthPrefixed(byte[] val) {
+    int bytesLen = val.length;
+
+    OffHeapMemory.UNSAFE.putInt(beginField(SizeOf.SIZE_OF_INT + bytesLen), bytesLen);
+    curOffset += SizeOf.SIZE_OF_INT;
+
+    OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null,
+        recordStartAddr() + curOffset, bytesLen);
+    curOffset += bytesLen;
+  }
+
+  public void putBool(boolean val) {
+    OffHeapMemory.UNSAFE.putByte(beginField(SizeOf.SIZE_OF_BOOL), (byte) (val ? 0x01 : 0x00));
+    curOffset += SizeOf.SIZE_OF_BOOL;
+  }
+
+  public void putInt2(short val) {
+    OffHeapMemory.UNSAFE.putShort(beginField(SizeOf.SIZE_OF_SHORT), val);
+    curOffset += SizeOf.SIZE_OF_SHORT;
+  }
+
+  public void putInt4(int val) {
+    OffHeapMemory.UNSAFE.putInt(beginField(SizeOf.SIZE_OF_INT), val);
+    curOffset += SizeOf.SIZE_OF_INT;
+  }
+
+  public void putInt8(long val) {
+    OffHeapMemory.UNSAFE.putLong(beginField(SizeOf.SIZE_OF_LONG), val);
+    curOffset += SizeOf.SIZE_OF_LONG;
+  }
+
+  public void putFloat4(float val) {
+    OffHeapMemory.UNSAFE.putFloat(beginField(SizeOf.SIZE_OF_FLOAT), val);
+    curOffset += SizeOf.SIZE_OF_FLOAT;
+  }
+
+  public void putFloat8(double val) {
+    OffHeapMemory.UNSAFE.putDouble(beginField(SizeOf.SIZE_OF_DOUBLE), val);
+    curOffset += SizeOf.SIZE_OF_DOUBLE;
+  }
+
+  /** Encodes {@code val} with {@link TextDatum#DEFAULT_CHARSET} and writes it as text. */
+  public void putText(String val) {
+    putText(val.getBytes(TextDatum.DEFAULT_CHARSET));
+  }
+
+  public void putText(byte[] val) {
+    putLengthPrefixed(val);
+  }
+
+  public void putBlob(byte[] val) {
+    putLengthPrefixed(val);
+  }
+
+  public void putTimestamp(long val) {
+    putInt8(val);
+  }
+
+  public void putDate(int val) {
+    putInt4(val);
+  }
+
+  public void putTime(long val) {
+    putInt8(val);
+  }
+
+  /** Writes the months as a 4-byte int followed by the milliseconds as an 8-byte long. */
+  public void putInterval(IntervalDatum val) {
+    long addr = beginField(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG);
+    OffHeapMemory.UNSAFE.putInt(addr, val.getMonths());
+    OffHeapMemory.UNSAFE.putLong(addr + SizeOf.SIZE_OF_INT, val.getMilliSeconds());
+    curOffset += SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG;
+  }
+
+  public void putInet4(int val) {
+    putInt4(val);
+  }
+
+  public void putProtoDatum(ProtobufDatum val) {
+    putBlob(val.asByteArray());
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
new file mode 100644
index 0000000..14e67b2
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.FileUtil;
+
+/**
+ * Describes how a resizable buffer may grow: its initial size, its size limit (widened by an
+ * allowed overflow ratio), and the ratio applied on each growth step. All sizes are capped at
+ * Integer.MAX_VALUE (2^31 - 1) bytes because ByteBuffer capacities are ints.
+ */
+public class ResizableLimitSpec {
+  // One logger per class; the original created a new one for every instance.
+  private static final Log LOG = LogFactory.getLog(ResizableLimitSpec.class);
+
+  public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE;
+  public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE);
+
+  private final long initSize;
+  private final long limitBytes;
+  private final float incRatio;
+  private final float allowedOverflowRatio;
+  private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f;
+  private final static float DEFAULT_INCREASE_RATIO = 1.0f;
+
+  public ResizableLimitSpec(long initSize) {
+    this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes) {
+    this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) {
+    this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO);
+  }
+
+  /**
+   * The effective limit becomes {@code limitBytes * (1 + allowedOverflowRatio)}, capped at
+   * Integer.MAX_VALUE. When {@code initSize == limitBytes}, the initial size is widened the same way.
+   *
+   * @param initSize initial size in bytes, in (0, MAX_SIZE_BYTES]
+   * @param limitBytes size limit in bytes before the overflow allowance, in (0, MAX_SIZE_BYTES]
+   * @param allowedOverflowRatio fraction of the limit that may be used beyond it
+   * @param incRatio growth ratio used by {@link #increasedSize(int)}; must be positive
+   * @throws IllegalArgumentException if a size or the growth ratio is out of range
+   */
+  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) {
+    Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes.");
+    Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB.");
+    Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes.");
+    Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB.");
+    Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0.");
+
+    if (initSize == limitBytes) {
+      long overflowedSize = capToMaxSize((long) (initSize + (initSize * allowedOverflowRatio)));
+      this.initSize = overflowedSize;
+      this.limitBytes = overflowedSize;
+    } else {
+      this.initSize = initSize;
+      this.limitBytes = capToMaxSize((long) (limitBytes + (limitBytes * allowedOverflowRatio)));
+    }
+
+    this.allowedOverflowRatio = allowedOverflowRatio;
+    this.incRatio = incRatio;
+  }
+
+  /** Clamps {@code size} to Integer.MAX_VALUE, the largest ByteBuffer capacity. */
+  private static long capToMaxSize(long size) {
+    return Math.min(size, Integer.MAX_VALUE);
+  }
+
+  public long initialSize() {
+    return initSize;
+  }
+
+  public long limit() {
+    return limitBytes;
+  }
+
+  /**
+   * @return {@code currentSize / limit()}, i.e. the fraction of the limit already used
+   *         ({@code currentSize} is clamped to Integer.MAX_VALUE first)
+   */
+  public float remainRatio(long currentSize) {
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    if (currentSize > Integer.MAX_VALUE) {
+      currentSize = Integer.MAX_VALUE;
+    }
+    return (float)currentSize / (float)limitBytes;
+  }
+
+  /** @return true if {@code currentSize} is still below the limit */
+  public boolean canIncrease(long currentSize) {
+    return remain(currentSize) > 0;
+  }
+
+  /** @return the number of bytes between {@code currentSize} and the limit */
+  public long remain(long currentSize) {
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    return limitBytes > Integer.MAX_VALUE ? Integer.MAX_VALUE - currentSize : limitBytes - currentSize;
+  }
+
+  /**
+   * @return the initial size if {@code currentSize} is smaller than it; otherwise
+   *         {@code currentSize * (1 + incRatio)}, capped at the limit and at Integer.MAX_VALUE
+   */
+  public int increasedSize(int currentSize) {
+    if (currentSize < initSize) {
+      return (int) initSize;
+    }
+
+    // The product is computed in floating point and narrowed to long, so it cannot wrap around.
+    // (currentSize is an int, so it can never itself exceed Integer.MAX_VALUE.)
+    long nextSize = (long) (currentSize + ((float) currentSize * incRatio));
+
+    if (nextSize > limitBytes) {
+      LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")");
+      nextSize = limitBytes;
+    }
+
+    if (nextSize > Integer.MAX_VALUE) {
+      LOG.info("Increasing reaches maximum size (" + FileUtil.humanReadableByteCount(Integer.MAX_VALUE, false) + ")");
+      nextSize = Integer.MAX_VALUE;
+    }
+
+    return (int) nextSize;
+  }
+
+  @Override
+  public String toString() {
+    return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit="
+        + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOverflowRatio
+        + ",inc_ratio=" + incRatio;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
new file mode 100644
index 0000000..a2b2561
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
@@ -0,0 +1,73 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+
+/**
+ * Writes rows one field at a time. Calls must follow this sequence:
+ *
+ * <pre>
+ * startRow() --> skipField() or putXXX --> endRow()
+ * </pre>
+ *
+ * The total number of skipField and putXXX invocations must equal the number of fields.
+ */
+public interface RowWriter {
+
+  /** @return the data types of the row's fields */
+  TajoDataTypes.DataType [] dataTypes();
+
+  // ---- row lifecycle ----
+
+  boolean startRow();
+
+  void endRow();
+
+  /** Leaves the next field empty (null). */
+  void skipField();
+
+  // ---- fixed-width values ----
+
+  void putBool(boolean val);
+
+  void putInt2(short val);
+
+  void putInt4(int val);
+
+  void putInt8(long val);
+
+  void putFloat4(float val);
+
+  void putFloat8(double val);
+
+  // ---- variable-length values ----
+
+  void putText(String val);
+
+  void putText(byte[] val);
+
+  void putBlob(byte[] val);
+
+  // ---- temporal values ----
+
+  void putTimestamp(long val);
+
+  void putTime(long val);
+
+  void putDate(int val);
+
+  void putInterval(IntervalDatum val);
+
+  // ---- other types ----
+
+  void putInet4(int val);
+
+  void putProtoDatum(ProtobufDatum datum);
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
new file mode 100644
index 0000000..b742e6d
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
@@ -0,0 +1,311 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.UnsafeUtil;
+
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * Read-only {@link Tuple} view over one row held in a direct {@link ByteBuffer}. The row uses the
+ * layout produced by {@link OffHeapRowWriter}: a 4-byte row length, one 4-byte offset per field,
+ * then the field values, which are read in place through {@link Unsafe}.
+ */
+public abstract class UnSafeTuple implements Tuple {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  private DirectBuffer bb;
+  /** Byte offset of the row within {@link #bb}. */
+  private int relativePos;
+  /** Byte length of the row. */
+  private int length;
+  private DataType [] types;
+
+  protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
+    this.bb = (DirectBuffer) bb;
+    this.relativePos = relativePos;
+    this.length = length;
+    this.types = types;
+  }
+
+  void set(ByteBuffer bb, DataType [] types) {
+    set(bb, 0, bb.limit(), types);
+  }
+
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  /** @return a slice of the underlying buffer that covers exactly this row */
+  public ByteBuffer nioBuffer() {
+    return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice();
+  }
+
+  /** @return a copy of this row backed by an on-heap byte array */
+  public HeapTuple toHeapTuple() {
+    byte [] bytes = new byte[length];
+    UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length);
+    return new HeapTuple(bytes, types);
+  }
+
+  /**
+   * Copies {@code tuple}'s row bytes to the start of this tuple's buffer. A new direct buffer is
+   * allocated first when this row is shorter than the source row. Afterwards this tuple describes
+   * the copied row (offset 0, the source row's length); the field types are left unchanged.
+   * NOTE(review): this overwrites offset 0 of the current buffer and may free it; confirm it is
+   * never called on a tuple whose buffer is shared (e.g. one set by OffHeapRowBlockReader).
+   */
+  public void copyFrom(UnSafeTuple tuple) {
+    Preconditions.checkNotNull(tuple);
+
+    ((ByteBuffer) bb).clear();
+    if (length < tuple.length) {
+      UnsafeUtil.free((ByteBuffer) bb);
+      bb = (DirectBuffer) ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder());
+    }
+
+    ((ByteBuffer) bb).put(tuple.nioBuffer());
+    // The bytes now start at offset 0 in both branches, so the row bounds must follow them.
+    this.relativePos = 0;
+    this.length = tuple.length;
+  }
+
+  /** @return the row-relative offset of {@code fieldId}, read from the row header */
+  private int getFieldOffset(int fieldId) {
+    return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
+  }
+
+  /**
+   * @return the absolute address of {@code fieldId}'s value
+   * @throws RuntimeException if the field is null
+   */
+  public long getFieldAddr(int fieldId) {
+    int fieldOffset = getFieldOffset(fieldId);
+    if (fieldOffset == OffHeapRowBlock.NULL_FIELD_OFFSET) {
+      throw new RuntimeException("Invalid Field Access: " + fieldId);
+    }
+    return bb.address() + relativePos + fieldOffset;
+  }
+
+  @Override
+  public boolean contains(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public void clear() {
+    // nothing to do
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
+  }
+
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
+  }
+
+  @Override
+  public Datum get(int fieldId) {
+    if (isNull(fieldId)) {
+      return NullDatum.get();
+    }
+
+    switch (types[fieldId].getType()) {
+      case BOOLEAN:
+        return DatumFactory.createBool(getBool(fieldId));
+      case INT1:
+      case INT2:
+        return DatumFactory.createInt2(getInt2(fieldId));
+      case INT4:
+        return DatumFactory.createInt4(getInt4(fieldId));
+      case INT8:
+        // INT8 values are stored as 8-byte longs (OffHeapRowWriter.putInt8); reading them with
+        // getInt4 would truncate them.
+        return DatumFactory.createInt8(getInt8(fieldId));
+      case FLOAT4:
+        return DatumFactory.createFloat4(getFloat4(fieldId));
+      case FLOAT8:
+        return DatumFactory.createFloat8(getFloat8(fieldId));
+      case TEXT:
+        return DatumFactory.createText(getText(fieldId));
+      case TIMESTAMP:
+        return DatumFactory.createTimestamp(getInt8(fieldId));
+      case DATE:
+        return DatumFactory.createDate(getInt4(fieldId));
+      case TIME:
+        return DatumFactory.createTime(getInt8(fieldId));
+      case INTERVAL:
+        return getInterval(fieldId);
+      case INET4:
+        return DatumFactory.createInet4(getInt4(fieldId));
+      case PROTOBUF:
+        return getProtobufDatum(fieldId);
+      default:
+        throw new UnsupportedException("Unknown type: " + types[fieldId]);
+    }
+  }
+
+  @Override
+  public void setOffset(long offset) {
+  }
+
+  @Override
+  public long getOffset() {
+    return 0;
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01;
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return UNSAFE.getByte(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return UNSAFE.getChar(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public byte[] getBytes(int fieldId) {
+    return readLengthPrefixed(fieldId);
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    long addr = getFieldAddr(fieldId);
+    return UNSAFE.getShort(addr);
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return UNSAFE.getInt(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return UNSAFE.getLong(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return UNSAFE.getFloat(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return UNSAFE.getDouble(getFieldAddr(fieldId));
+  }
+
+  /**
+   * Decodes with {@link TextDatum#DEFAULT_CHARSET}, the charset OffHeapRowWriter.putText(String)
+   * encodes with, instead of the platform default charset.
+   */
+  @Override
+  public String getText(int fieldId) {
+    return new String(readLengthPrefixed(fieldId), TextDatum.DEFAULT_CHARSET);
+  }
+
+  /** Reads an interval stored as a 4-byte month count followed by 8 bytes of milliseconds. */
+  public IntervalDatum getInterval(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int months = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+    long millisecs = UNSAFE.getLong(pos);
+    return new IntervalDatum(months, millisecs);
+  }
+
+  /** @return the parsed protobuf value, or NullDatum if its bytes cannot be parsed */
+  @Override
+  public Datum getProtobufDatum(int fieldId) {
+    byte [] bytes = getBytes(fieldId);
+
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
+    Message.Builder builder = factory.newBuilder();
+    try {
+      builder.mergeFrom(bytes);
+    } catch (InvalidProtocolBufferException e) {
+      return NullDatum.get();
+    }
+
+    return new ProtobufDatum(builder.build());
+  }
+
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    return StringUtils.convertBytesToChars(readLengthPrefixed(fieldId), Charset.forName("UTF-8"));
+  }
+
+  /** Copies out a value stored as a 4-byte length followed by that many bytes. */
+  private byte [] readLengthPrefixed(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return bytes;
+  }
+
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    return toHeapTuple();
+  }
+
+  @Override
+  public Datum[] getValues() {
+    Datum [] datums = new Datum[size()];
+    for (int i = 0; i < size(); i++) {
+      if (contains(i)) {
+        datums[i] = get(i);
+      } else {
+        datums[i] = NullDatum.get();
+      }
+    }
+    return datums;
+  }
+
+  @Override
+  public String toString() {
+    return VTuple.toDisplayString(getValues());
+  }
+
+  public abstract void release();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
new file mode 100644
index 0000000..73e1e2f
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
@@ -0,0 +1,99 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedLongs;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+import java.nio.ByteOrder;
+
+/**
+ * It directly accesses UTF bytes in UnSafeTuple without any copy. It is used by compiled TupleComparator.
+ */
+public class UnSafeTupleBytesComparator {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  static final boolean littleEndian =
+      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+  /**
+   * Lexicographically compares two length-prefixed byte strings, treating every byte as unsigned.
+   *
+   * @param ptr1 address of the first value: a 4-byte length followed by that many bytes
+   * @param ptr2 address of the second value, in the same layout
+   * @return a negative number, zero, or a positive number as the first value is less than,
+   *         equal to, or greater than the second
+   */
+  public static int compare(long ptr1, long ptr2) {
+    int lstrLen = UNSAFE.getInt(ptr1);
+    int rstrLen = UNSAFE.getInt(ptr2);
+
+    ptr1 += SizeOf.SIZE_OF_INT;
+    ptr2 += SizeOf.SIZE_OF_INT;
+
+    int minLength = Math.min(lstrLen, rstrLen);
+    int minWords = minLength / Longs.BYTES;
+
+    /*
+     * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
+     * time is no slower than comparing 4 bytes at a time even on 32-bit.
+     * On the other hand, it is substantially faster on 64-bit.
+     */
+    for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+      long lw = UNSAFE.getLong(ptr1);
+      long rw = UNSAFE.getLong(ptr2);
+      long diff = lw ^ rw;
+
+      if (diff != 0) {
+        if (!littleEndian) {
+          // On big-endian, memory order equals numeric order, so an unsigned compare suffices.
+          return UnsignedLongs.compare(lw, rw);
+        }
+
+        // Use binary search to find the bit offset n of the lowest-order differing byte,
+        // which on little-endian is the first differing byte in memory.
+        int n = 0;
+        int y;
+        int x = (int) diff;
+        if (x == 0) {
+          x = (int) (diff >>> 32);
+          n = 32;
+        }
+
+        y = x << 16;
+        if (y == 0) {
+          n += 16;
+        } else {
+          x = y;
+        }
+
+        y = x << 8;
+        if (y == 0) {
+          n += 8;
+        }
+        // Unsigned difference of the first differing bytes.
+        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+      }
+
+      ptr1 += SizeOf.SIZE_OF_LONG;
+      ptr2 += SizeOf.SIZE_OF_LONG;
+    }
+
+    // The epilogue to cover the last (minLength % 8) elements.
+    // Java bytes are signed, so mask them to compare as unsigned values, consistent with the
+    // word-at-a-time loop above; otherwise bytes >= 0x80 would order differently here.
+    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+      int result = (UNSAFE.getByte(ptr1++) & 0xFF) - (UNSAFE.getByte(ptr2++) & 0xFF);
+      if (result != 0) {
+        return result;
+      }
+    }
+    return lstrLen - rstrLen;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
new file mode 100644
index 0000000..51dbb29
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * {@link UnSafeTuple} whose row location can be set from outside (OffHeapRowBlockReader uses this
+ * to point it at successive rows of a block). {@link #release()} is a no-op.
+ */
+public class ZeroCopyTuple extends UnSafeTuple {
+
+  /** Points this tuple at {@code length} bytes starting at {@code relativePos} in {@code bb}. */
+  @Override
+  public void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
+    super.set(bb, relativePos, length, types);
+  }
+
+  @Override
+  public void release() {
+    // Intentionally empty.
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto b/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto
new file mode 100644
index 0000000..f5c8a08
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.index";
+option java_outer_classname = "IndexProtos";
+option optimize_for = SPEED;
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+import "CatalogProtos.proto";
+
+// Serialized description of a tuple comparator; the Java class is generated into
+// org.apache.tajo.index.IndexProtos (see the java_package / java_outer_classname options).
+// SchemaProto, SortSpecProto and TupleComparatorSpecProto are not defined in this file;
+// presumably they come from the imported CatalogProtos.proto.
+// NOTE(review): the per-field notes below are inferred from field names only — confirm
+// against the Java code that builds and reads this message.
+message TupleComparatorProto {
+  // Presumably the schema of the tuples being compared.
+  required SchemaProto schema = 1;
+  // Presumably the sort keys, in the order they are applied.
+  repeated SortSpecProto sortSpecs = 2;
+  // Presumably one comparator spec per sort key.
+  repeated TupleComparatorSpecProto compSpecs = 3;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
new file mode 100644
index 0000000..67033ed
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -0,0 +1,198 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ --> + +<configuration> + <!-- Storage Manager Configuration --> + <property> + <name>tajo.storage.manager.hdfs.class</name> + <value>org.apache.tajo.storage.FileStorageManager</value> + </property> + <property> + <name>tajo.storage.manager.hbase.class</name> + <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value> + </property> + + <property> + <name>tajo.storage.manager.concurrency.perDisk</name> + <value>1</value> + <description></description> + </property> + + <!--- Registered Scanner Handler --> + <property> + <name>tajo.storage.scanner-handler</name> + <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro,hbase</value> + </property> + + <!--- Fragment Class Configurations --> + <property> + <name>tajo.storage.fragment.textfile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.csv.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.json.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.raw.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.rcfile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.row.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.parquet.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.sequencefile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.avro.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + 
<name>tajo.storage.fragment.hbase.class</name> + <value>org.apache.tajo.storage.hbase.HBaseFragment</value> + </property> + + <!--- Scanner Handler --> + <property> + <name>tajo.storage.scanner-handler.textfile.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.csv.class</name> + <value>org.apache.tajo.storage.CSVFile$CSVScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.json.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.raw.class</name> + <value>org.apache.tajo.storage.RawFile$RawFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.rcfile.class</name> + <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.rowfile.class</name> + <value>org.apache.tajo.storage.RowFile$RowFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.parquet.class</name> + <value>org.apache.tajo.storage.parquet.ParquetScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.sequencefile.class</name> + <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.avro.class</name> + <value>org.apache.tajo.storage.avro.AvroScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.hbase.class</name> + <value>org.apache.tajo.storage.hbase.HBaseScanner</value> + </property> + + <!--- Appender Handler --> + <property> + <name>tajo.storage.appender-handler</name> + <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value> + </property> + + <property> + <name>tajo.storage.appender-handler.textfile.class</name> + 
<value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.csv.class</name> + <value>org.apache.tajo.storage.CSVFile$CSVAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.json.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.raw.class</name> + <value>org.apache.tajo.storage.RawFile$RawFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.rcfile.class</name> + <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.rowfile.class</name> + <value>org.apache.tajo.storage.RowFile$RowFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.parquet.class</name> + <value>org.apache.tajo.storage.parquet.ParquetAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.sequencefile.class</name> + <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.avro.class</name> + <value>org.apache.tajo.storage.avro.AvroAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.hbase.class</name> + <value>org.apache.tajo.storage.hbase.HFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.hfile.class</name> + <value>org.apache.tajo.storage.hbase.HFileAppender</value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java 
b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java new file mode 100644 index 0000000..0251dc7 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that a {@link FrameTuple} built from two tuples exposes all of their columns,
+ * the first tuple's columns followed by the second's.
+ */
+public class TestFrameTuple {
+  /** Number of columns in each sample tuple. */
+  private static final int FIELD_NUM = 11;
+
+  private Tuple tuple1;
+  private Tuple tuple2;
+
+  @Before
+  public void setUp() throws Exception {
+    // Two separate instances with identical contents, one for each half of the frame.
+    tuple1 = createSampleTuple();
+    tuple2 = createSampleTuple();
+  }
+
+  /**
+   * Builds a {@link VTuple} with {@value #FIELD_NUM} columns, one value per data type under test.
+   */
+  private static Tuple createSampleTuple() {
+    Tuple tuple = new VTuple(FIELD_NUM);
+    tuple.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar('9'),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        // Upper-case 'L': a lower-case 'l' suffix is easily misread as the digit '1'.
+        DatumFactory.createInt8(23L),
+        DatumFactory.createFloat4(77.9f),
+        // Double literal: a float literal (271.9f) would be widened and lose precision.
+        DatumFactory.createFloat8(271.9),
+        DatumFactory.createText("hyunsik"),
+        DatumFactory.createBlob("hyunsik".getBytes()),
+        DatumFactory.createInet4("192.168.0.1")
+    });
+    return tuple;
+  }
+
+  @Test
+  public final void testFrameTuple() {
+    Tuple frame = new FrameTuple(tuple1, tuple2);
+    assertEquals(FIELD_NUM * 2, frame.size());
+
+    for (int i = 0; i < FIELD_NUM; i++) {
+      // Columns [0, FIELD_NUM) must come from tuple1 and [FIELD_NUM, 2 * FIELD_NUM) from tuple2.
+      assertTrue(frame.contains(i));
+      assertTrue(frame.contains(FIELD_NUM + i));
+      assertEquals(tuple1.get(i), frame.get(i));
+      assertEquals(tuple2.get(i), frame.get(FIELD_NUM + i));
+    }
+
+    assertEquals(DatumFactory.createInt8(23L), frame.get(5));
+    assertEquals(DatumFactory.createInt8(23L), frame.get(16));
+    assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10));
+    assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(21));
+  }
+}
