http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java new file mode 100644 index 0000000..a2c08de --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.statistics.ColumnStats; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.NullDatum; + +/** + * This class is not thread-safe. 
+ */ +public class TableStatistics { + private static final Log LOG = LogFactory.getLog(TableStatistics.class); + private Schema schema; + private Tuple minValues; + private Tuple maxValues; + private long [] numNulls; + private long numRows = 0; + private long numBytes = 0; + + private boolean [] comparable; + + public TableStatistics(Schema schema) { + this.schema = schema; + minValues = new VTuple(schema.size()); + maxValues = new VTuple(schema.size()); + + numNulls = new long[schema.size()]; + comparable = new boolean[schema.size()]; + + DataType type; + for (int i = 0; i < schema.size(); i++) { + type = schema.getColumn(i).getDataType(); + if (type.getType() == Type.PROTOBUF) { + comparable[i] = false; + } else { + comparable[i] = true; + } + } + } + + public Schema getSchema() { + return this.schema; + } + + public void incrementRow() { + numRows++; + } + + public long getNumRows() { + return this.numRows; + } + + public void setNumBytes(long bytes) { + this.numBytes = bytes; + } + + public long getNumBytes() { + return this.numBytes; + } + + public void analyzeField(int idx, Datum datum) { + if (datum instanceof NullDatum) { + numNulls[idx]++; + return; + } + + if (comparable[idx]) { + if (!maxValues.contains(idx) || + maxValues.get(idx).compareTo(datum) < 0) { + maxValues.put(idx, datum); + } + if (!minValues.contains(idx) || + minValues.get(idx).compareTo(datum) > 0) { + minValues.put(idx, datum); + } + } + } + + public TableStats getTableStat() { + TableStats stat = new TableStats(); + + ColumnStats columnStats; + for (int i = 0; i < schema.size(); i++) { + columnStats = new ColumnStats(schema.getColumn(i)); + columnStats.setNumNulls(numNulls[i]); + if (minValues.get(i) == null || schema.getColumn(i).getDataType().getType() == minValues.get(i).type()) { + columnStats.setMinValue(minValues.get(i)); + } else { + LOG.warn("Wrong statistics column type (" + minValues.get(i).type() + + ", expected=" + schema.getColumn(i).getDataType().getType() + ")"); + } + 
if (maxValues.get(i) == null || schema.getColumn(i).getDataType().getType() == maxValues.get(i).type()) { + columnStats.setMaxValue(maxValues.get(i)); + } else { + LOG.warn("Wrong statistics column type (" + maxValues.get(i).type() + + ", expected=" + schema.getColumn(i).getDataType().getType() + ")"); + } + stat.addColumnStat(columnStats); + } + + stat.setNumRows(this.numRows); + stat.setNumBytes(this.numBytes); + + return stat; + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java new file mode 100644 index 0000000..094d285 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import com.google.protobuf.Message; +import org.apache.commons.codec.binary.Base64; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.*; +import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; +import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.NumberUtil; + +import java.io.IOException; +import java.io.OutputStream; + +//Compatibility with Apache Hive +@Deprecated +public class TextSerializerDeserializer implements SerializerDeserializer { + public static final byte[] trueBytes = "true".getBytes(); + public static final byte[] falseBytes = "false".getBytes(); + private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); + + + @Override + public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException { + + byte[] bytes; + int length = 0; + TajoDataTypes.DataType dataType = col.getDataType(); + + if (datum == null || datum instanceof NullDatum) { + switch (dataType.getType()) { + case CHAR: + case TEXT: + length = nullCharacters.length; + out.write(nullCharacters); + break; + default: + break; + } + return length; + } + + switch (dataType.getType()) { + case BOOLEAN: + out.write(datum.asBool() ? 
trueBytes : falseBytes); + length = trueBytes.length; + break; + case CHAR: + byte[] pad = new byte[dataType.getLength() - datum.size()]; + bytes = datum.asTextBytes(); + out.write(bytes); + out.write(pad); + length = bytes.length + pad.length; + break; + case TEXT: + case BIT: + case INT2: + case INT4: + case INT8: + case FLOAT4: + case FLOAT8: + case INET4: + case DATE: + case INTERVAL: + bytes = datum.asTextBytes(); + length = bytes.length; + out.write(bytes); + break; + case TIME: + bytes = ((TimeDatum)datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes(); + length = bytes.length; + out.write(bytes); + break; + case TIMESTAMP: + bytes = ((TimestampDatum)datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes(); + length = bytes.length; + out.write(bytes); + break; + case INET6: + case BLOB: + bytes = Base64.encodeBase64(datum.asByteArray(), false); + length = bytes.length; + out.write(bytes, 0, length); + break; + case PROTOBUF: + ProtobufDatum protobuf = (ProtobufDatum) datum; + byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes(); + length = protoBytes.length; + out.write(protoBytes, 0, protoBytes.length); + break; + case NULL_TYPE: + default: + break; + } + return length; + } + + @Override + public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException { + + Datum datum; + switch (col.getDataType().getType()) { + case BOOLEAN: + datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() + : DatumFactory.createBool(bytes[offset] == 't' || bytes[offset] == 'T'); + break; + case BIT: + datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() + : DatumFactory.createBit(Byte.parseByte(new String(bytes, offset, length))); + break; + case CHAR: + datum = isNullText(bytes, offset, length, nullCharacters) ? 
NullDatum.get() + : DatumFactory.createChar(new String(bytes, offset, length).trim()); + break; + case INT1: + case INT2: + datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() + : DatumFactory.createInt2((short) NumberUtil.parseInt(bytes, offset, length)); + break; + case INT4: + datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() + : DatumFactory.createInt4(NumberUtil.parseInt(bytes, offset, length)); + break; + case INT8: + datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() + : DatumFactory.createInt8(new String(bytes, offset, length)); + break; + case FLOAT4: + datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() + : DatumFactory.createFloat4(new String(bytes, offset, length)); + break; + case FLOAT8: + datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() + : DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, offset, length)); + break; + case TEXT: { + byte[] chars = new byte[length]; + System.arraycopy(bytes, offset, chars, 0, length); + datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get() + : DatumFactory.createText(chars); + break; + } + case DATE: + datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() + : DatumFactory.createDate(new String(bytes, offset, length)); + break; + case TIME: + datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() + : DatumFactory.createTime(new String(bytes, offset, length)); + break; + case TIMESTAMP: + datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() + : DatumFactory.createTimestamp(new String(bytes, offset, length)); + break; + case INTERVAL: + datum = isNull(bytes, offset, length, nullCharacters) ? 
NullDatum.get() + : DatumFactory.createInterval(new String(bytes, offset, length)); + break; + case PROTOBUF: { + if (isNull(bytes, offset, length, nullCharacters)) { + datum = NullDatum.get(); + } else { + ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType()); + Message.Builder builder = factory.newBuilder(); + try { + byte[] protoBytes = new byte[length]; + System.arraycopy(bytes, offset, protoBytes, 0, length); + protobufJsonFormat.merge(protoBytes, builder); + datum = factory.createDatum(builder.build()); + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + break; + } + case INET4: + datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() + : DatumFactory.createInet4(new String(bytes, offset, length)); + break; + case BLOB: { + if (isNull(bytes, offset, length, nullCharacters)) { + datum = NullDatum.get(); + } else { + byte[] blob = new byte[length]; + System.arraycopy(bytes, offset, blob, 0, length); + datum = DatumFactory.createBlob(Base64.decodeBase64(blob)); + } + break; + } + default: + datum = NullDatum.get(); + break; + } + return datum; + } + + private static boolean isNull(byte[] val, int offset, int length, byte[] nullBytes) { + return length == 0 || ((length == nullBytes.length) + && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length)); + } + + private static boolean isNullText(byte[] val, int offset, int length, byte[] nullBytes) { + return length > 0 && length == nullBytes.length + && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleComparator.java 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleComparator.java new file mode 100644 index 0000000..8dffd8d --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleComparator.java @@ -0,0 +1,32 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import org.apache.tajo.common.ProtoObject; + +import java.util.Comparator; + +import static org.apache.tajo.index.IndexProtos.TupleComparatorProto; + +public abstract class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComparatorProto> { + + public abstract int compare(Tuple o1, Tuple o2); + + public abstract boolean isAscendingFirstKey(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java new file mode 100644 index 0000000..e824b99 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import com.google.common.base.Objects; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; + +import java.util.Comparator; + +/** + * It represents a pair of start and end tuples. + */ +public class TupleRange implements Comparable<TupleRange>, Cloneable { + private Tuple start; + private Tuple end; + private final TupleComparator comp; + + public TupleRange(final SortSpec[] sortSpecs, final Tuple start, final Tuple end) { + this.comp = new BaseTupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs); + // if there is only one value, start == end + this.start = start; + this.end = end; + } + + public static Schema sortSpecsToSchema(SortSpec[] sortSpecs) { + Schema schema = new Schema(); + for (SortSpec spec : sortSpecs) { + schema.addColumn(spec.getSortKey()); + } + + return schema; + } + + public void setStart(Tuple tuple) { + this.start = tuple; + } + + public final Tuple getStart() { + return this.start; + } + + public void setEnd(Tuple tuple) { + this.end = tuple; + } + + public final Tuple getEnd() { + return this.end; + } + + public String toString() { + return "[" + this.start + ", " + this.end + ")"; + } + + @Override + public int hashCode() { + return Objects.hashCode(start, end); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof TupleRange) { + TupleRange other = (TupleRange) obj; + return this.start.equals(other.start) && this.end.equals(other.end); + } else { + return false; + } + } + + @Override + public int compareTo(TupleRange o) { + // TODO - should handle overlap + int cmpVal = comp.compare(this.start, o.start); + if (cmpVal != 0) { + return cmpVal; + } else { + return comp.compare(this.end, o.end); + } + } + + public static class DescendingTupleRangeComparator + implements Comparator<TupleRange> { + + @Override + public int compare(TupleRange left, TupleRange right) { + return right.compareTo(left); + } + } + + public TupleRange clone() throws 
CloneNotSupportedException { + TupleRange newRange = (TupleRange) super.clone(); + newRange.setStart(start.clone()); + newRange.setEnd(end.clone()); + return newRange; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java new file mode 100644 index 0000000..ad19101 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface ForSplitableStore { +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/compress/CodecPool.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/compress/CodecPool.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/compress/CodecPool.java new file mode 100644 index 0000000..baeda8c --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/compress/CodecPool.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.compress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DoNotPool; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A global compressor/decompressor pool used to save and reuse (possibly + * native) compression/decompression codecs. + */ +public final class CodecPool { + private static final Log LOG = LogFactory.getLog(CodecPool.class); + + /** + * A global compressor pool used to save the expensive + * construction/destruction of (possibly native) decompression codecs. + */ + private static final Map<Class<Compressor>, List<Compressor>> COMPRESSOR_POOL = + new HashMap<Class<Compressor>, List<Compressor>>(); + + /** + * A global decompressor pool used to save the expensive + * construction/destruction of (possibly native) decompression codecs. + */ + private static final Map<Class<Decompressor>, List<Decompressor>> DECOMPRESSOR_POOL = + new HashMap<Class<Decompressor>, List<Decompressor>>(); + + private static <T> T borrow(Map<Class<T>, List<T>> pool, + Class<? 
extends T> codecClass) { + T codec = null; + + // Check if an appropriate codec is available + synchronized (pool) { + if (pool.containsKey(codecClass)) { + List<T> codecList = pool.get(codecClass); + + if (codecList != null) { + synchronized (codecList) { + if (!codecList.isEmpty()) { + codec = codecList.remove(codecList.size() - 1); + } + } + } + } + } + + return codec; + } + + private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) { + if (codec != null) { + Class<T> codecClass = (Class<T>) codec.getClass(); + synchronized (pool) { + if (!pool.containsKey(codecClass)) { + pool.put(codecClass, new ArrayList<T>()); + } + + List<T> codecList = pool.get(codecClass); + synchronized (codecList) { + codecList.add(codec); + } + } + } + } + + /** + * Get a {@link Compressor} for the given {@link CompressionCodec} from the + * pool or a new one. + * + * @param codec + * the <code>CompressionCodec</code> for which to get the + * <code>Compressor</code> + * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor + * @return <code>Compressor</code> for the given <code>CompressionCodec</code> + * from the pool or a new one + */ + public static Compressor getCompressor(CompressionCodec codec, Configuration conf) { + Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType()); + if (compressor == null) { + compressor = codec.createCompressor(); + LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]"); + } else { + compressor.reinit(conf); + if(LOG.isDebugEnabled()) { + LOG.debug("Got recycled compressor"); + } + } + return compressor; + } + + public static Compressor getCompressor(CompressionCodec codec) { + return getCompressor(codec, null); + } + + /** + * Get a {@link Decompressor} for the given {@link CompressionCodec} from the + * pool or a new one. 
+ * + * @param codec + * the <code>CompressionCodec</code> for which to get the + * <code>Decompressor</code> + * @return <code>Decompressor</code> for the given + * <code>CompressionCodec</code> the pool or a new one + */ + public static Decompressor getDecompressor(CompressionCodec codec) { + Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec + .getDecompressorType()); + if (decompressor == null) { + decompressor = codec.createDecompressor(); + LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]"); + } else { + if(LOG.isDebugEnabled()) { + LOG.debug("Got recycled decompressor"); + } + } + return decompressor; + } + + /** + * Return the {@link Compressor} to the pool. + * + * @param compressor + * the <code>Compressor</code> to be returned to the pool + */ + public static void returnCompressor(Compressor compressor) { + if (compressor == null) { + return; + } + // if the compressor can't be reused, don't pool it. + if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) { + return; + } + compressor.reset(); + payback(COMPRESSOR_POOL, compressor); + } + + /** + * Return the {@link Decompressor} to the pool. + * + * @param decompressor + * the <code>Decompressor</code> to be returned to the pool + */ + public static void returnDecompressor(Decompressor decompressor) { + if (decompressor == null) { + return; + } + // if the decompressor can't be reused, don't pool it. 
+ if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { + return; + } + decompressor.reset(); + payback(DECOMPRESSOR_POOL, decompressor); + } + + private CodecPool() { + // prevent instantiation + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java new file mode 100644 index 0000000..bb035a8 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * + */ +package org.apache.tajo.storage.exception; + +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +public class AlreadyExistsStorageException extends IOException { + private static final long serialVersionUID = 965518916144019032L; + + + public AlreadyExistsStorageException(String path) { + super("Error: "+path+" alreay exists"); + } + + public AlreadyExistsStorageException(Path path) { + this(path.toString()); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java new file mode 100644 index 0000000..a67d1f7 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.exception; + +public class UnknownCodecException extends Exception { + + private static final long serialVersionUID = 4287230843540404529L; + + public UnknownCodecException() { + + } + + public UnknownCodecException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java new file mode 100644 index 0000000..d18b5a0 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.exception; + +public class UnknownDataTypeException extends Exception { + + private static final long serialVersionUID = -2630390595968966164L; + + public UnknownDataTypeException() { + + } + + public UnknownDataTypeException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java new file mode 100644 index 0000000..8b197d6 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * + */ +package org.apache.tajo.storage.exception; + +public class UnsupportedFileTypeException extends RuntimeException { + private static final long serialVersionUID = -8160289695849000342L; + + public UnsupportedFileTypeException() { + } + + /** + * @param message + */ + public UnsupportedFileTypeException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java new file mode 100644 index 0000000..ac43197 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.fragment; + +import org.apache.tajo.common.ProtoObject; + +import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; + +public interface Fragment extends ProtoObject<FragmentProto> { + + public abstract String getTableName(); + + @Override + public abstract FragmentProto getProto(); + + public abstract long getLength(); + + public abstract String getKey(); + + public String[] getHosts(); + + public abstract boolean isEmpty(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java new file mode 100644 index 0000000..07720c7 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.fragment; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; +import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.annotation.ThreadSafe; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.List; +import java.util.Map; + +import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; + +@ThreadSafe +public class FragmentConvertor { + /** + * Cache of fragment classes + */ + protected static final Map<String, Class<? extends Fragment>> CACHED_FRAGMENT_CLASSES = Maps.newConcurrentMap(); + /** + * Cache of constructors for each class. + */ + private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap(); + /** + * default parameter for all constructors + */ + private static final Class<?>[] DEFAULT_FRAGMENT_PARAMS = { ByteString.class }; + + public static Class<? extends Fragment> getFragmentClass(Configuration conf, String storeType) + throws IOException { + Class<? 
extends Fragment> fragmentClass = CACHED_FRAGMENT_CLASSES.get(storeType.toLowerCase()); + if (fragmentClass == null) { + fragmentClass = conf.getClass( + String.format("tajo.storage.fragment.%s.class", storeType.toLowerCase()), null, Fragment.class); + CACHED_FRAGMENT_CLASSES.put(storeType.toLowerCase(), fragmentClass); + } + + if (fragmentClass == null) { + throw new IOException("No such a fragment for " + storeType.toLowerCase()); + } + + return fragmentClass; + } + + public static <T extends Fragment> T convert(Class<T> clazz, FragmentProto fragment) { + T result; + try { + Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz); + if (constructor == null) { + constructor = clazz.getDeclaredConstructor(DEFAULT_FRAGMENT_PARAMS); + constructor.setAccessible(true); + CONSTRUCTOR_CACHE.put(clazz, constructor); + } + result = constructor.newInstance(new Object[]{fragment.getContents()}); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return result; + } + + public static <T extends Fragment> T convert(Configuration conf, FragmentProto fragment) + throws IOException { + Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, fragment.getStoreType().toLowerCase()); + if (fragmentClass == null) { + throw new IOException("No such a fragment class for " + fragment.getStoreType()); + } + return convert(fragmentClass, fragment); + } + + public static <T extends Fragment> List<T> convert(Class<T> clazz, FragmentProto...fragments) + throws IOException { + List<T> list = Lists.newArrayList(); + if (fragments == null) { + return list; + } + for (FragmentProto proto : fragments) { + list.add(convert(clazz, proto)); + } + return list; + } + + public static <T extends Fragment> List<T> convert(Configuration conf, FragmentProto...fragments) throws IOException { + List<T> list = Lists.newArrayList(); + if (fragments == null) { + return list; + } + for (FragmentProto proto : fragments) { + list.add((T) convert(conf, proto)); + } + return list; 
+ } + + public static List<FragmentProto> toFragmentProtoList(Fragment... fragments) { + List<FragmentProto> list = Lists.newArrayList(); + if (fragments == null) { + return list; + } + for (Fragment fragment : fragments) { + list.add(fragment.getProto()); + } + return list; + } + + public static FragmentProto [] toFragmentProtoArray(Fragment... fragments) { + List<FragmentProto> list = toFragmentProtoList(fragments); + return list.toArray(new FragmentProto[list.size()]); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java new file mode 100644 index 0000000..c1835df --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java @@ -0,0 +1,112 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.offheap.HeapTuple; +import org.apache.tajo.tuple.offheap.OffHeapRowWriter; +import org.apache.tajo.tuple.offheap.ZeroCopyTuple; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.Deallocatable; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.UnsafeUtil; +import sun.misc.Unsafe; +import sun.nio.ch.DirectBuffer; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable { + private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class); + + private static final Unsafe UNSAFE = UnsafeUtil.unsafe; + + // buffer + private ByteBuffer buffer; + private long address; + + public BaseTupleBuilder(Schema schema) { + super(SchemaUtil.toDataTypes(schema)); + buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder()); + address = UnsafeUtil.getAddress(buffer); + } + + @Override + public long address() { + return address; + } + + public void ensureSize(int size) { + if (buffer.remaining() - size < 0) { // check the remain size + // enlarge new buffer and copy writing data + int newBlockSize = UnsafeUtil.alignedSize(buffer.capacity() * 2); + ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize); + long newAddress = ((DirectBuffer)newByteBuf).address(); + UNSAFE.copyMemory(this.address, newAddress, buffer.limit()); + LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false)); + + // release existing buffer and replace variables + UnsafeUtil.free(buffer); + buffer = newByteBuf; + address = newAddress; + } + } + + @Override + public int position() { + return 0; + } + + @Override + public 
void forward(int length) { + } + + @Override + public void endRow() { + super.endRow(); + buffer.position(0).limit(offset()); + } + + @Override + public Tuple build() { + return buildToHeapTuple(); + } + + public HeapTuple buildToHeapTuple() { + byte [] bytes = new byte[buffer.limit()]; + UNSAFE.copyMemory(null, address, bytes, UnsafeUtil.ARRAY_BOOLEAN_BASE_OFFSET, buffer.limit()); + return new HeapTuple(bytes, dataTypes()); + } + + public ZeroCopyTuple buildToZeroCopyTuple() { + ZeroCopyTuple zcTuple = new ZeroCopyTuple(); + zcTuple.set(buffer, 0, buffer.limit(), dataTypes()); + return zcTuple; + } + + public void release() { + UnsafeUtil.free(buffer); + buffer = null; + address = 0; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java new file mode 100644 index 0000000..be734e1 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Iterates over the tuples of a row block.
+ *
+ * @param <T> tuple type filled by {@link #next(Tuple)}
+ */
+public interface RowBlockReader<T extends Tuple> {
+
+  /**
+   * Moves to the next tuple of the block, exposing it through {@code tuple}
+   * (presumably by pointing it at the row; see implementations).
+   *
+   * @param tuple tuple to be set to the next row
+   * @return {@code true} if a tuple was available, {@code false} otherwise
+   */
+  boolean next(T tuple);
+
+  /** Restarts iteration (presumably from the first tuple of the block). */
+  void reset();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java new file mode 100644 index 0000000..c43c018 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java @@ -0,0 +1,26 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.RowWriter;
+
+/**
+ * A {@link RowWriter} that can turn the row it has written into a {@link Tuple}.
+ */
+public interface TupleBuilder extends RowWriter {
+  /** @return a tuple holding the row written so far */
+  Tuple build();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java new file mode 100644 index 0000000..9662d5a --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.tajo.tuple.offheap; + +import org.apache.tajo.util.Deallocatable; +import org.apache.tajo.util.UnsafeUtil; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import static org.apache.tajo.common.TajoDataTypes.DataType; + +public class DirectBufTuple extends UnSafeTuple implements Deallocatable { + private ByteBuffer bb; + + public DirectBufTuple(int length, DataType[] types) { + bb = ByteBuffer.allocateDirect(length).order(ByteOrder.nativeOrder()); + set(bb, 0, length, types); + } + + @Override + public void release() { + UnsafeUtil.free(bb); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java new file mode 100644 index 0000000..a327123 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +/** + * Fixed size limit specification + */ +public class FixedSizeLimitSpec extends ResizableLimitSpec { + public FixedSizeLimitSpec(long size) { + super(size, size); + } + + public FixedSizeLimitSpec(long size, float allowedOverflowRatio) { + super(size, size, allowedOverflowRatio); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java new file mode 100644 index 0000000..33f9f1c --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java @@ -0,0 +1,272 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.offheap; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; + +import org.apache.tajo.datum.*; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.SizeOf; +import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.UnsafeUtil; + +import sun.misc.Unsafe; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; + +import static org.apache.tajo.common.TajoDataTypes.DataType; + +public class HeapTuple implements Tuple { + private static final Unsafe UNSAFE = UnsafeUtil.unsafe; + private static final long BASE_OFFSET = UnsafeUtil.ARRAY_BYTE_BASE_OFFSET; + + private final byte [] data; + private final DataType [] types; + + public HeapTuple(final byte [] bytes, final DataType [] types) { + this.data = bytes; + this.types = types; + } + + @Override + public int size() { + return data.length; + } + + public ByteBuffer nioBuffer() { + return ByteBuffer.wrap(data); + } + + private int getFieldOffset(int fieldId) { + return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)); + } + + private int checkNullAndGetOffset(int fieldId) { + int offset = getFieldOffset(fieldId); + if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) { + throw new RuntimeException("Invalid Field Access: " + fieldId); + } + return offset; + } + + @Override + public boolean contains(int fieldid) { + return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public boolean isNull(int fieldid) { + return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public boolean isNotNull(int fieldid) { + return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public void clear() { + // nothing to do + } + + @Override + public void put(int fieldId, Datum value) { + throw new 
UnsupportedException("UnSafeTuple does not support put(int, Datum)."); + } + + @Override + public void put(int fieldId, Datum[] values) { + throw new UnsupportedException("UnSafeTuple does not support put(int, Datum [])."); + } + + @Override + public void put(int fieldId, Tuple tuple) { + throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple)."); + } + + @Override + public void put(Datum[] values) { + throw new UnsupportedException("UnSafeTuple does not support put(Datum [])."); + } + + @Override + public Datum get(int fieldId) { + if (isNull(fieldId)) { + return NullDatum.get(); + } + + switch (types[fieldId].getType()) { + case BOOLEAN: + return DatumFactory.createBool(getBool(fieldId)); + case INT1: + case INT2: + return DatumFactory.createInt2(getInt2(fieldId)); + case INT4: + return DatumFactory.createInt4(getInt4(fieldId)); + case INT8: + return DatumFactory.createInt8(getInt4(fieldId)); + case FLOAT4: + return DatumFactory.createFloat4(getFloat4(fieldId)); + case FLOAT8: + return DatumFactory.createFloat8(getFloat8(fieldId)); + case TEXT: + return DatumFactory.createText(getText(fieldId)); + case TIMESTAMP: + return DatumFactory.createTimestamp(getInt8(fieldId)); + case DATE: + return DatumFactory.createDate(getInt4(fieldId)); + case TIME: + return DatumFactory.createTime(getInt8(fieldId)); + case INTERVAL: + return getInterval(fieldId); + case INET4: + return DatumFactory.createInet4(getInt4(fieldId)); + case PROTOBUF: + return getProtobufDatum(fieldId); + default: + throw new UnsupportedException("Unknown type: " + types[fieldId]); + } + } + + @Override + public void setOffset(long offset) { + } + + @Override + public long getOffset() { + return 0; + } + + @Override + public boolean getBool(int fieldId) { + return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01; + } + + @Override + public byte getByte(int fieldId) { + return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + 
@Override + public char getChar(int fieldId) { + return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public byte[] getBytes(int fieldId) { + long pos = checkNullAndGetOffset(fieldId); + int len = UNSAFE.getInt(data, BASE_OFFSET + pos); + pos += SizeOf.SIZE_OF_INT; + + byte [] bytes = new byte[len]; + UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); + return bytes; + } + + @Override + public short getInt2(int fieldId) { + return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public int getInt4(int fieldId) { + return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public long getInt8(int fieldId) { + return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public float getFloat4(int fieldId) { + return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public double getFloat8(int fieldId) { + return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + + @Override + public String getText(int fieldId) { + return new String(getBytes(fieldId)); + } + + public IntervalDatum getInterval(int fieldId) { + long pos = checkNullAndGetOffset(fieldId); + int months = UNSAFE.getInt(data, BASE_OFFSET + pos); + pos += SizeOf.SIZE_OF_INT; + long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos); + return new IntervalDatum(months, millisecs); + } + + @Override + public Datum getProtobufDatum(int fieldId) { + byte [] bytes = getBytes(fieldId); + + ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode()); + Message.Builder builder = factory.newBuilder(); + try { + builder.mergeFrom(bytes); + } catch (InvalidProtocolBufferException e) { + return NullDatum.get(); + } + + return new ProtobufDatum(builder.build()); + } + + @Override + public char[] getUnicodeChars(int fieldId) { + long pos = 
checkNullAndGetOffset(fieldId); + int len = UNSAFE.getInt(data, BASE_OFFSET + pos); + pos += SizeOf.SIZE_OF_INT; + + byte [] bytes = new byte[len]; + UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); + return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8")); + } + + @Override + public Tuple clone() throws CloneNotSupportedException { + return this; + } + + @Override + public Datum[] getValues() { + Datum [] datums = new Datum[size()]; + for (int i = 0; i < size(); i++) { + if (contains(i)) { + datums[i] = get(i); + } else { + datums[i] = NullDatum.get(); + } + } + return datums; + } + + @Override + public String toString() { + return VTuple.toDisplayString(getValues()); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java new file mode 100644 index 0000000..2f8e349 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.util.Deallocatable; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.UnsafeUtil; +import sun.misc.Unsafe; +import sun.nio.ch.DirectBuffer; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class OffHeapMemory implements Deallocatable { + private static final Log LOG = LogFactory.getLog(OffHeapMemory.class); + + protected static final Unsafe UNSAFE = UnsafeUtil.unsafe; + + protected ByteBuffer buffer; + protected int memorySize; + protected ResizableLimitSpec limitSpec; + protected long address; + + @VisibleForTesting + protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) { + this.buffer = buffer; + this.address = ((DirectBuffer) buffer).address(); + this.memorySize = buffer.limit(); + this.limitSpec = limitSpec; + } + + public OffHeapMemory(ResizableLimitSpec limitSpec) { + this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec); + } + + public long address() { + return address; + } + + public long size() { + return memorySize; + } + + public void resize(int newSize) { + Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes"); + + if (newSize > limitSpec.limit()) { + throw new RuntimeException("Resize cannot exceed the size limit"); + } + + if (newSize < memorySize) { 
+ LOG.warn("The size reduction is ignored."); + } + + int newBlockSize = UnsafeUtil.alignedSize(newSize); + ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize); + long newAddress = ((DirectBuffer)newByteBuf).address(); + + UNSAFE.copyMemory(this.address, newAddress, memorySize); + + UnsafeUtil.free(buffer); + this.memorySize = newSize; + this.buffer = newByteBuf; + this.address = newAddress; + } + + public java.nio.Buffer nioBuffer() { + return (ByteBuffer) buffer.position(0).limit(memorySize); + } + + @Override + public void release() { + UnsafeUtil.free(this.buffer); + this.buffer = null; + this.address = 0; + this.memorySize = 0; + } + + public String toString() { + return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java new file mode 100644 index 0000000..689efb7 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java @@ -0,0 +1,176 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.tajo.tuple.offheap;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.util.Deallocatable;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.SizeOf;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import static org.apache.tajo.common.TajoDataTypes.DataType;

/**
 * A block of rows kept in off-heap memory.
 *
 * <p>Rows are appended through {@link #getWriter()} and read back with {@link #getReader()}.
 * Every record starts with a 4-byte length that covers the whole record. {@code position} is
 * the byte offset just past the last completed record.
 */
public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable {
  private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class);

  /** Field offset written into a record header for a field that holds NULL. */
  public static final int NULL_FIELD_OFFSET = -1;

  DataType [] dataTypes;

  // Basic States
  private int maxRowNum = Integer.MAX_VALUE; // optional
  private int rowNum;
  protected int position = 0;

  private OffHeapRowBlockWriter builder;

  private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) {
    super(buffer, limitSpec);
    initialize(schema);
  }

  public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) {
    super(limitSpec);
    initialize(schema);
  }

  /** Derives the column types from {@code schema} and creates the row writer. */
  private void initialize(Schema schema) {
    dataTypes = SchemaUtil.toDataTypes(schema);

    this.builder = new OffHeapRowBlockWriter(this);
  }

  @VisibleForTesting
  public OffHeapRowBlock(Schema schema, int bytes) {
    this(schema, new ResizableLimitSpec(bytes));
  }

  @VisibleForTesting
  public OffHeapRowBlock(Schema schema, ByteBuffer buffer) {
    this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT);
  }

  /** Sets the byte offset just past the last completed record. */
  public void position(int pos) {
    this.position = pos;
  }

  /** Forgets all rows and resets the writer. The underlying memory is kept. */
  public void clear() {
    this.position = 0;
    this.rowNum = 0;

    builder.clear();
  }

  /** @return the buffer with its position at 0 and its limit at the end of the written records */
  @Override
  public ByteBuffer nioBuffer() {
    return (ByteBuffer) buffer.position(0).limit(position);
  }

  public int position() {
    return position;
  }

  public long usedMem() {
    return position;
  }

  /**
   * Ensures that at least {@code size} more bytes fit after the completed records and the
   * writer's in-progress row.
   *
   * <p>The block is grown as many times as needed. A single growth step may be smaller than
   * {@code size} (for example, for a large TEXT/BLOB value).
   *
   * @param size number of bytes about to be written
   * @throws RuntimeException if the limit spec does not allow the block to grow far enough
   */
  public void ensureSize(int size) {
    while (remain() < size) {
      if (!limitSpec.canIncrease(memorySize)) {
        throw new RuntimeException("Cannot increase RowBlock anymore.");
      }

      int newBlockSize = limitSpec.increasedSize(memorySize);
      if (newBlockSize <= memorySize) {
        // A spec that allows growth but does not grow would make this loop spin forever.
        throw new RuntimeException("Cannot increase RowBlock anymore.");
      }

      resize(newBlockSize);
      LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false));
    }
  }

  /** @return bytes still free after the completed records and the writer's in-progress row */
  public long remain() {
    return memorySize - position - builder.offset();
  }

  public int maxRowNum() {
    return maxRowNum;
  }

  public int rows() {
    return rowNum;
  }

  public void setRows(int rowNum) {
    this.rowNum = rowNum;
  }

  /**
   * Replaces the contents of this block with whole records read from {@code channel}.
   *
   * <p>Reading starts at the channel's current position. A trailing incomplete record is not
   * kept: the channel is moved back to that record's first byte, so the next call reads it
   * again. {@code memorySize} is set to the number of bytes of whole records read.
   *
   * @param channel channel to read from
   * @param stats currently unused
   * @return {@code true} if the channel had data left, {@code false} if it was at its end
   * @throws IOException if the channel fails, or if a record header holds a length shorter
   *     than the length field itself (corrupted input)
   */
  public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException {
    if (channel.position() >= channel.size()) {
      return false;
    }

    clear();

    buffer.clear();
    channel.read(buffer);
    memorySize = buffer.position();

    while (position < memorySize) {
      if (remain() < SizeOf.SIZE_OF_INT) {
        dropPartialRecord(channel);
        return true;
      }

      int recordSize = UNSAFE.getInt(address + position);
      if (recordSize < SizeOf.SIZE_OF_INT) {
        // Every record includes its own 4-byte length. A smaller value means the data is
        // corrupted: a length of 0 would never advance this loop, and a negative one would
        // move it backwards.
        throw new IOException("Invalid record size " + recordSize + " at block offset " + position);
      }

      if (remain() < recordSize) {
        dropPartialRecord(channel);
        return true;
      }

      position += recordSize;
      rowNum++;
    }

    return true;
  }

  /**
   * Drops the unread tail of the buffer. The channel is moved back over it and
   * {@code memorySize} is shrunk to exclude it.
   */
  private void dropPartialRecord(FileChannel channel) throws IOException {
    long leftover = remain();
    channel.position(channel.position() - leftover);
    memorySize = (int) (memorySize - leftover);
  }

  public RowWriter getWriter() {
    return builder;
  }

  /** @return a new reader positioned at the first row of this block */
  public OffHeapRowBlockReader getReader() {
    return new OffHeapRowBlockReader(this);
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java new file mode 100644 index 0000000..4a9313f --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java @@ -0,0 +1,63 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.tajo.tuple.offheap;

import org.apache.tajo.tuple.RowBlockReader;
import org.apache.tajo.util.UnsafeUtil;
import sun.misc.Unsafe;

/**
 * Forward-only reader over the records of an {@link OffHeapRowBlock}.
 *
 * <p>Each successful {@link #next(ZeroCopyTuple)} binds the given tuple to the next record
 * (buffer, offset, length and column types). {@link #reset()} rewinds to the first record.
 */
public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> {
  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
  OffHeapRowBlock rowBlock;

  // Read States
  private int curRowIdxForRead;
  private int curPosForRead;

  public OffHeapRowBlockReader(OffHeapRowBlock rowBlock) {
    this.rowBlock = rowBlock;
  }

  /** @return number of bytes of the block's memory that lie past the read cursor */
  public long remainForRead() {
    return rowBlock.memorySize - curPosForRead;
  }

  /**
   * Advances to the next record.
   *
   * @param tuple tuple to bind to the record
   * @return {@code false} once every row of the block has been returned
   */
  @Override
  public boolean next(ZeroCopyTuple tuple) {
    if (curRowIdxForRead >= rowBlock.rows()) {
      return false;
    }

    // The first 4 bytes of a record hold its total length.
    int recordLen = UNSAFE.getInt(rowBlock.address() + curPosForRead);
    tuple.set(rowBlock.buffer, curPosForRead, recordLen, rowBlock.dataTypes);

    curPosForRead += recordLen;
    curRowIdxForRead++;
    return true;
  }

  /** Rewinds the reader to the first record of the block. */
  @Override
  public void reset() {
    curRowIdxForRead = 0;
    curPosForRead = 0;
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java new file mode 100644 index 0000000..dbc3188 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.tajo.tuple.offheap;

import com.google.common.collect.Lists;
import org.apache.tajo.storage.Tuple;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Helpers that materialize the rows of an {@link OffHeapRowBlock} as sorted tuples. */
public class OffHeapRowBlockUtils {

  /**
   * Binds every row of {@code rowBlock} to its own {@link ZeroCopyTuple} and sorts them.
   *
   * @param rowBlock block whose rows are read
   * @param comparator ordering to apply
   * @return a new {@code ArrayList} holding the rows in {@code comparator} order
   */
  public static List<Tuple> sort(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
    List<Tuple> sorted = Lists.newArrayList();
    for (ZeroCopyTuple row = new ZeroCopyTuple(); reader.next(row); row = new ZeroCopyTuple()) {
      sorted.add(row);
    }
    Collections.sort(sorted, comparator);
    return sorted;
  }

  /**
   * Array counterpart of {@link #sort(OffHeapRowBlock, Comparator)}.
   *
   * @param rowBlock block whose rows are read
   * @param comparator ordering to apply
   * @return an array of length {@code rowBlock.rows()} holding the rows in {@code comparator} order
   */
  public static Tuple[] sortToArray(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
    int rowCount = rowBlock.rows();
    Tuple[] sorted = new Tuple[rowCount];
    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);

    ZeroCopyTuple row = new ZeroCopyTuple();
    int filled = 0;
    while (filled < rowCount && reader.next(row)) {
      sorted[filled++] = row;
      row = new ZeroCopyTuple();
    }

    Arrays.sort(sorted, comparator);
    return sorted;
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java ---------------------------------------------------------------------- diff --git
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java new file mode 100644 index 0000000..d177e0c --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.tajo.tuple.offheap;

import org.apache.tajo.common.TajoDataTypes;

/**
 * {@link OffHeapRowWriter} that appends rows to an {@link OffHeapRowBlock}.
 *
 * <p>Rows are written at the block's current position, and the block is asked to grow
 * whenever more space is needed. The block's row count goes up by one for every completed
 * row.
 */
public class OffHeapRowBlockWriter extends OffHeapRowWriter {
  OffHeapRowBlock rowBlock;

  OffHeapRowBlockWriter(OffHeapRowBlock rowBlock) {
    super(rowBlock.dataTypes);
    this.rowBlock = rowBlock;
  }

  /** @return base address of the target block's memory */
  @Override
  public long address() {
    return rowBlock.address();
  }

  /** @return the target block's current write position */
  @Override
  public int position() {
    return rowBlock.position();
  }

  /** Moves the block's write position {@code length} bytes forward. */
  @Override
  public void forward(int length) {
    int newPosition = position() + length;
    rowBlock.position(newPosition);
  }

  /** Delegates to {@link OffHeapRowBlock#ensureSize(int)}. */
  @Override
  public void ensureSize(int size) {
    rowBlock.ensureSize(size);
  }

  /** Finishes the row, then counts it in the target block. */
  @Override
  public void endRow() {
    super.endRow();
    int completedRows = rowBlock.rows() + 1;
    rowBlock.setRows(completedRows);
  }

  @Override
  public TajoDataTypes.DataType[] dataTypes() {
    return rowBlock.dataTypes;
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java new file mode 100644 index 0000000..85c7e0b --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.tajo.tuple.offheap;

import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.IntervalDatum;
import org.apache.tajo.datum.ProtobufDatum;
import org.apache.tajo.datum.TextDatum;
import org.apache.tajo.util.SizeOf;
import org.apache.tajo.util.UnsafeUtil;

/**
 * Base class for writers that serialize rows into off-heap memory.
 *
 * <p>Row Record Structure
 *
 * <pre>
 * | row length | field 1 offset | field 2 offset | ... | field N offset | field 1 | field 2 | ... | field N |
 *    4 bytes       4 bytes          4 bytes                 4 bytes
 * </pre>
 *
 * <p>The row length covers the whole record, header included. Field offsets are relative to
 * the start of the record. A field that was skipped, or never written before
 * {@link #endRow()}, gets {@link OffHeapRowBlock#NULL_FIELD_OFFSET}. TEXT/BLOB values are
 * stored as a 4-byte length followed by the bytes. An INTERVAL is stored as a 4-byte month
 * count followed by an 8-byte millisecond count.
 *
 * <p>To write a row, call {@link #startRow()}. Then make one {@code put*} or
 * {@link #skipField()} call per column, in column order. Finish with {@link #endRow()}.
 */
public abstract class OffHeapRowWriter implements RowWriter {
  /** record size + offset list */
  private final int headerSize;
  /** field offsets */
  private final int [] fieldOffsets;
  private final TajoDataTypes.DataType [] dataTypes;

  private int curFieldIdx;
  private int curOffset;

  public OffHeapRowWriter(final TajoDataTypes.DataType [] dataTypes) {
    this.dataTypes = dataTypes;
    fieldOffsets = new int[dataTypes.length];
    headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1);
  }

  public void clear() {
    curOffset = 0;
    curFieldIdx = 0;
  }

  /** @return absolute address of the record currently being written */
  public long recordStartAddr() {
    return address() + position();
  }

  public abstract long address();

  /**
   * Makes sure {@code size} more bytes can be written at {@link #offset()} within the
   * current record.
   */
  public abstract void ensureSize(int size);

  /** @return number of bytes of the current record written so far, header included */
  public int offset() {
    return curOffset;
  }

  /**
   * Current position
   *
   * @return The position
   */
  public abstract int position();

  /**
   * Forward the address;
   *
   * @param length Length to be forwarded
   */
  public abstract void forward(int length);

  @Override
  public TajoDataTypes.DataType[] dataTypes() {
    return dataTypes;
  }

  /**
   * Begins a new row.
   *
   * <p>Space for the record header is reserved here. Without this, a row whose fields are all
   * skipped would never call {@link #ensureSize(int)}, and {@link #endRow()} would write the
   * header past the end of the memory.
   *
   * @return always {@code true}
   */
  public boolean startRow() {
    curOffset = 0;
    curFieldIdx = 0;
    ensureSize(headerSize);
    curOffset = headerSize;
    return true;
  }

  /**
   * Writes the record header (row length and per-field offsets), then moves the position
   * past the completed row.
   */
  public void endRow() {
    long rowHeaderPos = address() + position();
    OffHeapMemory.UNSAFE.putInt(rowHeaderPos, curOffset);
    rowHeaderPos += SizeOf.SIZE_OF_INT;

    for (int i = 0; i < curFieldIdx; i++) {
      OffHeapMemory.UNSAFE.putInt(rowHeaderPos, fieldOffsets[i]);
      rowHeaderPos += SizeOf.SIZE_OF_INT;
    }
    // Columns that were never written are recorded as NULL.
    for (int i = curFieldIdx; i < dataTypes.length; i++) {
      OffHeapMemory.UNSAFE.putInt(rowHeaderPos, OffHeapRowBlock.NULL_FIELD_OFFSET);
      rowHeaderPos += SizeOf.SIZE_OF_INT;
    }

    // rowOffset is equivalent to a byte length of this row.
    forward(curOffset);
  }

  /** Marks the next column as NULL. */
  public void skipField() {
    fieldOffsets[curFieldIdx++] = OffHeapRowBlock.NULL_FIELD_OFFSET;
  }

  /** Records the current offset as the start of the next column's value. */
  private void forwardField() {
    fieldOffsets[curFieldIdx++] = curOffset;
  }

  public void putBool(boolean val) {
    ensureSize(SizeOf.SIZE_OF_BOOL);
    forwardField();

    OffHeapMemory.UNSAFE.putByte(recordStartAddr() + curOffset, (byte) (val ? 0x01 : 0x00));

    curOffset += SizeOf.SIZE_OF_BOOL;
  }

  public void putInt2(short val) {
    ensureSize(SizeOf.SIZE_OF_SHORT);
    forwardField();

    OffHeapMemory.UNSAFE.putShort(recordStartAddr() + curOffset, val);
    curOffset += SizeOf.SIZE_OF_SHORT;
  }

  public void putInt4(int val) {
    ensureSize(SizeOf.SIZE_OF_INT);
    forwardField();

    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, val);
    curOffset += SizeOf.SIZE_OF_INT;
  }

  public void putInt8(long val) {
    ensureSize(SizeOf.SIZE_OF_LONG);
    forwardField();

    OffHeapMemory.UNSAFE.putLong(recordStartAddr() + curOffset, val);
    curOffset += SizeOf.SIZE_OF_LONG;
  }

  public void putFloat4(float val) {
    ensureSize(SizeOf.SIZE_OF_FLOAT);
    forwardField();

    OffHeapMemory.UNSAFE.putFloat(recordStartAddr() + curOffset, val);
    curOffset += SizeOf.SIZE_OF_FLOAT;
  }

  public void putFloat8(double val) {
    ensureSize(SizeOf.SIZE_OF_DOUBLE);
    forwardField();

    OffHeapMemory.UNSAFE.putDouble(recordStartAddr() + curOffset, val);
    curOffset += SizeOf.SIZE_OF_DOUBLE;
  }

  /** Encodes {@code val} with {@link TextDatum#DEFAULT_CHARSET} and writes it as text. */
  public void putText(String val) {
    byte[] bytes = val.getBytes(TextDatum.DEFAULT_CHARSET);
    putText(bytes);
  }

  public void putText(byte[] val) {
    putLengthPrefixedBytes(val);
  }

  public void putBlob(byte[] val) {
    putLengthPrefixedBytes(val);
  }

  /** Writes {@code val} as the next column: a 4-byte length followed by the raw bytes. */
  private void putLengthPrefixedBytes(byte[] val) {
    int bytesLen = val.length;

    ensureSize(SizeOf.SIZE_OF_INT + bytesLen);
    forwardField();

    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen);
    curOffset += SizeOf.SIZE_OF_INT;

    OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null,
        recordStartAddr() + curOffset, bytesLen);
    curOffset += bytesLen;
  }

  public void putTimestamp(long val) {
    putInt8(val);
  }

  public void putDate(int val) {
    putInt4(val);
  }

  public void putTime(long val) {
    putInt8(val);
  }

  public void putInterval(IntervalDatum val) {
    ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG);
    forwardField();

    long offset = recordStartAddr() + curOffset;
    OffHeapMemory.UNSAFE.putInt(offset, val.getMonths());
    offset += SizeOf.SIZE_OF_INT;
    OffHeapMemory.UNSAFE.putLong(offset, val.getMilliSeconds());
    curOffset += SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG;
  }

  public void putInet4(int val) {
    putInt4(val);
  }

  /** Stores the serialized protobuf bytes as a BLOB. */
  public void putProtoDatum(ProtobufDatum val) {
    putBlob(val.asByteArray());
  }
}
