http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java index 167e4a8..bfbe478 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java @@ -20,6 +20,7 @@ package org.apache.tajo.storage; import org.apache.tajo.catalog.Schema; import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.IntervalDatum; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.exception.UnsupportedException; @@ -68,7 +69,12 @@ public class LazyTuple implements Tuple, Cloneable { @Override public boolean isNull(int fieldid) { - return get(fieldid) instanceof NullDatum; + return get(fieldid).isNull(); + } + + @Override + public boolean isNotNull(int fieldid) { + return !isNull(fieldid); } @Override @@ -199,6 +205,11 @@ public class LazyTuple implements Tuple, Cloneable { } @Override + public IntervalDatum getInterval(int fieldId) { + return (IntervalDatum) get(fieldId); + } + + @Override public char[] getUnicodeChars(int fieldId) { return get(fieldId).asUnicodeChars(); }
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java index 70044ca..24b6280 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java @@ -23,8 +23,10 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.IntervalDatum; +import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.exception.UnknownDataTypeException; +import org.apache.tajo.tuple.offheap.RowWriter; import org.apache.tajo.util.BitArray; import java.nio.ByteBuffer; @@ -177,7 +179,8 @@ public class RowStoreUtil { nullFlags = new BitArray(schema.size()); headerSize = nullFlags.bytesLength(); } - public byte [] toBytes(Tuple tuple) { + + public byte[] toBytes(Tuple tuple) { nullFlags.clear(); int size = estimateTupleDataSize(tuple); ByteBuffer bb = ByteBuffer.allocate(size + headerSize); @@ -191,42 +194,64 @@ public class RowStoreUtil { col = schema.getColumn(i); switch (col.getDataType().getType()) { - case NULL_TYPE: nullFlags.set(i); break; - case BOOLEAN: bb.put(tuple.get(i).asByte()); break; - case BIT: bb.put(tuple.get(i).asByte()); break; - case CHAR: bb.put(tuple.get(i).asByte()); break; - case INT2: bb.putShort(tuple.get(i).asInt2()); break; - case INT4: bb.putInt(tuple.get(i).asInt4()); break; - case INT8: bb.putLong(tuple.get(i).asInt8()); break; - case FLOAT4: bb.putFloat(tuple.get(i).asFloat4()); break; - case FLOAT8: bb.putDouble(tuple.get(i).asFloat8()); break; - case TEXT: - byte [] _string = tuple.get(i).asByteArray(); - 
bb.putInt(_string.length); - bb.put(_string); - break; - case DATE: bb.putInt(tuple.get(i).asInt4()); break; - case TIME: - case TIMESTAMP: - bb.putLong(tuple.get(i).asInt8()); - break; - case INTERVAL: - IntervalDatum interval = (IntervalDatum) tuple.get(i); - bb.putInt(interval.getMonths()); - bb.putLong(interval.getMilliSeconds()); - break; - case BLOB: - byte [] bytes = tuple.get(i).asByteArray(); - bb.putInt(bytes.length); - bb.put(bytes); - break; - case INET4: - byte [] ipBytes = tuple.get(i).asByteArray(); - bb.put(ipBytes); - break; - case INET6: bb.put(tuple.get(i).asByteArray()); break; - default: - throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); + case NULL_TYPE: + nullFlags.set(i); + break; + case BOOLEAN: + bb.put(tuple.get(i).asByte()); + break; + case BIT: + bb.put(tuple.get(i).asByte()); + break; + case CHAR: + bb.put(tuple.get(i).asByte()); + break; + case INT2: + bb.putShort(tuple.get(i).asInt2()); + break; + case INT4: + bb.putInt(tuple.get(i).asInt4()); + break; + case INT8: + bb.putLong(tuple.get(i).asInt8()); + break; + case FLOAT4: + bb.putFloat(tuple.get(i).asFloat4()); + break; + case FLOAT8: + bb.putDouble(tuple.get(i).asFloat8()); + break; + case TEXT: + byte[] _string = tuple.get(i).asByteArray(); + bb.putInt(_string.length); + bb.put(_string); + break; + case DATE: + bb.putInt(tuple.get(i).asInt4()); + break; + case TIME: + case TIMESTAMP: + bb.putLong(tuple.get(i).asInt8()); + break; + case INTERVAL: + IntervalDatum interval = (IntervalDatum) tuple.get(i); + bb.putInt(interval.getMonths()); + bb.putLong(interval.getMilliSeconds()); + break; + case BLOB: + byte[] bytes = tuple.get(i).asByteArray(); + bb.putInt(bytes.length); + bb.put(bytes); + break; + case INET4: + byte[] ipBytes = tuple.get(i).asByteArray(); + bb.put(ipBytes); + break; + case INET6: + bb.put(tuple.get(i).asByteArray()); + break; + default: + throw new RuntimeException(new 
UnknownDataTypeException(col.getDataType().getType().name())); } } @@ -237,7 +262,7 @@ public class RowStoreUtil { bb.position(finalPosition); bb.flip(); - byte [] buf = new byte [bb.limit()]; + byte[] buf = new byte[bb.limit()]; bb.get(buf); return buf; } @@ -254,24 +279,38 @@ public class RowStoreUtil { col = schema.getColumn(i); switch (col.getDataType().getType()) { - case BOOLEAN: - case BIT: - case CHAR: size += 1; break; - case INT2: size += 2; break; - case DATE: - case INT4: - case FLOAT4: size += 4; break; - case TIME: - case TIMESTAMP: - case INT8: - case FLOAT8: size += 8; break; - case INTERVAL: size += 12; break; - case TEXT: - case BLOB: size += (4 + tuple.get(i).asByteArray().length); break; - case INET4: - case INET6: size += tuple.get(i).asByteArray().length; break; - default: - throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); + case BOOLEAN: + case BIT: + case CHAR: + size += 1; + break; + case INT2: + size += 2; + break; + case DATE: + case INT4: + case FLOAT4: + size += 4; + break; + case TIME: + case TIMESTAMP: + case INT8: + case FLOAT8: + size += 8; + break; + case INTERVAL: + size += 12; + break; + case TEXT: + case BLOB: + size += (4 + tuple.get(i).asByteArray().length); + break; + case INET4: + case INET6: + size += tuple.get(i).asByteArray().length; + break; + default: + throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); } } @@ -284,4 +323,55 @@ public class RowStoreUtil { return schema; } } + + public static void convert(Tuple tuple, RowWriter writer) { + writer.startRow(); + + for (int i = 0; i < writer.dataTypes().length; i++) { + if (tuple.isNull(i)) { + writer.skipField(); + continue; + } + switch (writer.dataTypes()[i].getType()) { + case BOOLEAN: + writer.putBool(tuple.getBool(i)); + break; + case INT1: + case INT2: + writer.putInt2(tuple.getInt2(i)); + break; + case INT4: + case DATE: + case INET4: + writer.putInt4(tuple.getInt4(i)); + break; 
+ case INT8: + case TIMESTAMP: + case TIME: + writer.putInt8(tuple.getInt8(i)); + break; + case FLOAT4: + writer.putFloat4(tuple.getFloat4(i)); + break; + case FLOAT8: + writer.putFloat8(tuple.getFloat8(i)); + break; + case TEXT: + writer.putText(tuple.getBytes(i)); + break; + case INTERVAL: + writer.putInterval((IntervalDatum) tuple.getInterval(i)); + break; + case PROTOBUF: + writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i)); + break; + case NULL_TYPE: + writer.skipField(); + break; + default: + throw new UnsupportedException("Unknown data type: " + writer.dataTypes()[i]); + } + } + writer.endRow(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java index 084c105..8dffd8d 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java @@ -1,4 +1,4 @@ -/** +/*** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,164 +18,15 @@ package org.apache.tajo.storage; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.common.ProtoObject; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.NullDatum; import java.util.Comparator; -import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto; import static org.apache.tajo.index.IndexProtos.TupleComparatorProto; -/** - * The Comparator class for Tuples - * - * @see Tuple - */ -public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComparatorProto> { - private final int[] sortKeyIds; - private final boolean[] asc; - @SuppressWarnings("unused") - private final boolean[] nullFirsts; - - /** - * @param schema The schema of input tuples - * @param sortKeys The description of sort keys - */ - public TupleComparator(Schema schema, SortSpec[] sortKeys) { - Preconditions.checkArgument(sortKeys.length > 0, - "At least one sort key must be specified."); - - this.sortKeyIds = new int[sortKeys.length]; - this.asc = new boolean[sortKeys.length]; - this.nullFirsts = new boolean[sortKeys.length]; - for (int i = 0; i < sortKeys.length; i++) { - if (sortKeys[i].getSortKey().hasQualifier()) { - this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName()); - } else { - this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName()); - } - - this.asc[i] = sortKeys[i].isAscending(); - this.nullFirsts[i]= sortKeys[i].isNullFirst(); - } - } - - public TupleComparator(TupleComparatorProto proto) { - this.sortKeyIds = new int[proto.getCompSpecsCount()]; - this.asc = new boolean[proto.getCompSpecsCount()]; - this.nullFirsts = new boolean[proto.getCompSpecsCount()]; - - for (int i = 0; i < proto.getCompSpecsCount(); i++) { - 
TupleComparatorSpecProto sortSepcProto = proto.getCompSpecs(i); - sortKeyIds[i] = sortSepcProto.getColumnId(); - asc[i] = sortSepcProto.getAscending(); - nullFirsts[i] = sortSepcProto.getNullFirst(); - } - } - - public boolean isAscendingFirstKey() { - return this.asc[0]; - } - - @Override - public int compare(Tuple tuple1, Tuple tuple2) { - Datum left = null; - Datum right = null; - int compVal = 0; - - for (int i = 0; i < sortKeyIds.length; i++) { - left = tuple1.get(sortKeyIds[i]); - right = tuple2.get(sortKeyIds[i]); - - if (left instanceof NullDatum || right instanceof NullDatum) { - if (!left.equals(right)) { - if (left instanceof NullDatum) { - compVal = 1; - } else if (right instanceof NullDatum) { - compVal = -1; - } - if (nullFirsts[i]) { - if (compVal != 0) { - compVal *= -1; - } - } - } else { - compVal = 0; - } - } else { - if (asc[i]) { - compVal = left.compareTo(right); - } else { - compVal = right.compareTo(left); - } - } - - if (compVal < 0 || compVal > 0) { - return compVal; - } - } - return 0; - } - - @Override - public int hashCode() { - return Objects.hashCode(sortKeyIds); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof TupleComparator) { - TupleComparator other = (TupleComparator) obj; - if (sortKeyIds.length != other.sortKeyIds.length) { - return false; - } - - for (int i = 0; i < sortKeyIds.length; i++) { - if (sortKeyIds[i] != other.sortKeyIds[i] || - asc[i] != other.asc[i] || - nullFirsts[i] != other.nullFirsts[i]) { - return false; - } - } - - return true; - } else { - return false; - } - } - - @Override - public TupleComparatorProto getProto() { - TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder(); - TupleComparatorSpecProto.Builder sortSpecBuilder; - - for (int i = 0; i < sortKeyIds.length; i++) { - sortSpecBuilder = TupleComparatorSpecProto.newBuilder(); - sortSpecBuilder.setColumnId(sortKeyIds[i]); - sortSpecBuilder.setAscending(asc[i]); - sortSpecBuilder.setNullFirst(nullFirsts[i]); 
- builder.addCompSpecs(sortSpecBuilder); - } - - return builder.build(); - } +public abstract class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComparatorProto> { - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); + public abstract int compare(Tuple o1, Tuple o2); - String prefix = ""; - for (int i = 0; i < sortKeyIds.length; i++) { - sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i]) - .append(",Asc=").append(asc[i]) - .append(",NullFirst=").append(nullFirsts[i]); - prefix = " ,"; - } - return sb.toString(); - } -} \ No newline at end of file + public abstract boolean isAscendingFirstKey(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java index 6cc09d4..e824b99 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java @@ -33,7 +33,7 @@ public class TupleRange implements Comparable<TupleRange>, Cloneable { private final TupleComparator comp; public TupleRange(final SortSpec[] sortSpecs, final Tuple start, final Tuple end) { - this.comp = new TupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs); + this.comp = new BaseTupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs); // if there is only one value, start == end this.start = start; this.end = end; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java 
index 74be7ff..ccba3be 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java @@ -20,6 +20,7 @@ package org.apache.tajo.storage.index; import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.TupleComparator; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java index 5d43bd5..f093f9d 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java @@ -27,12 +27,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto; -import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.*; import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; -import org.apache.tajo.storage.StorageUtil; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.index.IndexMethod; import org.apache.tajo.storage.index.IndexWriter; import org.apache.tajo.storage.index.OrderIndexReader; @@ -72,8 +69,7 @@ public class BSTIndex implements IndexMethod { } @Override - public BSTIndexReader getIndexReader(Path fileName, Schema keySchema, - TupleComparator comparator) throws IOException { + public BSTIndexReader getIndexReader(Path fileName, Schema keySchema, TupleComparator comparator) 
throws IOException { return new BSTIndexReader(fileName, keySchema, comparator); } @@ -350,7 +346,7 @@ public class BSTIndex implements IndexMethod { TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder(); compProto.mergeFrom(compBytes); - this.comparator = new TupleComparator(compProto.build()); + this.comparator = new BaseTupleComparator(compProto.build()); // level this.level = indexIn.readInt(); http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java new file mode 100644 index 0000000..c1835df --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java @@ -0,0 +1,112 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.offheap.HeapTuple; +import org.apache.tajo.tuple.offheap.OffHeapRowWriter; +import org.apache.tajo.tuple.offheap.ZeroCopyTuple; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.Deallocatable; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.UnsafeUtil; +import sun.misc.Unsafe; +import sun.nio.ch.DirectBuffer; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable { + private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class); + + private static final Unsafe UNSAFE = UnsafeUtil.unsafe; + + // buffer + private ByteBuffer buffer; + private long address; + + public BaseTupleBuilder(Schema schema) { + super(SchemaUtil.toDataTypes(schema)); + buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder()); + address = UnsafeUtil.getAddress(buffer); + } + + @Override + public long address() { + return address; + } + + public void ensureSize(int size) { + if (buffer.remaining() - size < 0) { // check the remain size + // enlarge new buffer and copy writing data + int newBlockSize = UnsafeUtil.alignedSize(buffer.capacity() * 2); + ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize); + long newAddress = ((DirectBuffer)newByteBuf).address(); + UNSAFE.copyMemory(this.address, newAddress, buffer.limit()); + LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false)); + + // release existing buffer and replace variables + UnsafeUtil.free(buffer); + buffer = newByteBuf; + address = newAddress; + } + } + + @Override + public int position() { + return 0; + } + + @Override + public 
void forward(int length) { + } + + @Override + public void endRow() { + super.endRow(); + buffer.position(0).limit(offset()); + } + + @Override + public Tuple build() { + return buildToHeapTuple(); + } + + public HeapTuple buildToHeapTuple() { + byte [] bytes = new byte[buffer.limit()]; + UNSAFE.copyMemory(null, address, bytes, UnsafeUtil.ARRAY_BOOLEAN_BASE_OFFSET, buffer.limit()); + return new HeapTuple(bytes, dataTypes()); + } + + public ZeroCopyTuple buildToZeroCopyTuple() { + ZeroCopyTuple zcTuple = new ZeroCopyTuple(); + zcTuple.set(buffer, 0, buffer.limit(), dataTypes()); + return zcTuple; + } + + public void release() { + UnsafeUtil.free(buffer); + buffer = null; + address = 0; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java new file mode 100644 index 0000000..be734e1 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple; + +import org.apache.tajo.storage.Tuple; + +public interface RowBlockReader<T extends Tuple> { + + /** + * Return for each tuple + * + * @return True if tuple block is filled with tuples. Otherwise, It will return false. + */ + public boolean next(T tuple); + + public void reset(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java new file mode 100644 index 0000000..c43c018 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java @@ -0,0 +1,26 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple; + +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.offheap.RowWriter; + +public interface TupleBuilder extends RowWriter { + public Tuple build(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java new file mode 100644 index 0000000..9662d5a --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.offheap; + +import org.apache.tajo.util.Deallocatable; +import org.apache.tajo.util.UnsafeUtil; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import static org.apache.tajo.common.TajoDataTypes.DataType; + +public class DirectBufTuple extends UnSafeTuple implements Deallocatable { + private ByteBuffer bb; + + public DirectBufTuple(int length, DataType[] types) { + bb = ByteBuffer.allocateDirect(length).order(ByteOrder.nativeOrder()); + set(bb, 0, length, types); + } + + @Override + public void release() { + UnsafeUtil.free(bb); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java new file mode 100644 index 0000000..a327123 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.offheap; + +/** + * Fixed size limit specification + */ +public class FixedSizeLimitSpec extends ResizableLimitSpec { + public FixedSizeLimitSpec(long size) { + super(size, size); + } + + public FixedSizeLimitSpec(long size, float allowedOverflowRatio) { + super(size, size, allowedOverflowRatio); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java new file mode 100644 index 0000000..e38555c --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java @@ -0,0 +1,269 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * A read-only {@link Tuple} over a single row held in a heap byte array.
+ *
+ * <p>The array is expected to use the row layout written by {@link OffHeapRowWriter}:
+ * a 4-byte row length, one 4-byte offset per field, then the field payloads. Offsets
+ * are relative to the start of the array, and a field whose offset equals
+ * {@link OffHeapRowBlock#NULL_FIELD_OFFSET} is null.
+ */
+public class HeapTuple implements Tuple {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+  private static final long BASE_OFFSET = UnsafeUtil.ARRAY_BYTE_BASE_OFFSET;
+
+  private final byte [] data;
+  private final DataType [] types;
+
+  /**
+   * @param bytes one serialized row; it is not copied, so later changes to it are visible
+   * @param types the type of each field, in field order
+   */
+  public HeapTuple(final byte [] bytes, final DataType [] types) {
+    this.data = bytes;
+    this.types = types;
+  }
+
+  /**
+   * Returns the number of fields, which is what {@link #getValues()} iterates over.
+   * The raw serialized bytes are available through {@link #nioBuffer()}.
+   */
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  /** @return a buffer wrapping (not copying) the serialized row. */
+  public ByteBuffer nioBuffer() {
+    return ByteBuffer.wrap(data);
+  }
+
+  /** Reads a field offset from the row header, skipping the leading 4-byte row length. */
+  private int getFieldOffset(int fieldId) {
+    return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
+  }
+
+  /** Returns the offset of a field, failing fast when the field is null. */
+  private int checkNullAndGetOffset(int fieldId) {
+    int offset = getFieldOffset(fieldId);
+    if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) {
+      throw new RuntimeException("Invalid Field Access: " + fieldId);
+    }
+    return offset;
+  }
+
+  @Override
+  public boolean contains(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public void clear() {
+    // nothing to do: this tuple is a read-only view over its byte array
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new UnsupportedException("HeapTuple does not support put(int, Datum).");
+  }
+
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    throw new UnsupportedException("HeapTuple does not support put(int, Datum []).");
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedException("HeapTuple does not support put(int, Tuple).");
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    throw new UnsupportedException("HeapTuple does not support put(Datum []).");
+  }
+
+  /**
+   * Decodes a field into a {@link Datum} according to its declared type.
+   *
+   * @return {@link NullDatum} for a null field
+   * @throws UnsupportedException for a type this tuple cannot decode
+   */
+  @Override
+  public Datum get(int fieldId) {
+    if (isNull(fieldId)) {
+      return NullDatum.get();
+    }
+
+    switch (types[fieldId].getType()) {
+      case BOOLEAN:
+        return DatumFactory.createBool(getBool(fieldId));
+      case INT1:
+      case INT2:
+        return DatumFactory.createInt2(getInt2(fieldId));
+      case INT4:
+        return DatumFactory.createInt4(getInt4(fieldId));
+      case INT8:
+        // INT8 is stored as an 8-byte long; reading only 4 bytes would truncate it.
+        return DatumFactory.createInt8(getInt8(fieldId));
+      case FLOAT4:
+        return DatumFactory.createFloat4(getFloat4(fieldId));
+      case FLOAT8:
+        return DatumFactory.createFloat8(getFloat8(fieldId));
+      case TEXT:
+        return DatumFactory.createText(getText(fieldId));
+      case TIMESTAMP:
+        return DatumFactory.createTimestamp(getInt8(fieldId));
+      case DATE:
+        return DatumFactory.createDate(getInt4(fieldId));
+      case TIME:
+        return DatumFactory.createTime(getInt8(fieldId));
+      case INTERVAL:
+        return getInterval(fieldId);
+      case INET4:
+        return DatumFactory.createInet4(getInt4(fieldId));
+      case PROTOBUF:
+        return getProtobufDatum(fieldId);
+      default:
+        throw new UnsupportedException("Unknown type: " + types[fieldId]);
+    }
+  }
+
+  /** No-op: a HeapTuple has no position inside a larger file. */
+  @Override
+  public void setOffset(long offset) {
+  }
+
+  /** @return always 0 (see {@link #setOffset(long)}). */
+  @Override
+  public long getOffset() {
+    return 0;
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01;
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  /** Reads a variable-length field stored as a 4-byte length followed by that many bytes. */
+  @Override
+  public byte[] getBytes(int fieldId) {
+    long pos = checkNullAndGetOffset(fieldId);
+    int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return bytes;
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  /** Decodes a text field with the charset OffHeapRowWriter#putText(String) encodes with. */
+  @Override
+  public String getText(int fieldId) {
+    return new String(getBytes(fieldId), TextDatum.DEFAULT_CHARSET);
+  }
+
+  /** Reads an interval stored as 4-byte months followed by 8-byte milliseconds. */
+  @Override
+  public IntervalDatum getInterval(int fieldId) {
+    long pos = checkNullAndGetOffset(fieldId);
+    int months = UNSAFE.getInt(data, BASE_OFFSET + pos);
+    pos += SizeOf.SIZE_OF_INT;
+    long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos);
+    return new IntervalDatum(months, millisecs);
+  }
+
+  /**
+   * Parses a protobuf field using the message factory registered for its type code.
+   *
+   * @return the parsed datum, or {@link NullDatum} if the bytes cannot be parsed
+   */
+  @Override
+  public Datum getProtobufDatum(int fieldId) {
+    byte [] bytes = getBytes(fieldId);
+
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
+    Message.Builder builder = factory.newBuilder();
+    try {
+      builder.mergeFrom(bytes);
+    } catch (InvalidProtocolBufferException e) {
+      // Deliberate best-effort: an unparsable message is surfaced as a null value.
+      return NullDatum.get();
+    }
+
+    return new ProtobufDatum(builder.build());
+  }
+
+  /**
+   * Decodes a text field into chars. This uses the JDK charset API instead of
+   * com.sun.tools.javac.util.Convert, which lives in tools.jar (absent on a plain JRE)
+   * and decodes modified UTF-8 rather than the UTF-8 the writer produces.
+   */
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    return getText(fieldId).toCharArray();
+  }
+
+  /** Returns this instance; the tuple exposes no mutators, so sharing it is harmless. */
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    return this;
+  }
+
+  /** @return one datum per field, with {@link NullDatum} for null fields. */
+  @Override
+  public Datum[] getValues() {
+    Datum [] datums = new Datum[size()];
+    for (int i = 0; i < size(); i++) {
+      if (contains(i)) {
+        datums[i] = get(i);
+      } else {
+        datums[i] = NullDatum.get();
+      }
+    }
+    return datums;
+  }
+
+  @Override
+  public String toString() {
+    return VTuple.toDisplayString(getValues());
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
new file mode 100644
index 0000000..2f8e349
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A block of direct (off-heap) memory, addressed by raw pointer, that can grow up to
+ * the limit given by its {@link ResizableLimitSpec}.
+ */
+public class OffHeapMemory implements Deallocatable {
+  private static final Log LOG = LogFactory.getLog(OffHeapMemory.class);
+
+  protected static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  protected ByteBuffer buffer;
+  /** Number of usable bytes; may be smaller than the buffer capacity. */
+  protected int memorySize;
+  protected ResizableLimitSpec limitSpec;
+  /** Native address of the first byte of {@link #buffer}. */
+  protected long address;
+
+  /** Wraps an existing buffer, which must be a direct buffer (it is cast to DirectBuffer). */
+  @VisibleForTesting
+  protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) {
+    this.buffer = buffer;
+    this.address = ((DirectBuffer) buffer).address();
+    this.memorySize = buffer.limit();
+    this.limitSpec = limitSpec;
+  }
+
+  /** Allocates a native-order direct buffer of the spec's initial size. */
+  public OffHeapMemory(ResizableLimitSpec limitSpec) {
+    this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec);
+  }
+
+  public long address() {
+    return address;
+  }
+
+  public long size() {
+    return memorySize;
+  }
+
+  /**
+   * Grows this memory to {@code newSize} bytes, copying the current contents into a new
+   * allocation and freeing the old one. Requests to shrink are ignored.
+   *
+   * @throws IllegalArgumentException if newSize is not positive
+   * @throws RuntimeException if newSize exceeds the limit
+   */
+  public void resize(int newSize) {
+    Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes");
+
+    if (newSize > limitSpec.limit()) {
+      throw new RuntimeException("Resize cannot exceed the size limit");
+    }
+
+    if (newSize < memorySize) {
+      // Must return here: copying memorySize bytes into a smaller allocation would write
+      // past the end of the new block.
+      LOG.warn("The size reduction is ignored.");
+      return;
+    }
+
+    int newBlockSize = UnsafeUtil.alignedSize(newSize);
+    // Keep the same byte order as the buffer allocated by the constructor.
+    ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize).order(ByteOrder.nativeOrder());
+    long newAddress = ((DirectBuffer) newByteBuf).address();
+
+    UNSAFE.copyMemory(this.address, newAddress, memorySize);
+
+    UnsafeUtil.free(buffer);
+    this.memorySize = newSize;
+    this.buffer = newByteBuf;
+    this.address = newAddress;
+  }
+
+  /** @return the underlying buffer, positioned at 0 and limited to the usable size. */
+  public java.nio.Buffer nioBuffer() {
+    return (ByteBuffer) buffer.position(0).limit(memorySize);
+  }
+
+  /** Frees the native memory; this object must not be used afterwards. */
+  @Override
+  public void release() {
+    UnsafeUtil.free(this.buffer);
+    this.buffer = null;
+    this.address = 0;
+    this.memorySize = 0;
+  }
+
+  public String toString() {
+    return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
new file mode 100644
index 0000000..689efb7
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
@@ -0,0 +1,176 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.SizeOf;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * Off-heap memory holding a sequence of rows, each in the layout written by
+ * {@link OffHeapRowWriter}. Rows are appended through {@link #getWriter()} and read back
+ * through {@link #getReader()}.
+ */
+public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable {
+  private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class);
+
+  /** Field offset written for a null (skipped) field. */
+  public static final int NULL_FIELD_OFFSET = -1;
+
+  DataType [] dataTypes;
+
+  // Basic States
+  private int maxRowNum = Integer.MAX_VALUE; // optional
+  private int rowNum;
+  /** Byte offset just past the last complete row. */
+  protected int position = 0;
+
+  private OffHeapRowBlockWriter builder;
+
+  private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) {
+    super(buffer, limitSpec);
+    initialize(schema);
+  }
+
+  public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) {
+    super(limitSpec);
+    initialize(schema);
+  }
+
+  private void initialize(Schema schema) {
+    dataTypes = SchemaUtil.toDataTypes(schema);
+
+    this.builder = new OffHeapRowBlockWriter(this);
+  }
+
+  @VisibleForTesting
+  public OffHeapRowBlock(Schema schema, int bytes) {
+    this(schema, new ResizableLimitSpec(bytes));
+  }
+
+  @VisibleForTesting
+  public OffHeapRowBlock(Schema schema, ByteBuffer buffer) {
+    this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT);
+  }
+
+  public void position(int pos) {
+    this.position = pos;
+  }
+
+  /** Discards all rows and resets the writer; the memory itself is kept. */
+  public void clear() {
+    this.position = 0;
+    this.rowNum = 0;
+
+    builder.clear();
+  }
+
+  /** @return the underlying buffer, limited to the bytes occupied by complete rows. */
+  @Override
+  public ByteBuffer nioBuffer() {
+    return (ByteBuffer) buffer.position(0).limit(position);
+  }
+
+  public int position() {
+    return position;
+  }
+
+  public long usedMem() {
+    return position;
+  }
+
+  /**
+   * Ensures that this block has room for {@code size} more bytes after the row currently
+   * being written, growing (and copying to) a larger buffer as many times as needed.
+   *
+   * @param size Size to add
+   * @throws RuntimeException if the size limit does not allow enough growth
+   */
+  public void ensureSize(int size) {
+    // A single growth step may be too small for a large field, so keep growing
+    // until the request fits.
+    while (remain() - size < 0) {
+      if (!limitSpec.canIncrease(memorySize)) {
+        throw new RuntimeException("Cannot increase RowBlock anymore.");
+      }
+
+      int newBlockSize = limitSpec.increasedSize(memorySize);
+      if (newBlockSize <= memorySize) {
+        // No progress is possible; fail instead of looping forever.
+        throw new RuntimeException("Cannot increase RowBlock anymore.");
+      }
+      resize(newBlockSize);
+      LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false));
+    }
+  }
+
+  /** @return bytes left after the complete rows and the row currently being written. */
+  public long remain() {
+    return memorySize - position - builder.offset();
+  }
+
+  public int maxRowNum() {
+    return maxRowNum;
+  }
+
+  public int rows() {
+    return rowNum;
+  }
+
+  public void setRows(int rowNum) {
+    this.rowNum = rowNum;
+  }
+
+  /**
+   * Replaces this block's contents with rows read from {@code channel}. As many bytes as
+   * fit are read, and complete rows are counted. The channel is then rewound to the start
+   * of the first incomplete row, so the next call picks that row up.
+   *
+   * <p>NOTE(review): if a single row is larger than this block, no row is consumed and the
+   * channel is left where it was. Callers looping on the return value should guard
+   * against that.
+   *
+   * @return false if the channel was already at its end, true otherwise
+   */
+  public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException {
+    if (channel.position() < channel.size()) {
+      clear();
+
+      buffer.clear();
+      channel.read(buffer);
+      memorySize = buffer.position();
+
+      while (position < memorySize) {
+        long recordPtr = address + position;
+
+        if (remain() < SizeOf.SIZE_OF_INT) {
+          channel.position(channel.position() - remain());
+          memorySize = (int) (memorySize - remain());
+          return true;
+        }
+
+        // every row starts with its own byte length
+        int recordSize = UNSAFE.getInt(recordPtr);
+
+        if (remain() < recordSize) {
+          channel.position(channel.position() - remain());
+          memorySize = (int) (memorySize - remain());
+          return true;
+        }
+
+        position += recordSize;
+        rowNum++;
+      }
+
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public RowWriter getWriter() {
+    return builder;
+  }
+
+  public OffHeapRowBlockReader getReader() {
+    return new OffHeapRowBlockReader(this);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
----------------------------------------------------------------------
diff --git
a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java new file mode 100644 index 0000000..4a9313f --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java @@ -0,0 +1,63 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+/**
+ * Walks the rows of an {@link OffHeapRowBlock} front to back. Each row is exposed by
+ * pointing a caller-supplied {@link ZeroCopyTuple} at it, so no row bytes are copied.
+ */
+public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+  OffHeapRowBlock rowBlock;
+
+  // Read States
+  private int curRowIdxForRead;
+  private int curPosForRead;
+
+  public OffHeapRowBlockReader(OffHeapRowBlock rowBlock) {
+    this.rowBlock = rowBlock;
+  }
+
+  /** @return the number of bytes of the block not yet passed over by {@link #next}. */
+  public long remainForRead() {
+    return rowBlock.memorySize - curPosForRead;
+  }
+
+  /**
+   * Points {@code tuple} at the next row.
+   *
+   * @return false once every row counted by the block has been visited
+   */
+  @Override
+  public boolean next(ZeroCopyTuple tuple) {
+    if (curRowIdxForRead >= rowBlock.rows()) {
+      return false;
+    }
+
+    // The first 4 bytes of a row hold that row's total byte length.
+    int rowLength = UNSAFE.getInt(rowBlock.address() + curPosForRead);
+    tuple.set(rowBlock.buffer, curPosForRead, rowLength, rowBlock.dataTypes);
+
+    curPosForRead += rowLength;
+    curRowIdxForRead++;
+    return true;
+  }
+
+  /** Rewinds to the first row. */
+  @Override
+  public void reset() {
+    curRowIdxForRead = 0;
+    curPosForRead = 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
new file mode 100644
index 0000000..dbc3188
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.collect.Lists;
+import org.apache.tajo.storage.Tuple;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/** Sorting helpers for the rows of an {@link OffHeapRowBlock}. */
+public class OffHeapRowBlockUtils {
+
+  /**
+   * Wraps each row of {@code rowBlock} in its own {@link ZeroCopyTuple}, without copying
+   * row bytes, and returns them ordered by {@code comparator}.
+   */
+  public static List<Tuple> sort(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
+    List<Tuple> sorted = Lists.newArrayList();
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+    for (ZeroCopyTuple row = new ZeroCopyTuple(); reader.next(row); row = new ZeroCopyTuple()) {
+      sorted.add(row);
+    }
+    Collections.sort(sorted, comparator);
+    return sorted;
+  }
+
+  /**
+   * Same as {@link #sort} but returns an array sized to the block's row count.
+   */
+  public static Tuple[] sortToArray(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
+    int rowCount = rowBlock.rows();
+    Tuple[] sorted = new Tuple[rowCount];
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+    ZeroCopyTuple row = new ZeroCopyTuple();
+    int filled = 0;
+    while (filled < rowCount && reader.next(row)) {
+      sorted[filled++] = row;
+      row = new ZeroCopyTuple();
+    }
+    Arrays.sort(sorted, comparator);
+    return sorted;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
----------------------------------------------------------------------
diff --git
a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java new file mode 100644 index 0000000..d177e0c --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+
+/**
+ * An {@link OffHeapRowWriter} that appends rows directly into an {@link OffHeapRowBlock},
+ * growing the block on demand and bumping its row count as each row is completed.
+ */
+public class OffHeapRowBlockWriter extends OffHeapRowWriter {
+  OffHeapRowBlock rowBlock;
+
+  OffHeapRowBlockWriter(OffHeapRowBlock rowBlock) {
+    super(rowBlock.dataTypes);
+    this.rowBlock = rowBlock;
+  }
+
+  @Override
+  public long address() {
+    return rowBlock.address();
+  }
+
+  @Override
+  public int position() {
+    return rowBlock.position();
+  }
+
+  @Override
+  public void forward(int length) {
+    rowBlock.position(position() + length);
+  }
+
+  @Override
+  public void ensureSize(int size) {
+    rowBlock.ensureSize(size);
+  }
+
+  @Override
+  public void endRow() {
+    super.endRow();
+    rowBlock.setRows(rowBlock.rows() + 1);
+  }
+
+  @Override
+  public TajoDataTypes.DataType[] dataTypes() {
+    return rowBlock.dataTypes;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
new file mode 100644
index 0000000..85c7e0b
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+
+/**
+ * Writes rows in the following layout, starting at {@code address() + position()}:
+ *
+ * <pre>
+ * | row length | field 1 offset | ... | field N offset | field 1 | ... | field N |
+ *   4 bytes      4 bytes              4 bytes
+ * </pre>
+ *
+ * Field offsets are relative to the start of the row. A skipped (null) field, or any field
+ * not written before {@link #endRow()}, gets {@link OffHeapRowBlock#NULL_FIELD_OFFSET}.
+ * Subclasses decide where the bytes live by implementing {@link #address()},
+ * {@link #position()}, {@link #forward(int)} and {@link #ensureSize(int)}.
+ */
+public abstract class OffHeapRowWriter implements RowWriter {
+  /** record size + offset list */
+  private final int headerSize;
+  /** field offsets */
+  private final int [] fieldOffsets;
+  private final TajoDataTypes.DataType [] dataTypes;
+
+  private int curFieldIdx;
+  /** Bytes used so far by the row being written, header included. */
+  private int curOffset;
+
+  public OffHeapRowWriter(final TajoDataTypes.DataType [] dataTypes) {
+    this.dataTypes = dataTypes;
+    fieldOffsets = new int[dataTypes.length];
+    headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1);
+  }
+
+  public void clear() {
+    curOffset = 0;
+    curFieldIdx = 0;
+  }
+
+  /** @return the address at which the current row starts. */
+  public long recordStartAddr() {
+    return address() + position();
+  }
+
+  /** @return the base address of the memory rows are written into. */
+  public abstract long address();
+
+  /**
+   * Makes room for {@code size} more bytes after the {@link #offset()} bytes already used
+   * by the current row.
+   */
+  public abstract void ensureSize(int size);
+
+  /** @return bytes used so far by the row being written. */
+  public int offset() {
+    return curOffset;
+  }
+
+  /**
+   * Current position
+   *
+   * @return The position
+   */
+  public abstract int position();
+
+  /**
+   * Forward the address;
+   *
+   * @param length Length to be forwarded
+   */
+  public abstract void forward(int length);
+
+  @Override
+  public TajoDataTypes.DataType[] dataTypes() {
+    return dataTypes;
+  }
+
+  /** Begins a row, reserving room for its header; always returns true. */
+  public boolean startRow() {
+    curOffset = 0;
+    curFieldIdx = 0;
+    // A row whose fields are all skipped never calls ensureSize() from a put method,
+    // but endRow() still writes the whole header, so reserve it here.
+    ensureSize(headerSize);
+    curOffset = headerSize;
+    return true;
+  }
+
+  /**
+   * Writes the row header (length plus field offsets) and advances past the row. Fields
+   * not written since {@link #startRow()} are recorded as null.
+   */
+  public void endRow() {
+    long rowHeaderPos = address() + position();
+    OffHeapMemory.UNSAFE.putInt(rowHeaderPos, curOffset);
+    rowHeaderPos += SizeOf.SIZE_OF_INT;
+
+    for (int i = 0; i < curFieldIdx; i++) {
+      OffHeapMemory.UNSAFE.putInt(rowHeaderPos, fieldOffsets[i]);
+      rowHeaderPos += SizeOf.SIZE_OF_INT;
+    }
+    for (int i = curFieldIdx; i < dataTypes.length; i++) {
+      OffHeapMemory.UNSAFE.putInt(rowHeaderPos, OffHeapRowBlock.NULL_FIELD_OFFSET);
+      rowHeaderPos += SizeOf.SIZE_OF_INT;
+    }
+
+    // curOffset is the byte length of this row.
+    forward(curOffset);
+  }
+
+  public void skipField() {
+    fieldOffsets[curFieldIdx++] = OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  /** Records that the next field starts at the current offset. */
+  private void forwardField() {
+    fieldOffsets[curFieldIdx++] = curOffset;
+  }
+
+  public void putBool(boolean val) {
+    ensureSize(SizeOf.SIZE_OF_BOOL);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putByte(recordStartAddr() + curOffset, (byte) (val ? 0x01 : 0x00));
+
+    curOffset += SizeOf.SIZE_OF_BOOL;
+  }
+
+  public void putInt2(short val) {
+    ensureSize(SizeOf.SIZE_OF_SHORT);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putShort(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_SHORT;
+  }
+
+  public void putInt4(int val) {
+    ensureSize(SizeOf.SIZE_OF_INT);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_INT;
+  }
+
+  public void putInt8(long val) {
+    ensureSize(SizeOf.SIZE_OF_LONG);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putLong(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_LONG;
+  }
+
+  public void putFloat4(float val) {
+    ensureSize(SizeOf.SIZE_OF_FLOAT);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putFloat(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_FLOAT;
+  }
+
+  public void putFloat8(double val) {
+    ensureSize(SizeOf.SIZE_OF_DOUBLE);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putDouble(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_DOUBLE;
+  }
+
+  /** Encodes {@code val} with {@link TextDatum#DEFAULT_CHARSET} and writes it as text. */
+  public void putText(String val) {
+    byte[] bytes = val.getBytes(TextDatum.DEFAULT_CHARSET);
+    putText(bytes);
+  }
+
+  public void putText(byte[] val) {
+    putLengthPrefixedBytes(val);
+  }
+
+  public void putBlob(byte[] val) {
+    putLengthPrefixedBytes(val);
+  }
+
+  /** Writes a variable-length field as a 4-byte length followed by the bytes. */
+  private void putLengthPrefixedBytes(byte[] val) {
+    int bytesLen = val.length;
+
+    ensureSize(SizeOf.SIZE_OF_INT + bytesLen);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen);
+    curOffset += SizeOf.SIZE_OF_INT;
+
+    OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null,
+        recordStartAddr() + curOffset, bytesLen);
+    curOffset += bytesLen;
+  }
+
+  public void putTimestamp(long val) {
+    putInt8(val);
+  }
+
+  public void putDate(int val) {
+    putInt4(val);
+  }
+
+  public void putTime(long val) {
+    putInt8(val);
+  }
+
+  /** Writes an interval as 4-byte months followed by 8-byte milliseconds. */
+  public void putInterval(IntervalDatum val) {
+    ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG);
+    forwardField();
+
+    long offset = recordStartAddr() + curOffset;
+    OffHeapMemory.UNSAFE.putInt(offset, val.getMonths());
+    offset += SizeOf.SIZE_OF_INT;
+    OffHeapMemory.UNSAFE.putLong(offset, val.getMilliSeconds());
+    curOffset += SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG;
+  }
+
+  public void putInet4(int val) {
+    putInt4(val);
+  }
+
+  public void putProtoDatum(ProtobufDatum val) {
+    putBlob(val.asByteArray());
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
new file mode 100644
index 0000000..14e67b2
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.FileUtil;
+
+/**
+ * Specifies the initial size, the maximum size and the growth ratio of a resizable
+ * memory block. All sizes are capped at Integer.MAX_VALUE (2^31 - 1) because the memory
+ * is backed by a ByteBuffer.
+ */
+public class ResizableLimitSpec {
+  private static final Log LOG = LogFactory.getLog(ResizableLimitSpec.class);
+
+  public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE;
+  public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE);
+
+  private final long initSize;
+  private final long limitBytes;
+  private final float incRatio;
+  private final float allowedOVerflowRatio;
+  private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f;
+  private final static float DEFAULT_INCREASE_RATIO = 1.0f;
+
+  public ResizableLimitSpec(long initSize) {
+    this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes) {
+    this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) {
+    this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO);
+  }
+
+  /**
+   * @param initSize initial size in bytes
+   * @param limitBytes size limit in bytes, before the overflow allowance is added
+   * @param allowedOverflowRatio fraction by which the limit may be exceeded. When initSize
+   *        equals limitBytes, both are enlarged by it; otherwise only the limit is.
+   * @param incRatio growth per step, as a fraction of the current size
+   * @throws IllegalArgumentException if a size is not in (0, 2GB] or incRatio is not positive
+   */
+  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) {
+    Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes.");
+    Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB.");
+    Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes.");
+    Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB.");
+    Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0.");
+
+    if (initSize == limitBytes) {
+      long overflowedSize = (long) (initSize + (initSize * allowedOverflowRatio));
+
+      if (overflowedSize > Integer.MAX_VALUE) {
+        overflowedSize = Integer.MAX_VALUE;
+      }
+
+      this.initSize = overflowedSize;
+      this.limitBytes = overflowedSize;
+    } else {
+      this.initSize = initSize;
+      limitBytes = (long) (limitBytes + (limitBytes * allowedOverflowRatio));
+
+      if (limitBytes > Integer.MAX_VALUE) {
+        this.limitBytes = Integer.MAX_VALUE;
+      } else {
+        this.limitBytes = limitBytes;
+      }
+    }
+
+    this.allowedOVerflowRatio = allowedOverflowRatio;
+    this.incRatio = incRatio;
+  }
+
+  public long initialSize() {
+    return initSize;
+  }
+
+  public long limit() {
+    return limitBytes;
+  }
+
+  /**
+   * @return currentSize / limit (the fraction of the limit in use), with currentSize
+   *         clamped to Integer.MAX_VALUE
+   */
+  public float remainRatio(long currentSize) {
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    if (currentSize > Integer.MAX_VALUE) {
+      currentSize = Integer.MAX_VALUE;
+    }
+    return (float)currentSize / (float)limitBytes;
+  }
+
+  /** @return true if a block of {@code currentSize} bytes is still below the limit. */
+  public boolean canIncrease(long currentSize) {
+    return remain(currentSize) > 0;
+  }
+
+  /** @return bytes left between {@code currentSize} and the limit. */
+  public long remain(long currentSize) {
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    return limitBytes > Integer.MAX_VALUE ? Integer.MAX_VALUE - currentSize : limitBytes - currentSize;
+  }
+
+  /**
+   * Computes the next size for a block of {@code currentSize} bytes: the initial size if
+   * the block is smaller than that, otherwise currentSize * (1 + incRatio), capped at the
+   * limit.
+   */
+  public int increasedSize(int currentSize) {
+    if (currentSize < initSize) {
+      return (int) initSize;
+    }
+
+    long nextSize = (long) (currentSize + ((float) currentSize * incRatio));
+
+    if (nextSize > limitBytes) {
+      LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")");
+      nextSize = limitBytes;
+    }
+
+    if (nextSize > Integer.MAX_VALUE) {
+      LOG.info("Increasing reaches maximum size (" + FileUtil.humanReadableByteCount(Integer.MAX_VALUE, false) + ")");
+      nextSize = Integer.MAX_VALUE;
+    }
+
+    return (int) nextSize;
+  }
+
+  @Override
+  public String toString() {
+    return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit="
+        + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOVerflowRatio
+        + ",inc_ratio=" + incRatio;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
new file mode 100644
index 0000000..a2b2561
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
@@ -0,0 +1,73 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+
+/**
+ * Writes rows field by field. The call sequence should be as follows:
+ *
+ * <pre>
+ *   startRow() --> skipField() or putXXX --> endRow()
+ * </pre>
+ *
+ * The total number of skipField and putXXX invocations must equal the number of fields.
+ */
+public interface RowWriter {
+
+  /** @return the type of each field, in the order the fields are written */
+  TajoDataTypes.DataType [] dataTypes();
+
+  /** Begins a new row. @return whether the row was started */
+  boolean startRow();
+
+  /** Completes the current row. */
+  void endRow();
+
+  /** Writes the current field as null. */
+  void skipField();
+
+  void putBool(boolean val);
+
+  void putInt2(short val);
+
+  void putInt4(int val);
+
+  void putInt8(long val);
+
+  void putFloat4(float val);
+
+  void putFloat8(double val);
+
+  void putText(String val);
+
+  void putText(byte[] val);
+
+  void putBlob(byte[] val);
+
+  void putTimestamp(long val);
+
+  void putTime(long val);
+
+  void putDate(int val);
+
+  void putInterval(IntervalDatum val);
+
+  void putInet4(int val);
+
+  void putProtoDatum(ProtobufDatum datum);
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
new file mode 100644
index 0000000..138386f
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
@@ -0,0 +1,360 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * A read-only {@link Tuple} view over one serialized row held in a direct {@link ByteBuffer}.
+ *
+ * <p>Field values are read in place through {@link Unsafe}; every {@code put} method throws
+ * {@link UnsupportedException}.
+ */
+public abstract class UnSafeTuple implements Tuple {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+  /** Charset used by getText() and getUnicodeChars(). */
+  private static final Charset UTF8 = Charset.forName("UTF-8");
+
+  private DirectBuffer bb;
+  private int relativePos;
+  private int length;
+  private DataType [] types;
+
+  /**
+   * Points this tuple at a row stored inside {@code bb}.
+   *
+   * @param bb buffer holding the row; must be a direct buffer
+   * @param relativePos offset of the row's first byte within {@code bb}
+   * @param length number of bytes occupied by the row
+   * @param types data type of each field, indexed by field id
+   */
+  protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
+    this.bb = (DirectBuffer) bb;
+    this.relativePos = relativePos;
+    this.length = length;
+    this.types = types;
+  }
+
+  /** Points this tuple at a row spanning {@code bb} from offset 0 up to its limit. */
+  void set(ByteBuffer bb, DataType [] types) {
+    set(bb, 0, bb.limit(), types);
+  }
+
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  /** Returns a slice of the underlying buffer covering exactly this row's bytes (no copy). */
+  public ByteBuffer nioBuffer() {
+    return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice();
+  }
+
+  /** Copies this row's bytes onto the Java heap and wraps them in a {@link HeapTuple}. */
+  public HeapTuple toHeapTuple() {
+    byte [] bytes = new byte[length];
+    UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length);
+    return new HeapTuple(bytes, types);
+  }
+
+  /**
+   * Overwrites this tuple's row with a copy of {@code tuple}'s row bytes.
+   *
+   * <p>If this tuple's row slot is shorter than the source row, the current buffer is freed and
+   * replaced by a new native-order direct buffer of exactly the source length. Otherwise the
+   * bytes are written in place at this tuple's own offset. Either way, this tuple's length
+   * becomes the source length.
+   *
+   * @param tuple the tuple to copy from
+   * @throws NullPointerException if {@code tuple} is null
+   */
+  public void copyFrom(UnSafeTuple tuple) {
+    Preconditions.checkNotNull(tuple);
+
+    if (length < tuple.length) {
+      // NOTE(review): this frees the current buffer even though it may be shared (e.g. with a
+      // row block); confirm that callers always own it.
+      UnsafeUtil.free((ByteBuffer) bb);
+      bb = (DirectBuffer) ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder());
+      this.relativePos = 0;
+    }
+    // Always adopt the source length; otherwise a shorter copy would leave stale trailing bytes
+    // visible through nioBuffer() and toHeapTuple().
+    this.length = tuple.length;
+
+    // Write through a duplicate so the bytes land at relativePos (not at offset 0) and the
+    // underlying buffer's position and limit are left untouched.
+    ByteBuffer target = ((ByteBuffer) bb).duplicate();
+    target.clear();
+    target.position(relativePos);
+    target.put(tuple.nioBuffer());
+  }
+
+  /**
+   * Reads the stored offset of field {@code fieldId}, relative to the row start. The offsets are
+   * one int per field and follow a single leading int at the start of the row.
+   */
+  private int getFieldOffset(int fieldId) {
+    return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
+  }
+
+  /**
+   * Returns the absolute memory address of field {@code fieldId}.
+   *
+   * @throws RuntimeException if the stored offset of the field is -1
+   */
+  public long getFieldAddr(int fieldId) {
+    int fieldOffset = getFieldOffset(fieldId);
+    if (fieldOffset == -1) {
+      throw new RuntimeException("Invalid Field Access: " + fieldId);
+    }
+    return bb.address() + relativePos + fieldOffset;
+  }
+
+  @Override
+  public boolean contains(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public void clear() {
+    // nothing to do
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
+  }
+
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
+  }
+
+  /** Materializes field {@code fieldId} as a {@link Datum} according to its declared type. */
+  @Override
+  public Datum get(int fieldId) {
+    if (isNull(fieldId)) {
+      return NullDatum.get();
+    }
+
+    switch (types[fieldId].getType()) {
+    case BOOLEAN:
+      return DatumFactory.createBool(getBool(fieldId));
+    case INT1:
+    case INT2:
+      return DatumFactory.createInt2(getInt2(fieldId));
+    case INT4:
+      return DatumFactory.createInt4(getInt4(fieldId));
+    case INT8:
+      // Read all 8 bytes; getInt4 here would read only half of the stored value.
+      return DatumFactory.createInt8(getInt8(fieldId));
+    case FLOAT4:
+      return DatumFactory.createFloat4(getFloat4(fieldId));
+    case FLOAT8:
+      return DatumFactory.createFloat8(getFloat8(fieldId));
+    case TEXT:
+      return DatumFactory.createText(getText(fieldId));
+    case TIMESTAMP:
+      return DatumFactory.createTimestamp(getInt8(fieldId));
+    case DATE:
+      return DatumFactory.createDate(getInt4(fieldId));
+    case TIME:
+      return DatumFactory.createTime(getInt8(fieldId));
+    case INTERVAL:
+      return getInterval(fieldId);
+    case INET4:
+      return DatumFactory.createInet4(getInt4(fieldId));
+    case PROTOBUF:
+      return getProtobufDatum(fieldId);
+    default:
+      throw new UnsupportedException("Unknown type: " + types[fieldId]);
+    }
+  }
+
+  @Override
+  public void setOffset(long offset) {
+  }
+
+  @Override
+  public long getOffset() {
+    return 0;
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01;
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return UNSAFE.getByte(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return UNSAFE.getChar(getFieldAddr(fieldId));
+  }
+
+  /** Reads a variable-length field: an int byte count followed by that many bytes. */
+  @Override
+  public byte[] getBytes(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return bytes;
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    long addr = getFieldAddr(fieldId);
+    return UNSAFE.getShort(addr);
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return UNSAFE.getInt(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return UNSAFE.getLong(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return UNSAFE.getFloat(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return UNSAFE.getDouble(getFieldAddr(fieldId));
+  }
+
+  /** Decodes a TEXT field as UTF-8, independent of the platform default charset. */
+  @Override
+  public String getText(int fieldId) {
+    return new String(getBytes(fieldId), UTF8);
+  }
+
+  /** Reads an INTERVAL field: an int month count followed by a long millisecond count. */
+  @Override
+  public IntervalDatum getInterval(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int months = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+    long millisecs = UNSAFE.getLong(pos);
+    return new IntervalDatum(months, millisecs);
+  }
+
+  /**
+   * Parses a PROTOBUF field using the {@link ProtobufDatumFactory} for the field's type code.
+   *
+   * @return the decoded datum, or {@link NullDatum} if the stored bytes are not a valid message
+   */
+  @Override
+  public Datum getProtobufDatum(int fieldId) {
+    byte [] bytes = getBytes(fieldId);
+
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
+    Message.Builder builder = factory.newBuilder();
+    try {
+      builder.mergeFrom(bytes);
+    } catch (InvalidProtocolBufferException e) {
+      return NullDatum.get();
+    }
+
+    return new ProtobufDatum(builder.build());
+  }
+
+  /**
+   * Decodes a variable-length field as standard UTF-8 into UTF-16 chars (supplementary
+   * characters become surrogate pairs).
+   */
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    return new String(getBytes(fieldId), UTF8).toCharArray();
+  }
+
+  /** Returns a heap copy of this row, as produced by {@link #toHeapTuple()}. */
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    return toHeapTuple();
+  }
+
+  @Override
+  public Datum[] getValues() {
+    Datum [] datums = new Datum[size()];
+    for (int i = 0; i < size(); i++) {
+      if (contains(i)) {
+        datums[i] = get(i);
+      } else {
+        datums[i] = NullDatum.get();
+      }
+    }
+    return datums;
+  }
+
+  @Override
+  public String toString() {
+    return VTuple.toDisplayString(getValues());
+  }
+
+  /** Releases whatever this tuple holds; the meaning is defined by each subclass. */
+  public abstract void release();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
new file mode 100644
index 0000000..73e1e2f
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
@@ -0,0 +1,99 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedLongs;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+import java.nio.ByteOrder;
+
+/**
+ * Compares length-prefixed byte strings (e.g. UTF-8 text) in UnSafeTuple memory without copying; used by compiled TupleComparator.
+ */
+public class UnSafeTupleBytesComparator {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  static final boolean littleEndian = ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+  /** Operands: an int length, then that many bytes. Bytes compare unsigned; a proper prefix sorts first. */
+  public static int compare(long ptr1, long ptr2) {
+    int lstrLen = UNSAFE.getInt(ptr1);
+    int rstrLen = UNSAFE.getInt(ptr2);
+
+    ptr1 += SizeOf.SIZE_OF_INT;
+    ptr2 += SizeOf.SIZE_OF_INT;
+
+    int minLength = Math.min(lstrLen, rstrLen);
+    int minWords = minLength / Longs.BYTES;
+
+    /*
+     * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
+     * time is no slower than comparing 4 bytes at a time even on 32-bit.
+     * On the other hand, it is substantially faster on 64-bit.
+     */
+    for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+      long lw = UNSAFE.getLong(ptr1);
+      long rw = UNSAFE.getLong(ptr2);
+      long diff = lw ^ rw;
+
+      if (diff != 0) {
+        if (!littleEndian) {
+          return UnsignedLongs.compare(lw, rw);
+        }
+
+        // Little-endian: binary-search the lowest differing byte, i.e. the first one in memory.
+        int n = 0;
+        int y;
+        int x = (int) diff;
+        if (x == 0) {
+          x = (int) (diff >>> 32);
+          n = 32;
+        }
+
+        y = x << 16;
+        if (y == 0) {
+          n += 16;
+        } else {
+          x = y;
+        }
+
+        y = x << 8;
+        if (y == 0) {
+          n += 8;
+        }
+        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+      }
+
+      ptr1 += SizeOf.SIZE_OF_LONG;
+      ptr2 += SizeOf.SIZE_OF_LONG;
+    }
+
+    // Compare the remaining (minLength % 8) bytes, unsigned like the word loop above.
+    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+      int result = (UNSAFE.getByte(ptr1++) & 0xFF) - (UNSAFE.getByte(ptr2++) & 0xFF);
+      if (result != 0) {
+        return result;
+      }
+    }
+    return lstrLen - rstrLen;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
new file mode 100644
index 0000000..51dbb29
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/** An {@link UnSafeTuple} that reads its fields in place from a caller-supplied direct buffer. */
+public class ZeroCopyTuple extends UnSafeTuple {
+  /** Points this tuple at {@code length} bytes of {@code bb}, starting at {@code relativePos}. */
+  @Override
+  public void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
+    super.set(bb, relativePos, length, types);
+  }
+
+  @Override
+  public void release() {} // no-op: this class never frees the buffer it was pointed at
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/proto/IndexProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/proto/IndexProtos.proto b/tajo-storage/src/main/proto/IndexProtos.proto
index bcb0cbe..f5c8a08 100644
--- a/tajo-storage/src/main/proto/IndexProtos.proto
+++ b/tajo-storage/src/main/proto/IndexProtos.proto
@@ -25,5 +25,7 @@ option java_generate_equals_and_hash = true;
 import "CatalogProtos.proto";
 
 message TupleComparatorProto {
-  repeated TupleComparatorSpecProto compSpecs = 1;
+  required SchemaProto schema = 1;
+  repeated SortSpecProto sortSpecs = 2;
+  repeated TupleComparatorSpecProto compSpecs = 3;
 }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
index ab7c2b2..639ca04 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
@@ -69,7 +69,7 @@ public class TestTupleComparator {
 
     SortSpec sortKey1 = new SortSpec(schema.getColumn("col4"), true, false);
     SortSpec sortKey2 = new SortSpec(schema.getColumn("col5"), true, false);
-    TupleComparator tc = new TupleComparator(schema,
+    BaseTupleComparator tc = new BaseTupleComparator(schema,
         new SortSpec[] {sortKey1, sortKey2});
     assertEquals(-1, tc.compare(tuple1, tuple2));
     assertEquals(1, tc.compare(tuple2, tuple1));
