/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.tuple.offheap;

import org.apache.tajo.tuple.RowBlockReader;
import org.apache.tajo.util.UnsafeUtil;
import sun.misc.Unsafe;

/**
 * Sequential reader over an {@link OffHeapRowBlock}. Each call to
 * {@link #next(ZeroCopyTuple)} points the supplied tuple at the next record
 * in the block without copying the record payload.
 */
public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> {
  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;

  OffHeapRowBlock rowBlock;

  // Read cursor: index of the next row to hand out, and its byte offset
  // from the start of the block's memory region.
  private int curRowIdxForRead;
  private int curPosForRead;

  public OffHeapRowBlockReader(OffHeapRowBlock rowBlock) {
    this.rowBlock = rowBlock;
  }

  /**
   * @return the number of bytes of the block's memory region not yet
   *         consumed by this reader
   */
  public long remainForRead() {
    return rowBlock.memorySize - curPosForRead;
  }

  @Override
  public boolean next(ZeroCopyTuple tuple) {
    // Guard clause: all rows already consumed.
    if (curRowIdxForRead >= rowBlock.rows()) {
      return false;
    }

    // Each record begins with a 4-byte length prefix covering the whole row
    // (header + payload) — see OffHeapRowWriter's record layout.
    final long recordAddr = rowBlock.address() + curPosForRead;
    final int recordLength = UNSAFE.getInt(recordAddr);

    // Zero-copy: the tuple becomes a view over this record's bytes.
    tuple.set(rowBlock.buffer, curPosForRead, recordLength, rowBlock.dataTypes);

    curPosForRead += recordLength;
    curRowIdxForRead++;
    return true;
  }

  @Override
  public void reset() {
    curRowIdxForRead = 0;
    curPosForRead = 0;
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.tuple.offheap;

import com.google.common.collect.Lists;
import org.apache.tajo.storage.Tuple;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
 * Sorting helpers for {@link OffHeapRowBlock}. Both methods materialize one
 * {@link ZeroCopyTuple} per row (a view over the block's memory, not a copy)
 * and sort those views with the supplied comparator.
 */
public class OffHeapRowBlockUtils {

  /**
   * Reads every record of {@code rowBlock} and returns the tuples sorted
   * by {@code comparator}.
   *
   * @param rowBlock   source block; not modified
   * @param comparator ordering for the resulting list
   * @return a newly allocated, sorted list of zero-copy tuples
   */
  public static List<Tuple> sort(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
    List<Tuple> tupleList = Lists.newArrayList();
    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
    while (reader.next(zcTuple)) {
      tupleList.add(zcTuple);
      // Fresh tuple per record so each list element keeps pointing at its own row.
      zcTuple = new ZeroCopyTuple();
    }
    Collections.sort(tupleList, comparator);
    return tupleList;
  }

  /**
   * Array variant of {@link #sort(OffHeapRowBlock, Comparator)}.
   *
   * @param rowBlock   source block; not modified
   * @param comparator ordering for the resulting array
   * @return a newly allocated, sorted array of zero-copy tuples
   */
  public static Tuple[] sortToArray(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
    Tuple[] tuples = new Tuple[rowBlock.rows()];
    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
    int count = 0;
    while (count < tuples.length && reader.next(zcTuple)) {
      tuples[count++] = zcTuple;
      zcTuple = new ZeroCopyTuple();
    }
    // Defensive fix: if the reader yielded fewer rows than rowBlock.rows()
    // claims, trim the array so Arrays.sort never sees null elements
    // (the original left nulls in the tail and would NPE in the comparator).
    if (count < tuples.length) {
      tuples = Arrays.copyOf(tuples, count);
    }
    Arrays.sort(tuples, comparator);
    return tuples;
  }
}
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import org.apache.tajo.common.TajoDataTypes; - -public class OffHeapRowBlockWriter extends OffHeapRowWriter { - OffHeapRowBlock rowBlock; - - OffHeapRowBlockWriter(OffHeapRowBlock rowBlock) { - super(rowBlock.dataTypes); - this.rowBlock = rowBlock; - } - - public long address() { - return rowBlock.address(); - } - - public int position() { - return rowBlock.position(); - } - - @Override - public void forward(int length) { - rowBlock.position(position() + length); - } - - public void ensureSize(int size) { - rowBlock.ensureSize(size); - } - - @Override - public void endRow() { - super.endRow(); - rowBlock.setRows(rowBlock.rows() + 1); - } - - @Override - public TajoDataTypes.DataType[] dataTypes() { - return rowBlock.dataTypes; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java deleted file mode 100644 index 85c7e0b..0000000 --- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java +++ /dev/null @@ -1,232 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.datum.IntervalDatum; -import org.apache.tajo.datum.ProtobufDatum; -import org.apache.tajo.datum.TextDatum; -import org.apache.tajo.util.SizeOf; -import org.apache.tajo.util.UnsafeUtil; - -/** - * - * Row Record Structure - * - * | row length (4 bytes) | field 1 offset | field 2 offset | ... | field N offset| field 1 | field 2| ... 
| field N | - * 4 bytes 4 bytes 4 bytes - * - */ -public abstract class OffHeapRowWriter implements RowWriter { - /** record size + offset list */ - private final int headerSize; - /** field offsets */ - private final int [] fieldOffsets; - private final TajoDataTypes.DataType [] dataTypes; - - private int curFieldIdx; - private int curOffset; - - public OffHeapRowWriter(final TajoDataTypes.DataType [] dataTypes) { - this.dataTypes = dataTypes; - fieldOffsets = new int[dataTypes.length]; - headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1); - } - - public void clear() { - curOffset = 0; - curFieldIdx = 0; - } - - public long recordStartAddr() { - return address() + position(); - } - - public abstract long address(); - - public abstract void ensureSize(int size); - - public int offset() { - return curOffset; - } - - /** - * Current position - * - * @return The position - */ - public abstract int position(); - - /** - * Forward the address; - * - * @param length Length to be forwarded - */ - public abstract void forward(int length); - - @Override - public TajoDataTypes.DataType[] dataTypes() { - return dataTypes; - } - - public boolean startRow() { - curOffset = headerSize; - curFieldIdx = 0; - return true; - } - - public void endRow() { - long rowHeaderPos = address() + position(); - OffHeapMemory.UNSAFE.putInt(rowHeaderPos, curOffset); - rowHeaderPos += SizeOf.SIZE_OF_INT; - - for (int i = 0; i < curFieldIdx; i++) { - OffHeapMemory.UNSAFE.putInt(rowHeaderPos, fieldOffsets[i]); - rowHeaderPos += SizeOf.SIZE_OF_INT; - } - for (int i = curFieldIdx; i < dataTypes.length; i++) { - OffHeapMemory.UNSAFE.putInt(rowHeaderPos, OffHeapRowBlock.NULL_FIELD_OFFSET); - rowHeaderPos += SizeOf.SIZE_OF_INT; - } - - // rowOffset is equivalent to a byte length of this row. 
- forward(curOffset); - } - - public void skipField() { - fieldOffsets[curFieldIdx++] = OffHeapRowBlock.NULL_FIELD_OFFSET; - } - - private void forwardField() { - fieldOffsets[curFieldIdx++] = curOffset; - } - - public void putBool(boolean val) { - ensureSize(SizeOf.SIZE_OF_BOOL); - forwardField(); - - OffHeapMemory.UNSAFE.putByte(recordStartAddr() + curOffset, (byte) (val ? 0x01 : 0x00)); - - curOffset += SizeOf.SIZE_OF_BOOL; - } - - public void putInt2(short val) { - ensureSize(SizeOf.SIZE_OF_SHORT); - forwardField(); - - OffHeapMemory.UNSAFE.putShort(recordStartAddr() + curOffset, val); - curOffset += SizeOf.SIZE_OF_SHORT; - } - - public void putInt4(int val) { - ensureSize(SizeOf.SIZE_OF_INT); - forwardField(); - - OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, val); - curOffset += SizeOf.SIZE_OF_INT; - } - - public void putInt8(long val) { - ensureSize(SizeOf.SIZE_OF_LONG); - forwardField(); - - OffHeapMemory.UNSAFE.putLong(recordStartAddr() + curOffset, val); - curOffset += SizeOf.SIZE_OF_LONG; - } - - public void putFloat4(float val) { - ensureSize(SizeOf.SIZE_OF_FLOAT); - forwardField(); - - OffHeapMemory.UNSAFE.putFloat(recordStartAddr() + curOffset, val); - curOffset += SizeOf.SIZE_OF_FLOAT; - } - - public void putFloat8(double val) { - ensureSize(SizeOf.SIZE_OF_DOUBLE); - forwardField(); - - OffHeapMemory.UNSAFE.putDouble(recordStartAddr() + curOffset, val); - curOffset += SizeOf.SIZE_OF_DOUBLE; - } - - public void putText(String val) { - byte[] bytes = val.getBytes(TextDatum.DEFAULT_CHARSET); - putText(bytes); - } - - public void putText(byte[] val) { - int bytesLen = val.length; - - ensureSize(SizeOf.SIZE_OF_INT + bytesLen); - forwardField(); - - OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen); - curOffset += SizeOf.SIZE_OF_INT; - - OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null, - recordStartAddr() + curOffset, bytesLen); - curOffset += bytesLen; - } - - public void putBlob(byte[] val) { - 
int bytesLen = val.length; - - ensureSize(SizeOf.SIZE_OF_INT + bytesLen); - forwardField(); - - OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen); - curOffset += SizeOf.SIZE_OF_INT; - - OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null, - recordStartAddr() + curOffset, bytesLen); - curOffset += bytesLen; - } - - public void putTimestamp(long val) { - putInt8(val); - } - - public void putDate(int val) { - putInt4(val); - } - - public void putTime(long val) { - putInt8(val); - } - - public void putInterval(IntervalDatum val) { - ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG); - forwardField(); - - long offset = recordStartAddr() + curOffset; - OffHeapMemory.UNSAFE.putInt(offset, val.getMonths()); - offset += SizeOf.SIZE_OF_INT; - OffHeapMemory.UNSAFE.putLong(offset, val.getMilliSeconds()); - curOffset += SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG; - } - - public void putInet4(int val) { - putInt4(val); - } - - public void putProtoDatum(ProtobufDatum val) { - putBlob(val.asByteArray()); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java deleted file mode 100644 index 14e67b2..0000000 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import com.google.common.base.Preconditions; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.util.FileUtil; - -/** - * It specifies the maximum size or increasing ratio. In addition, - * it guarantees that all numbers are less than or equal to Integer.MAX_VALUE 2^31 - * due to ByteBuffer. - */ -public class ResizableLimitSpec { - private final Log LOG = LogFactory.getLog(ResizableLimitSpec.class); - - public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE; - public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE); - - private final long initSize; - private final long limitBytes; - private final float incRatio; - private final float allowedOVerflowRatio; - private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f; - private final static float DEFAULT_INCREASE_RATIO = 1.0f; - - public ResizableLimitSpec(long initSize) { - this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO); - } - - public ResizableLimitSpec(long initSize, long limitBytes) { - this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO); - } - - public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) { - this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO); - } - - public ResizableLimitSpec(long initSize, long 
limitBytes, float allowedOverflowRatio, float incRatio) { - Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes."); - Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB."); - Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes."); - Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB."); - Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0."); - - if (initSize == limitBytes) { - long overflowedSize = (long) (initSize + (initSize * allowedOverflowRatio)); - - if (overflowedSize > Integer.MAX_VALUE) { - overflowedSize = Integer.MAX_VALUE; - } - - this.initSize = overflowedSize; - this.limitBytes = overflowedSize; - } else { - this.initSize = initSize; - limitBytes = (long) (limitBytes + (limitBytes * allowedOverflowRatio)); - - if (limitBytes > Integer.MAX_VALUE) { - this.limitBytes = Integer.MAX_VALUE; - } else { - this.limitBytes = limitBytes; - } - } - - this.allowedOVerflowRatio = allowedOverflowRatio; - this.incRatio = incRatio; - } - - public long initialSize() { - return initSize; - } - - public long limit() { - return limitBytes; - } - - public float remainRatio(long currentSize) { - Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes."); - if (currentSize > Integer.MAX_VALUE) { - currentSize = Integer.MAX_VALUE; - } - return (float)currentSize / (float)limitBytes; - } - - public boolean canIncrease(long currentSize) { - return remain(currentSize) > 0; - } - - public long remain(long currentSize) { - Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes."); - return limitBytes > Integer.MAX_VALUE ? 
Integer.MAX_VALUE - currentSize : limitBytes - currentSize; - } - - public int increasedSize(int currentSize) { - if (currentSize < initSize) { - return (int) initSize; - } - - if (currentSize > Integer.MAX_VALUE) { - LOG.warn("Current size already exceeds the maximum size (" + Integer.MAX_VALUE + " bytes)"); - return Integer.MAX_VALUE; - } - long nextSize = (long) (currentSize + ((float) currentSize * incRatio)); - - if (nextSize > limitBytes) { - LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")"); - nextSize = limitBytes; - } - - if (nextSize > Integer.MAX_VALUE) { - LOG.info("Increasing reaches maximum size (" + FileUtil.humanReadableByteCount(Integer.MAX_VALUE, false) + ")"); - nextSize = Integer.MAX_VALUE; - } - - return (int) nextSize; - } - - @Override - public String toString() { - return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit=" - + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOVerflowRatio - + ",inc_ratio=" + incRatio; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java deleted file mode 100644 index a2b2561..0000000 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java +++ /dev/null @@ -1,73 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.datum.IntervalDatum; -import org.apache.tajo.datum.ProtobufDatum; - -/** - * The call sequence should be as follows: - * - * <pre> - * startRow() --> skipField() or putXXX --> endRow() - * </pre> - * - * The total number of skipField and putXXX invocations must be equivalent to the number of fields. 
- */ -public interface RowWriter { - - public TajoDataTypes.DataType [] dataTypes(); - - public boolean startRow(); - - public void endRow(); - - public void skipField(); - - public void putBool(boolean val); - - public void putInt2(short val); - - public void putInt4(int val); - - public void putInt8(long val); - - public void putFloat4(float val); - - public void putFloat8(double val); - - public void putText(String val); - - public void putText(byte[] val); - - public void putBlob(byte[] val); - - public void putTimestamp(long val); - - public void putTime(long val); - - public void putDate(int val); - - public void putInterval(IntervalDatum val); - - public void putInet4(int val); - - public void putProtoDatum(ProtobufDatum datum); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java deleted file mode 100644 index 3756064..0000000 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java +++ /dev/null @@ -1,331 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.tuple.offheap;

import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.*;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.util.SizeOf;
import org.apache.tajo.util.StringUtils;
import org.apache.tajo.util.UnsafeUtil;
import org.apache.tajo.util.datetime.TimeMeta;
import sun.misc.Unsafe;
import sun.nio.ch.DirectBuffer;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.Charset;

import static org.apache.tajo.common.TajoDataTypes.DataType;

/**
 * A {@link Tuple} that reads field values directly out of a direct
 * {@link ByteBuffer} using {@code sun.misc.Unsafe}, without materializing
 * them on the Java heap. The record layout (length prefix + field-offset
 * header + payload) is the one produced by {@code OffHeapRowWriter}.
 *
 * All {@code put} mutators are unsupported: instances are read-only views.
 */
public abstract class UnSafeTuple implements Tuple {
  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;

  private DirectBuffer bb;
  private int relativePos;   // offset of this record within the buffer
  private int length;        // total record length in bytes
  private DataType [] types;

  /**
   * Points this tuple at a record inside {@code bb}.
   *
   * @param bb          a DIRECT byte buffer (cast to DirectBuffer for its address)
   * @param relativePos byte offset of the record within the buffer
   * @param length      record length in bytes
   * @param types       column types, in field order
   */
  protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
    this.bb = (DirectBuffer) bb;
    this.relativePos = relativePos;
    this.length = length;
    this.types = types;
  }

  void set(ByteBuffer bb, DataType [] types) {
    set(bb, 0, bb.limit(), types);
  }

  @Override
  public int size() {
    return types.length;
  }

  @Override
  public TajoDataTypes.Type type(int fieldId) {
    return types[fieldId].getType();
  }

  @Override
  public int size(int fieldId) {
    // Variable-length fields carry a 4-byte length prefix at their offset.
    return UNSAFE.getInt(getFieldAddr(fieldId));
  }

  /** @return a sliced, zero-copy view of exactly this record's bytes */
  public ByteBuffer nioBuffer() {
    return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice();
  }

  /** Copies this record onto the heap, yielding a buffer-independent tuple. */
  public HeapTuple toHeapTuple() {
    byte [] bytes = new byte[length];
    UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length);
    return new HeapTuple(bytes, types);
  }

  /**
   * Overwrites this tuple's buffer with {@code tuple}'s record bytes,
   * reallocating a larger direct buffer first if needed.
   */
  public void copyFrom(UnSafeTuple tuple) {
    Preconditions.checkNotNull(tuple);

    ((ByteBuffer) bb).clear();
    if (length < tuple.length) {
      // Current buffer too small: free it and allocate a native-order replacement.
      UnsafeUtil.free((ByteBuffer) bb);
      bb = (DirectBuffer) ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder());
      this.relativePos = 0;
      this.length = tuple.length;
    }

    ((ByteBuffer) bb).put(tuple.nioBuffer());
  }

  // Reads field i's offset from the record header (skipping the row-length int).
  private int getFieldOffset(int fieldId) {
    return UNSAFE.getInt(bb.address() + (long)(relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)));
  }

  /** @return the absolute memory address of field {@code fieldId}'s payload */
  public long getFieldAddr(int fieldId) {
    int fieldOffset = getFieldOffset(fieldId);
    if (fieldOffset == -1) {
      throw new RuntimeException("Invalid Field Access: " + fieldId);
    }
    return bb.address() + relativePos + fieldOffset;
  }

  @Override
  public boolean contains(int fieldid) {
    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
  }

  @Override
  public boolean isBlank(int fieldid) {
    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
  }

  @Override
  public boolean isBlankOrNull(int fieldid) {
    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
  }

  @Override
  public void clear() {
    // nothing to do: this tuple is a read-only view
  }

  @Override
  public void put(int fieldId, Datum value) {
    throw new TajoRuntimeException(new UnsupportedException());
  }

  @Override
  public void put(int fieldId, Tuple tuple) {
    throw new TajoRuntimeException(new UnsupportedException());
  }

  @Override
  public void put(Datum[] values) {
    throw new TajoRuntimeException(new UnsupportedException());
  }

  @Override
  public Datum asDatum(int fieldId) {
    if (isBlankOrNull(fieldId)) {
      return NullDatum.get();
    }

    switch (types[fieldId].getType()) {
    case BOOLEAN:
      return DatumFactory.createBool(getBool(fieldId));
    case INT1:
    case INT2:
      return DatumFactory.createInt2(getInt2(fieldId));
    case INT4:
      return DatumFactory.createInt4(getInt4(fieldId));
    case INT8:
      // BUG FIX: the original called getInt4 here, truncating 8-byte values.
      return DatumFactory.createInt8(getInt8(fieldId));
    case FLOAT4:
      return DatumFactory.createFloat4(getFloat4(fieldId));
    case FLOAT8:
      return DatumFactory.createFloat8(getFloat8(fieldId));
    case TEXT:
      return DatumFactory.createText(getText(fieldId));
    case TIMESTAMP:
      return DatumFactory.createTimestamp(getInt8(fieldId));
    case DATE:
      return DatumFactory.createDate(getInt4(fieldId));
    case TIME:
      return DatumFactory.createTime(getInt8(fieldId));
    case INTERVAL:
      return getInterval(fieldId);
    case INET4:
      return DatumFactory.createInet4(getInt4(fieldId));
    case PROTOBUF:
      return getProtobufDatum(fieldId);
    default:
      throw new TajoRuntimeException(new UnsupportedException("data type '" + types[fieldId] + "'"));
    }
  }

  @Override
  public void clearOffset() {
  }

  @Override
  public void setOffset(long offset) {
  }

  @Override
  public long getOffset() {
    return 0;
  }

  @Override
  public boolean getBool(int fieldId) {
    return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01;
  }

  @Override
  public byte getByte(int fieldId) {
    return UNSAFE.getByte(getFieldAddr(fieldId));
  }

  @Override
  public char getChar(int fieldId) {
    return UNSAFE.getChar(getFieldAddr(fieldId));
  }

  @Override
  public byte[] getBytes(int fieldId) {
    long pos = getFieldAddr(fieldId);
    int len = UNSAFE.getInt(pos);
    pos += SizeOf.SIZE_OF_INT;

    byte [] bytes = new byte[len];
    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
    return bytes;
  }

  @Override
  public byte[] getTextBytes(int fieldId) {
    // TEXT and BLOB share the same length-prefixed layout; the original
    // duplicated getBytes' body here verbatim — delegate instead.
    return getBytes(fieldId);
  }

  @Override
  public short getInt2(int fieldId) {
    long addr = getFieldAddr(fieldId);
    return UNSAFE.getShort(addr);
  }

  @Override
  public int getInt4(int fieldId) {
    return UNSAFE.getInt(getFieldAddr(fieldId));
  }

  @Override
  public long getInt8(int fieldId) {
    return UNSAFE.getLong(getFieldAddr(fieldId));
  }

  @Override
  public float getFloat4(int fieldId) {
    return UNSAFE.getFloat(getFieldAddr(fieldId));
  }

  @Override
  public double getFloat8(int fieldId) {
    return UNSAFE.getDouble(getFieldAddr(fieldId));
  }

  @Override
  public String getText(int fieldId) {
    // BUG FIX: the original used the platform-default charset, while
    // OffHeapRowWriter.putText encodes with TextDatum.DEFAULT_CHARSET;
    // decode with the same charset for a symmetric round trip.
    return new String(getTextBytes(fieldId), TextDatum.DEFAULT_CHARSET);
  }

  public IntervalDatum getInterval(int fieldId) {
    // Intervals are stored as (months: int, milliseconds: long).
    long pos = getFieldAddr(fieldId);
    int months = UNSAFE.getInt(pos);
    pos += SizeOf.SIZE_OF_INT;
    long millisecs = UNSAFE.getLong(pos);
    return new IntervalDatum(months, millisecs);
  }

  @Override
  public Datum getProtobufDatum(int fieldId) {
    byte [] bytes = getBytes(fieldId);

    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId]);
    Message.Builder builder = factory.newBuilder();
    try {
      builder.mergeFrom(bytes);
    } catch (InvalidProtocolBufferException e) {
      // Undecodable payload degrades to NULL rather than failing the scan.
      return NullDatum.get();
    }

    return new ProtobufDatum(builder.build());
  }

  @Override
  public char[] getUnicodeChars(int fieldId) {
    long pos = getFieldAddr(fieldId);
    int len = UNSAFE.getInt(pos);
    pos += SizeOf.SIZE_OF_INT;

    byte [] bytes = new byte[len];
    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
    return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
  }

  @Override
  public TimeMeta getTimeDate(int fieldId) {
    return null;
  }

  @Override
  public Tuple clone() throws CloneNotSupportedException {
    // A clone must outlive this view, so copy the record onto the heap.
    return toHeapTuple();
  }

  @Override
  public Datum[] getValues() {
    Datum [] datums = new Datum[size()];
    for (int i = 0; i < size(); i++) {
      if (contains(i)) {
        datums[i] = asDatum(i);
      } else {
        datums[i] = NullDatum.get();
      }
    }
    return datums;
  }

  @Override
  public String toString() {
    return VTuple.toDisplayString(getValues());
  }

  /** Releases any native resources backing this tuple. */
  public abstract void release();
}
- */ - -package org.apache.tajo.tuple.offheap; - -import com.google.common.primitives.Longs; -import com.google.common.primitives.UnsignedLongs; -import org.apache.tajo.util.SizeOf; -import org.apache.tajo.util.UnsafeUtil; -import sun.misc.Unsafe; - -import java.nio.ByteOrder; - -/** - * It directly access UTF bytes in UnSafeTuple without any copy. It is used by compiled TupleComparator. - */ -public class UnSafeTupleBytesComparator { - private static final Unsafe UNSAFE = UnsafeUtil.unsafe; - - static final boolean littleEndian = - ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN); - - public static int compare(long ptr1, long ptr2) { - int lstrLen = UNSAFE.getInt(ptr1); - int rstrLen = UNSAFE.getInt(ptr2); - - ptr1 += SizeOf.SIZE_OF_INT; - ptr2 += SizeOf.SIZE_OF_INT; - - int minLength = Math.min(lstrLen, rstrLen); - int minWords = minLength / Longs.BYTES; - - /* - * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a - * time is no slower than comparing 4 bytes at a time even on 32-bit. - * On the other hand, it is substantially faster on 64-bit. - */ - for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) { - long lw = UNSAFE.getLong(ptr1); - long rw = UNSAFE.getLong(ptr2); - long diff = lw ^ rw; - - if (diff != 0) { - if (!littleEndian) { - return UnsignedLongs.compare(lw, rw); - } - - // Use binary search - int n = 0; - int y; - int x = (int) diff; - if (x == 0) { - x = (int) (diff >>> 32); - n = 32; - } - - y = x << 16; - if (y == 0) { - n += 16; - } else { - x = y; - } - - y = x << 8; - if (y == 0) { - n += 8; - } - return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL)); - } - - ptr1 += SizeOf.SIZE_OF_LONG; - ptr2 += SizeOf.SIZE_OF_LONG; - } - - // The epilogue to cover the last (minLength % 8) elements. 
- for (int i = minWords * Longs.BYTES; i < minLength; i++) { - int result = UNSAFE.getByte(ptr1++) - UNSAFE.getByte(ptr2++); - if (result != 0) { - return result; - } - } - return lstrLen - rstrLen; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java deleted file mode 100644 index 51dbb29..0000000 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.tuple.offheap; - -import java.nio.ByteBuffer; - -import static org.apache.tajo.common.TajoDataTypes.DataType; - -public class ZeroCopyTuple extends UnSafeTuple { - - public void set(ByteBuffer bb, int relativePos, int length, DataType [] types) { - super.set(bb, relativePos, length, types); - } - - @Override - public void release() { - // nothing to do - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml index dfdff85..676c072 100644 --- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml @@ -39,7 +39,7 @@ <!--- Registered Scanner Handler --> <property> <name>tajo.storage.scanner-handler</name> - <value>text,json,raw,rcfile,row,parquet,orc,sequencefile,avro,hbase</value> + <value>text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase</value> </property> <!--- Fragment Class Configurations --> @@ -56,6 +56,10 @@ <value>org.apache.tajo.storage.fragment.FileFragment</value> </property> <property> + <name>tajo.storage.fragment.draw.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> <name>tajo.storage.fragment.rcfile.class</name> <value>org.apache.tajo.storage.fragment.FileFragment</value> </property> @@ -101,6 +105,11 @@ </property> <property> + <name>tajo.storage.scanner-handler.draw.class</name> + <value>org.apache.tajo.storage.rawfile.DirectRawFileScanner</value> + </property> + + <property> <name>tajo.storage.scanner-handler.rcfile.class</name> <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value> </property> @@ -134,7 +143,7 @@ 
<name>tajo.storage.scanner-handler.hbase.class</name> <value>org.apache.tajo.storage.hbase.HBaseScanner</value> </property> - + <!--- Appender Handler --> <property> <name>tajo.storage.appender-handler</name> @@ -157,6 +166,11 @@ </property> <property> + <name>tajo.storage.appender-handler.draw.class</name> + <value>org.apache.tajo.storage.rawfile.DirectRawFileWriter</value> + </property> + + <property> <name>tajo.storage.appender-handler.rcfile.class</name> <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value> </property> @@ -212,4 +226,4 @@ <value>131072</value> <description>128KB write buffer</description> </property> -</configuration> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java deleted file mode 100644 index b332364..0000000 --- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple; - -import org.apache.tajo.storage.RowStoreUtil; -import org.apache.tajo.tuple.offheap.*; -import org.junit.Test; - -public class TestBaseTupleBuilder { - - @Test - public void testBuild() { - BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema); - - OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(10248); - OffHeapRowBlockReader reader = rowBlock.getReader(); - - ZeroCopyTuple inputTuple = new ZeroCopyTuple(); - - HeapTuple heapTuple = null; - ZeroCopyTuple zcTuple = null; - int i = 0; - while(reader.next(inputTuple)) { - RowStoreUtil.convert(inputTuple, builder); - - heapTuple = builder.buildToHeapTuple(); - TestOffHeapRowBlock.validateTupleResult(i, heapTuple); - - zcTuple = builder.buildToZeroCopyTuple(); - TestOffHeapRowBlock.validateTupleResult(i, zcTuple); - - i++; - } - } - - @Test - public void testBuildWithNull() { - BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema); - - OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlockWithNull(10248); - OffHeapRowBlockReader reader = rowBlock.getReader(); - - ZeroCopyTuple inputTuple = new ZeroCopyTuple(); - - HeapTuple heapTuple = null; - ZeroCopyTuple zcTuple = null; - int i = 0; - while(reader.next(inputTuple)) { - RowStoreUtil.convert(inputTuple, builder); - - heapTuple = builder.buildToHeapTuple(); - TestOffHeapRowBlock.validateNullity(i, heapTuple); - - zcTuple = builder.buildToZeroCopyTuple(); - TestOffHeapRowBlock.validateNullity(i, zcTuple); - - i++; - } - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java deleted file mode 100644 index 96f465a..0000000 --- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java +++ /dev/null @@ -1,45 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.tuple.offheap; - -import org.apache.tajo.catalog.SchemaUtil; -import org.junit.Test; - -public class TestHeapTuple { - - @Test - public void testHeapTuple() { - OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(1024); - - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - - ZeroCopyTuple zcTuple = new ZeroCopyTuple(); - int i = 0; - while (reader.next(zcTuple)) { - byte [] bytes = new byte[zcTuple.nioBuffer().limit()]; - zcTuple.nioBuffer().get(bytes); - - HeapTuple heapTuple = new HeapTuple(bytes, SchemaUtil.toDataTypes(TestOffHeapRowBlock.schema)); - TestOffHeapRowBlock.validateTupleResult(i, heapTuple); - i++; - } - - rowBlock.release(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java deleted file mode 100644 index 278d733..0000000 --- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java +++ /dev/null @@ -1,577 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import com.google.common.collect.Lists; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.catalog.*; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.ProtobufDatum; -import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; -import org.apache.tajo.storage.BaseTupleComparator; -import org.apache.tajo.storage.RowStoreUtil; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; -import org.apache.tajo.unit.StorageUnit; -import org.apache.tajo.util.FileUtil; -import org.apache.tajo.util.ProtoUtil; -import org.junit.Test; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; - -import static org.apache.tajo.common.TajoDataTypes.Type; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TestOffHeapRowBlock { - private static final Log LOG = LogFactory.getLog(TestOffHeapRowBlock.class); - public static String UNICODE_FIELD_PREFIX = "abc_ê°ëë¤_"; - public static Schema schema; - - static { - schema = new Schema(); - schema.addColumn("col0", Type.BOOLEAN); - schema.addColumn("col1", Type.INT2); - schema.addColumn("col2", Type.INT4); - schema.addColumn("col3", Type.INT8); - schema.addColumn("col4", Type.FLOAT4); - schema.addColumn("col5", Type.FLOAT8); - schema.addColumn("col6", Type.TEXT); - schema.addColumn("col7", Type.TIMESTAMP); - schema.addColumn("col8", Type.DATE); - 
schema.addColumn("col9", Type.TIME); - schema.addColumn("col10", Type.INTERVAL); - schema.addColumn("col11", Type.INET4); - schema.addColumn("col12", - CatalogUtil.newDataType(TajoDataTypes.Type.PROTOBUF, PrimitiveProtos.StringProto.class.getName())); - } - - private void explainRowBlockAllocation(OffHeapRowBlock rowBlock, long startTime, long endTime) { - LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " - + (endTime - startTime) + " msec"); - } - - @Test - public void testPutAndReadValidation() { - int rowNum = 1000; - - long allocStart = System.currentTimeMillis(); - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024); - long allocEnd = System.currentTimeMillis(); - explainRowBlockAllocation(rowBlock, allocStart, allocEnd); - - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - - ZeroCopyTuple tuple = new ZeroCopyTuple(); - long writeStart = System.currentTimeMillis(); - for (int i = 0; i < rowNum; i++) { - fillRow(i, rowBlock.getWriter()); - - reader.reset(); - int j = 0; - while(reader.next(tuple)) { - validateTupleResult(j, tuple); - - j++; - } - } - long writeEnd = System.currentTimeMillis(); - LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec"); - - long readStart = System.currentTimeMillis(); - tuple = new ZeroCopyTuple(); - int j = 0; - reader.reset(); - while(reader.next(tuple)) { - validateTupleResult(j, tuple); - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - - rowBlock.release(); - } - - @Test - public void testNullityValidation() { - int rowNum = 1000; - - long allocStart = System.currentTimeMillis(); - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024); - long allocEnd = System.currentTimeMillis(); - explainRowBlockAllocation(rowBlock, allocStart, allocEnd); - - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - ZeroCopyTuple tuple = new ZeroCopyTuple(); - long 
writeStart = System.currentTimeMillis(); - for (int i = 0; i < rowNum; i++) { - fillRowBlockWithNull(i, rowBlock.getWriter()); - - reader.reset(); - int j = 0; - while(reader.next(tuple)) { - validateNullity(j, tuple); - - j++; - } - } - long writeEnd = System.currentTimeMillis(); - LOG.info("writing and nullity validating take " + (writeEnd - writeStart) +" msec"); - - long readStart = System.currentTimeMillis(); - tuple = new ZeroCopyTuple(); - int j = 0; - reader.reset(); - while(reader.next(tuple)) { - validateNullity(j, tuple); - - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - - rowBlock.release(); - } - - @Test - public void testEmptyRow() { - int rowNum = 1000; - - long allocStart = System.currentTimeMillis(); - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 10); - long allocEnd = System.currentTimeMillis(); - explainRowBlockAllocation(rowBlock, allocStart, allocEnd); - - long writeStart = System.currentTimeMillis(); - for (int i = 0; i < rowNum; i++) { - rowBlock.getWriter().startRow(); - // empty columns - rowBlock.getWriter().endRow(); - } - long writeEnd = System.currentTimeMillis(); - LOG.info("writing tooks " + (writeEnd - writeStart) + " msec"); - - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - - long readStart = System.currentTimeMillis(); - ZeroCopyTuple tuple = new ZeroCopyTuple(); - int j = 0; - reader.reset(); - while(reader.next(tuple)) { - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - rowBlock.release(); - - assertEquals(rowNum, j); - assertEquals(rowNum, rowBlock.rows()); - } - - @Test - public void testSortBenchmark() { - int rowNum = 1000; - - OffHeapRowBlock rowBlock = createRowBlock(rowNum); - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - - List<ZeroCopyTuple> unSafeTuples = Lists.newArrayList(); - - long readStart = 
System.currentTimeMillis(); - ZeroCopyTuple tuple = new ZeroCopyTuple(); - reader.reset(); - while(reader.next(tuple)) { - unSafeTuples.add(tuple); - tuple = new ZeroCopyTuple(); - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - - SortSpec sortSpec = new SortSpec(new Column("col2", Type.INT4)); - BaseTupleComparator comparator = new BaseTupleComparator(schema, new SortSpec[] {sortSpec}); - - long sortStart = System.currentTimeMillis(); - Collections.sort(unSafeTuples, comparator); - long sortEnd = System.currentTimeMillis(); - LOG.info("sorting took " + (sortEnd - sortStart) + " msec"); - rowBlock.release(); - } - - @Test - public void testVTuplePutAndGetBenchmark() { - int rowNum = 1000; - - List<VTuple> rowBlock = Lists.newArrayList(); - long writeStart = System.currentTimeMillis(); - VTuple tuple; - for (int i = 0; i < rowNum; i++) { - tuple = new VTuple(schema.size()); - fillVTuple(i, tuple); - rowBlock.add(tuple); - } - long writeEnd = System.currentTimeMillis(); - LOG.info("Writing takes " + (writeEnd - writeStart) + " msec"); - - long readStart = System.currentTimeMillis(); - int j = 0; - for (VTuple t : rowBlock) { - validateTupleResult(j, t); - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - - int count = 0; - for (int l = 0; l < rowBlock.size(); l++) { - for(int m = 0; m < schema.size(); m++ ) { - if (rowBlock.get(l).contains(m) && rowBlock.get(l).get(m).type() == Type.INT4) { - count ++; - } - } - } - // For preventing unnecessary code elimination optimization. 
- LOG.info("The number of INT4 values is " + count + "."); - } - - @Test - public void testVTuplePutAndGetBenchmarkViaDirectRowEncoder() { - int rowNum = 1000; - - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 100); - - long writeStart = System.currentTimeMillis(); - VTuple tuple = new VTuple(schema.size()); - for (int i = 0; i < rowNum; i++) { - fillVTuple(i, tuple); - - RowStoreUtil.convert(tuple, rowBlock.getWriter()); - } - long writeEnd = System.currentTimeMillis(); - LOG.info("Writing takes " + (writeEnd - writeStart) + " msec"); - - validateResults(rowBlock); - rowBlock.release(); - } - - @Test - public void testSerDerOfRowBlock() { - int rowNum = 1000; - - OffHeapRowBlock rowBlock = createRowBlock(rowNum); - - ByteBuffer bb = rowBlock.nioBuffer(); - OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb); - validateResults(restoredRowBlock); - rowBlock.release(); - } - - @Test - public void testSerDerOfZeroCopyTuple() { - int rowNum = 1000; - - OffHeapRowBlock rowBlock = createRowBlock(rowNum); - - ByteBuffer bb = rowBlock.nioBuffer(); - OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb); - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(restoredRowBlock); - - long readStart = System.currentTimeMillis(); - ZeroCopyTuple tuple = new ZeroCopyTuple(); - ZeroCopyTuple copyTuple = new ZeroCopyTuple(); - int j = 0; - reader.reset(); - while(reader.next(tuple)) { - ByteBuffer copy = tuple.nioBuffer(); - copyTuple.set(copy, SchemaUtil.toDataTypes(schema)); - - validateTupleResult(j, copyTuple); - - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - - rowBlock.release(); - } - - public static OffHeapRowBlock createRowBlock(int rowNum) { - long allocateStart = System.currentTimeMillis(); - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8); - long allocatedEnd = System.currentTimeMillis(); - 
LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " - + (allocatedEnd - allocateStart) + " msec"); - - long writeStart = System.currentTimeMillis(); - for (int i = 0; i < rowNum; i++) { - fillRow(i, rowBlock.getWriter()); - } - long writeEnd = System.currentTimeMillis(); - LOG.info("writing takes " + (writeEnd - writeStart) + " msec"); - - return rowBlock; - } - - public static OffHeapRowBlock createRowBlockWithNull(int rowNum) { - long allocateStart = System.currentTimeMillis(); - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8); - long allocatedEnd = System.currentTimeMillis(); - LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " - + (allocatedEnd - allocateStart) + " msec"); - - long writeStart = System.currentTimeMillis(); - for (int i = 0; i < rowNum; i++) { - fillRowBlockWithNull(i, rowBlock.getWriter()); - } - long writeEnd = System.currentTimeMillis(); - LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec"); - - return rowBlock; - } - - public static void fillRow(int i, RowWriter builder) { - builder.startRow(); - builder.putBool(i % 1 == 0 ? 
true : false); // 0 - builder.putInt2((short) 1); // 1 - builder.putInt4(i); // 2 - builder.putInt8(i); // 3 - builder.putFloat4(i); // 4 - builder.putFloat8(i); // 5 - builder.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6 - builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 - builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 - builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 - builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 - builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 - builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 - builder.endRow(); - } - - public static void fillRowBlockWithNull(int i, RowWriter writer) { - writer.startRow(); - - if (i == 0) { - writer.skipField(); - } else { - writer.putBool(i % 1 == 0 ? true : false); // 0 - } - if (i % 1 == 0) { - writer.skipField(); - } else { - writer.putInt2((short) 1); // 1 - } - - if (i % 2 == 0) { - writer.skipField(); - } else { - writer.putInt4(i); // 2 - } - - if (i % 3 == 0) { - writer.skipField(); - } else { - writer.putInt8(i); // 3 - } - - if (i % 4 == 0) { - writer.skipField(); - } else { - writer.putFloat4(i); // 4 - } - - if (i % 5 == 0) { - writer.skipField(); - } else { - writer.putFloat8(i); // 5 - } - - if (i % 6 == 0) { - writer.skipField(); - } else { - writer.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6 - } - - if (i % 7 == 0) { - writer.skipField(); - } else { - writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 - } - - if (i % 8 == 0) { - writer.skipField(); - } else { - writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 - } - - if (i % 9 == 0) { - writer.skipField(); - } else { - writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 - } - - if (i % 10 == 0) { - writer.skipField(); - } else { - 
writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 - } - - if (i % 11 == 0) { - writer.skipField(); - } else { - writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 - } - - if (i % 12 == 0) { - writer.skipField(); - } else { - writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 - } - - writer.endRow(); - } - - public static void fillVTuple(int i, VTuple tuple) { - tuple.put(0, DatumFactory.createBool(i % 1 == 0)); - tuple.put(1, DatumFactory.createInt2((short) 1)); - tuple.put(2, DatumFactory.createInt4(i)); - tuple.put(3, DatumFactory.createInt8(i)); - tuple.put(4, DatumFactory.createFloat4(i)); - tuple.put(5, DatumFactory.createFloat8(i)); - tuple.put(6, DatumFactory.createText((UNICODE_FIELD_PREFIX + i).getBytes())); - tuple.put(7, DatumFactory.createTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i)); // 7 - tuple.put(8, DatumFactory.createDate(DatumFactory.createDate("2014-04-16").asInt4() + i)); // 8 - tuple.put(9, DatumFactory.createTime(DatumFactory.createTime("08:48:00").asInt8() + i)); // 9 - tuple.put(10, DatumFactory.createInterval((i + 1) + " hours")); // 10 - tuple.put(11, DatumFactory.createInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i)); // 11 - tuple.put(12, new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12; - } - - public static void validateResults(OffHeapRowBlock rowBlock) { - long readStart = System.currentTimeMillis(); - ZeroCopyTuple tuple = new ZeroCopyTuple(); - int j = 0; - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - reader.reset(); - while(reader.next(tuple)) { - validateTupleResult(j, tuple); - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("Reading takes " + (readEnd - readStart) + " msec"); - } - - public static void validateTupleResult(int j, Tuple t) { - assertTrue((j % 1 == 0) == t.getBool(0)); - assertTrue(1 == t.getInt2(1)); - assertEquals(j, t.getInt4(2)); - 
assertEquals(j, t.getInt8(3)); - assertTrue(j == t.getFloat4(4)); - assertTrue(j == t.getFloat8(5)); - assertEquals(new String(UNICODE_FIELD_PREFIX + j), t.getText(6)); - assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(7)); - assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(8)); - assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(9)); - assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(10)); - assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(11)); - assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(12)); - } - - public static void validateNullity(int j, Tuple tuple) { - if (j == 0) { - tuple.isBlankOrNull(0); - } else { - assertTrue((j % 1 == 0) == tuple.getBool(0)); - } - - if (j % 1 == 0) { - tuple.isBlankOrNull(1); - } else { - assertTrue(1 == tuple.getInt2(1)); - } - - if (j % 2 == 0) { - tuple.isBlankOrNull(2); - } else { - assertEquals(j, tuple.getInt4(2)); - } - - if (j % 3 == 0) { - tuple.isBlankOrNull(3); - } else { - assertEquals(j, tuple.getInt8(3)); - } - - if (j % 4 == 0) { - tuple.isBlankOrNull(4); - } else { - assertTrue(j == tuple.getFloat4(4)); - } - - if (j % 5 == 0) { - tuple.isBlankOrNull(5); - } else { - assertTrue(j == tuple.getFloat8(5)); - } - - if (j % 6 == 0) { - tuple.isBlankOrNull(6); - } else { - assertEquals(new String(UNICODE_FIELD_PREFIX + j), tuple.getText(6)); - } - - if (j % 7 == 0) { - tuple.isBlankOrNull(7); - } else { - assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7)); - } - - if (j % 8 == 0) { - tuple.isBlankOrNull(8); - } else { - assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8)); - } - - if (j % 9 == 0) { - tuple.isBlankOrNull(9); - } else { - assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9)); - } - - if (j % 10 == 0) { - 
tuple.isBlankOrNull(10); - } else { - assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10)); - } - - if (j % 11 == 0) { - tuple.isBlankOrNull(11); - } else { - assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11)); - } - - if (j % 12 == 0) { - tuple.isBlankOrNull(12); - } else { - assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12)); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java deleted file mode 100644 index 1eb9c17..0000000 --- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.tuple.offheap; - -import org.apache.tajo.unit.StorageUnit; -import org.junit.Test; - -import static org.junit.Assert.*; - -public class TestResizableSpec { - - @Test - public void testResizableLimit() { - ResizableLimitSpec limit = new ResizableLimitSpec(10 * StorageUnit.MB, 1000 * StorageUnit.MB, 0.1f, 1.0f); - - long expectedMaxSize = (long) (1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f)); - - assertTrue(limit.limit() == 1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f)); - - assertEquals(20971520, limit.increasedSize(10 * StorageUnit.MB)); - - assertEquals(expectedMaxSize, limit.increasedSize(1600 * StorageUnit.MB)); - - assertEquals(0.98f, limit.remainRatio(980 * StorageUnit.MB), 0.1); - - assertFalse(limit.canIncrease(limit.limit())); - } - - @Test - public void testFixedLimit() { - FixedSizeLimitSpec limit = new FixedSizeLimitSpec(100 * StorageUnit.MB, 0.0f); - - assertEquals(limit.limit(), 100 * StorageUnit.MB); - - assertEquals(100 * StorageUnit.MB, limit.increasedSize(1000)); - - assertEquals(100 * StorageUnit.MB, limit.increasedSize(1600 * StorageUnit.MB)); - - assertTrue(0.98f == limit.remainRatio(98 * StorageUnit.MB)); - - assertFalse(limit.canIncrease(limit.limit())); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml index f637da0..8a9b9ea 100644 --- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml @@ -38,7 +38,7 @@ <!--- Registered Scanner Handler --> <property> <name>tajo.storage.scanner-handler</name> - 
<value>text,json,raw,rcfile,row,parquet,orc,sequencefile,avro,hbase</value> + <value>text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase</value> </property> <!--- Fragment Class Configurations --> @@ -55,6 +55,10 @@ <value>org.apache.tajo.storage.fragment.FileFragment</value> </property> <property> + <name>tajo.storage.fragment.draw.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> <name>tajo.storage.fragment.rcfile.class</name> <value>org.apache.tajo.storage.fragment.FileFragment</value> </property> @@ -100,6 +104,11 @@ </property> <property> + <name>tajo.storage.scanner-handler.draw.class</name> + <value>org.apache.tajo.storage.rawfile.DirectRawFileScanner</value> + </property> + + <property> <name>tajo.storage.scanner-handler.rcfile.class</name> <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value> </property> @@ -116,7 +125,7 @@ <property> <name>tajo.storage.scanner-handler.orc.class</name> - <value>org.apache.tajo.storage.orc.OrcScanner</value> + <value>org.apache.tajo.storage.orc.ORCScanner</value> </property> <property> @@ -137,7 +146,7 @@ <!--- Appender Handler --> <property> <name>tajo.storage.appender-handler</name> - <value>text,raw,rcfile,row,parquet,sequencefile,avro,hbase</value> + <value>text,raw,draw,rcfile,row,parquet,sequencefile,avro,hbase</value> </property> <property> @@ -156,6 +165,11 @@ </property> <property> + <name>tajo.storage.appender-handler.draw.class</name> + <value>org.apache.tajo.storage.rawfile.DirectRawFileWriter</value> + </property> + + <property> <name>tajo.storage.appender-handler.rcfile.class</name> <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value> </property> @@ -211,4 +225,4 @@ <value>131072</value> <description>128KB write buffer</description> </property> -</configuration> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-hbase/pom.xml 
---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/pom.xml b/tajo-storage/tajo-storage-hbase/pom.xml index e2d5132..d499cd4 100644 --- a/tajo-storage/tajo-storage-hbase/pom.xml +++ b/tajo-storage/tajo-storage-hbase/pom.xml @@ -233,6 +233,10 @@ <groupId>com.sun.jersey.jersey-test-framework</groupId> <artifactId>jersey-test-framework-grizzly2</artifactId> </exclusion> + <exclusion> + <artifactId>netty-all</artifactId> + <groupId>io.netty</groupId> + </exclusion> </exclusions> </dependency> <dependency> @@ -306,6 +310,12 @@ <artifactId>hbase-server</artifactId> <version>${hbase.version}</version> <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>netty-all</artifactId> + <groupId>io.netty</groupId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.hbase</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java index 8ae9a26..0172484 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java @@ -25,15 +25,17 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.io.IOUtils; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.plan.expr.EvalNode; import 
org.apache.tajo.storage.*; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.tuple.offheap.OffHeapRowBlock; -import org.apache.tajo.tuple.offheap.OffHeapRowBlockReader; -import org.apache.tajo.tuple.offheap.ZeroCopyTuple; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.tuple.RowBlockReader; +import org.apache.tajo.tuple.memory.MemoryRowBlock; +import org.apache.tajo.tuple.memory.RowBlock; +import org.apache.tajo.tuple.memory.UnSafeTuple; +import org.apache.tajo.tuple.memory.ZeroCopyTuple; import org.apache.tajo.unit.StorageUnit; import java.io.File; @@ -44,30 +46,27 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner private static final Log LOG = LogFactory.getLog(DirectRawFileScanner.class); private SeekableInputChannel channel; - private TajoDataTypes.DataType[] columnTypes; - private boolean eof = false; + private boolean eos = false; private long fileSize; private long recordCount; + private long filePosition; + private long endOffset; - private ZeroCopyTuple unSafeTuple = new ZeroCopyTuple(); - private OffHeapRowBlock tupleBuffer; - private OffHeapRowBlockReader reader; + private ZeroCopyTuple unSafeTuple = new UnSafeTuple(); + private RowBlock tupleBuffer; + private RowBlockReader reader; - public DirectRawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException { + public DirectRawFileScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException { super(conf, schema, meta, fragment); } public void init() throws IOException { initChannel(); - columnTypes = new TajoDataTypes.DataType[schema.size()]; - for (int i = 0; i < schema.size(); i++) { - columnTypes[i] = schema.getColumn(i).getDataType(); - } + tupleBuffer = new MemoryRowBlock(SchemaUtil.toDataTypes(schema), 64 * StorageUnit.KB, true); - tupleBuffer = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); - reader = new 
OffHeapRowBlockReader(tupleBuffer); + reader = tupleBuffer.getReader(); fetchNeeded = !next(tupleBuffer); @@ -90,44 +89,55 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner } channel = new LocalFileInputChannel(new FileInputStream(file)); - channel.seek(fragment.getStartKey()); fileSize = channel.size(); } else { channel = new FSDataInputChannel(fs.open(fragment.getPath())); - channel.seek(fragment.getStartKey()); fileSize = channel.size(); } + // initial set position + if (fragment.getStartKey() > 0) { + channel.seek(fragment.getStartKey()); + } + if (tableStats != null) { tableStats.setNumBytes(fileSize); } + filePosition = fragment.getStartKey(); + endOffset = fragment.getStartKey() + fragment.getLength(); if (LOG.isDebugEnabled()) { LOG.debug("RawFileScanner open:" + fragment.getPath() + ", offset :" + - fragment.getStartKey() + ", file size :" + fileSize); + fragment.getStartKey() + ", fragment length :" + fragment.getLength()); } } @Override public long getNextOffset() throws IOException { - return channel.position() - reader.remainForRead(); + return filePosition - reader.remainForRead(); } @Override public void seek(long offset) throws IOException { channel.seek(offset); + filePosition = channel.position(); + tupleBuffer.getMemory().clear(); fetchNeeded = true; } - public boolean next(OffHeapRowBlock rowblock) throws IOException { - return rowblock.copyFromChannel(channel, tableStats); + public boolean next(RowBlock rowblock) throws IOException { + long reamin = reader.remainForRead(); + boolean ret = rowblock.copyFromChannel(channel); + reader = rowblock.getReader(); + filePosition += rowblock.getMemory().writerPosition() - reamin; + return ret; } private boolean fetchNeeded = true; @Override public Tuple next() throws IOException { - if(eof) { + if(eos) { return null; } @@ -136,13 +146,15 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner if (!next(tupleBuffer)) { return null; } - 
reader.reset(); } fetchNeeded = !reader.next(unSafeTuple); if (!fetchNeeded) { recordCount++; + if(filePosition - reader.remainForRead() >= endOffset){ + eos = true; + } return unSafeTuple; } } @@ -151,8 +163,9 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner @Override public void reset() throws IOException { // reload initial buffer - seek(0); - eof = false; + filePosition = fragment.getStartKey(); + seek(filePosition); + eos = false; reader.reset(); } @@ -162,8 +175,10 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner tableStats.setReadBytes(fileSize); tableStats.setNumRows(recordCount); } - tupleBuffer.release(); - tupleBuffer = null; + if(tupleBuffer != null) { + tupleBuffer.release(); + tupleBuffer = null; + } reader = null; IOUtils.cleanup(LOG, channel); @@ -201,7 +216,7 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner tableStats.setReadBytes(filePos); } - if(eof || channel == null) { + if(eos || channel == null) { tableStats.setReadBytes(fileSize); return 1.0f; } http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java index 912649f..03642a7 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java @@ -28,38 +28,37 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; 
import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.plan.serder.PlanProto.ShuffleType; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.storage.*; -import org.apache.tajo.tuple.BaseTupleBuilder; -import org.apache.tajo.tuple.offheap.OffHeapRowBlock; -import org.apache.tajo.tuple.offheap.UnSafeTuple; +import org.apache.tajo.storage.FileAppender; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.storage.TableStatistics; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.memory.MemoryRowBlock; import org.apache.tajo.unit.StorageUnit; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.nio.ByteBuffer; import java.nio.channels.FileChannel; public class DirectRawFileWriter extends FileAppender { public static final String FILE_EXTENSION = "draw"; + private static final int BUFFER_SIZE = 64 * StorageUnit.KB; private static final Log LOG = LogFactory.getLog(DirectRawFileWriter.class); private FileChannel channel; private RandomAccessFile randomAccessFile; private FSDataOutputStream fos; - private TajoDataTypes.DataType[] columnTypes; private boolean isLocal; private long pos; private TableStatistics stats; private ShuffleType shuffleType; - - private BaseTupleBuilder builder; + private MemoryRowBlock memoryRowBlock; public DirectRawFileWriter(Configuration conf, TaskAttemptId taskAttemptId, final Schema schema, final TableMeta meta, final Path path) throws IOException { @@ -90,12 +89,6 @@ public class DirectRawFileWriter extends FileAppender { isLocal = false; } - pos = 0; - columnTypes = new TajoDataTypes.DataType[schema.size()]; - for (int i = 0; i < schema.size(); i++) { - columnTypes[i] = schema.getColumn(i).getDataType(); - } - if (enabledStats) { this.stats = new TableStatistics(this.schema); this.shuffleType = PlannerUtil.getShuffleType( @@ -103,52 
+96,27 @@ public class DirectRawFileWriter extends FileAppender { PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE))); } - builder = new BaseTupleBuilder(schema); - + memoryRowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(schema), BUFFER_SIZE, true); + pos = 0; super.init(); } @Override public long getOffset() throws IOException { - return pos; + return pos + memoryRowBlock.getMemory().writerPosition(); } - private long getFilePosition() throws IOException { - if (isLocal) { - return channel.position(); + public void writeRowBlock(MemoryRowBlock rowBlock) throws IOException { + if(isLocal) { + pos += rowBlock.getMemory().writeTo(channel); } else { - return fos.getPos(); + pos += rowBlock.getMemory().writeTo(fos); } - } - public void writeRowBlock(OffHeapRowBlock rowBlock) throws IOException { - write(rowBlock.nioBuffer()); - if (enabledStats) { - stats.incrementRows(rowBlock.rows()); - } - - pos = getFilePosition(); - } + rowBlock.getMemory().clear(); - private ByteBuffer buffer; - private void ensureSize(int size) throws IOException { - if (buffer.remaining() < size) { - - buffer.limit(buffer.position()); - buffer.flip(); - write(buffer); - - buffer.clear(); - } - } - - private void write(ByteBuffer buffer) throws IOException { - if(isLocal) { - channel.write(buffer); - } else { - byte[] bytes = new byte[buffer.remaining()]; - buffer.get(bytes); - fos.write(bytes); + if (enabledStats) { + stats.incrementRows(rowBlock.rows() - stats.getNumRows()); } } @@ -161,43 +129,24 @@ public class DirectRawFileWriter extends FileAppender { } } - if (buffer == null) { - buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB); - } - - UnSafeTuple unSafeTuple; + memoryRowBlock.getWriter().addTuple(t); - if (!(t instanceof UnSafeTuple)) { - RowStoreUtil.convert(t, builder); - unSafeTuple = builder.buildToZeroCopyTuple(); - } else { - unSafeTuple = (UnSafeTuple) t; - } - - ByteBuffer bb = unSafeTuple.nioBuffer(); - ensureSize(bb.limit()); - buffer.put(bb); - - pos = 
getFilePosition() + (buffer.limit() - buffer.remaining()); - - if (enabledStats) { - stats.incrementRow(); + if(memoryRowBlock.getMemory().readableBytes() >= BUFFER_SIZE) { + writeRowBlock(memoryRowBlock); } } @Override public void flush() throws IOException { - if (buffer != null) { - buffer.limit(buffer.position()); - buffer.flip(); - write(buffer); - buffer.clear(); + if(memoryRowBlock.getMemory().isReadable()) { + writeRowBlock(memoryRowBlock); } } @Override public void close() throws IOException { flush(); + if (enabledStats) { stats.setNumBytes(getOffset()); } @@ -206,6 +155,7 @@ public class DirectRawFileWriter extends FileAppender { } IOUtils.cleanup(LOG, channel, randomAccessFile, fos); + memoryRowBlock.release(); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java index 3c667bf..2456907 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java @@ -29,7 +29,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; public class ByteBufLineReader implements Closeable { - public static int DEFAULT_BUFFER = 64 * 1024; + public static final int DEFAULT_BUFFER = 64 * 1024; private int bufferSize; private long readBytes; @@ -100,7 +100,7 @@ public class ByteBufLineReader implements Closeable { tailBytes = this.buffer.writerIndex(); if (!this.buffer.isWritable()) { // a line bytes is large than the buffer - BufferPool.ensureWritable(buffer, bufferSize * 2); + this.buffer = 
BufferPool.ensureWritable(buffer, bufferSize * 2); this.bufferSize = buffer.capacity(); } this.startIndex = 0;
