http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java new file mode 100644 index 0000000..14e67b2 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.util.FileUtil; + +/** + * It specifies the maximum size or increasing ratio. In addition, + * it guarantees that all numbers are less than or equal to Integer.MAX_VALUE 2^31 + * due to ByteBuffer. 
+ */ +public class ResizableLimitSpec { + private final Log LOG = LogFactory.getLog(ResizableLimitSpec.class); + + public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE; + public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE); + + private final long initSize; + private final long limitBytes; + private final float incRatio; + private final float allowedOVerflowRatio; + private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f; + private final static float DEFAULT_INCREASE_RATIO = 1.0f; + + public ResizableLimitSpec(long initSize) { + this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO); + } + + public ResizableLimitSpec(long initSize, long limitBytes) { + this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO); + } + + public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) { + this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO); + } + + public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) { + Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes."); + Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB."); + Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes."); + Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB."); + Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0."); + + if (initSize == limitBytes) { + long overflowedSize = (long) (initSize + (initSize * allowedOverflowRatio)); + + if (overflowedSize > Integer.MAX_VALUE) { + overflowedSize = Integer.MAX_VALUE; + } + + this.initSize = overflowedSize; + this.limitBytes = overflowedSize; + } else { + this.initSize = initSize; + limitBytes = (long) (limitBytes + (limitBytes * allowedOverflowRatio)); + + if (limitBytes > Integer.MAX_VALUE) { + this.limitBytes = Integer.MAX_VALUE; + } 
else { + this.limitBytes = limitBytes; + } + } + + this.allowedOVerflowRatio = allowedOverflowRatio; + this.incRatio = incRatio; + } + + public long initialSize() { + return initSize; + } + + public long limit() { + return limitBytes; + } + + public float remainRatio(long currentSize) { + Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes."); + if (currentSize > Integer.MAX_VALUE) { + currentSize = Integer.MAX_VALUE; + } + return (float)currentSize / (float)limitBytes; + } + + public boolean canIncrease(long currentSize) { + return remain(currentSize) > 0; + } + + public long remain(long currentSize) { + Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes."); + return limitBytes > Integer.MAX_VALUE ? Integer.MAX_VALUE - currentSize : limitBytes - currentSize; + } + + public int increasedSize(int currentSize) { + if (currentSize < initSize) { + return (int) initSize; + } + + if (currentSize > Integer.MAX_VALUE) { + LOG.warn("Current size already exceeds the maximum size (" + Integer.MAX_VALUE + " bytes)"); + return Integer.MAX_VALUE; + } + long nextSize = (long) (currentSize + ((float) currentSize * incRatio)); + + if (nextSize > limitBytes) { + LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")"); + nextSize = limitBytes; + } + + if (nextSize > Integer.MAX_VALUE) { + LOG.info("Increasing reaches maximum size (" + FileUtil.humanReadableByteCount(Integer.MAX_VALUE, false) + ")"); + nextSize = Integer.MAX_VALUE; + } + + return (int) nextSize; + } + + @Override + public String toString() { + return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit=" + + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOVerflowRatio + + ",inc_ratio=" + incRatio; + } +}
/***
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.tuple.offheap;

import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.IntervalDatum;
import org.apache.tajo.datum.ProtobufDatum;

/**
 * Field-by-field writer for encoding one row at a time.
 *
 * The call sequence should be as follows:
 *
 * <pre>
 * startRow() --&gt; skipField() or putXXX --&gt; endRow()
 * </pre>
 *
 * The total number of skipField and putXXX invocations must be equivalent to the number of fields.
 */
public interface RowWriter {

  /** Returns the data types of the fields, in field order. */
  public TajoDataTypes.DataType [] dataTypes();

  /** Begins a new row. Returns whether the row can be started (e.g., whether space is available). */
  public boolean startRow();

  /** Completes the current row. Must be called once per startRow(). */
  public void endRow();

  /** Writes the current field as absent (null) and advances to the next field. */
  public void skipField();

  /** Writes a BOOLEAN field. */
  public void putBool(boolean val);

  /** Writes an INT2 (16-bit) field. */
  public void putInt2(short val);

  /** Writes an INT4 (32-bit) field. */
  public void putInt4(int val);

  /** Writes an INT8 (64-bit) field. */
  public void putInt8(long val);

  /** Writes a FLOAT4 field. */
  public void putFloat4(float val);

  /** Writes a FLOAT8 field. */
  public void putFloat8(double val);

  /** Writes a TEXT field from a String. */
  public void putText(String val);

  /** Writes a TEXT field from raw bytes. */
  public void putText(byte[] val);

  /** Writes a BLOB field. */
  public void putBlob(byte[] val);

  /** Writes a TIMESTAMP field as a long value. */
  public void putTimestamp(long val);

  /** Writes a TIME field as a long value. */
  public void putTime(long val);

  /** Writes a DATE field as an int value. */
  public void putDate(int val);

  /** Writes an INTERVAL field. */
  public void putInterval(IntervalDatum val);

  /** Writes an INET4 address field as a packed int. */
  public void putInet4(int val);

  /** Writes a PROTOBUF field. */
  public void putProtoDatum(ProtobufDatum datum);
}
/***
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +import com.google.common.base.Preconditions; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; + +import org.apache.tajo.datum.*; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.SizeOf; +import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.UnsafeUtil; + +import sun.misc.Unsafe; +import sun.nio.ch.DirectBuffer; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.Charset; + +import static org.apache.tajo.common.TajoDataTypes.DataType; + +public abstract class UnSafeTuple implements Tuple { + private static final Unsafe UNSAFE = UnsafeUtil.unsafe; + + private DirectBuffer bb; + private int relativePos; + private int length; + private DataType [] types; + + protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) { + this.bb = (DirectBuffer) bb; + this.relativePos = relativePos; + this.length = length; + this.types = types; + } + + void set(ByteBuffer bb, DataType [] types) { + set(bb, 0, bb.limit(), types); + } + + @Override + public int size() { + return types.length; + } + + public ByteBuffer nioBuffer() { + return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice(); + } + + public HeapTuple toHeapTuple() { + byte [] bytes = new byte[length]; + UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, 
UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length); + return new HeapTuple(bytes, types); + } + + public void copyFrom(UnSafeTuple tuple) { + Preconditions.checkNotNull(tuple); + + ((ByteBuffer) bb).clear(); + if (length < tuple.length) { + UnsafeUtil.free((ByteBuffer) bb); + bb = (DirectBuffer) ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder()); + this.relativePos = 0; + this.length = tuple.length; + } + + ((ByteBuffer) bb).put(tuple.nioBuffer()); + } + + private int getFieldOffset(int fieldId) { + return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)); + } + + public long getFieldAddr(int fieldId) { + int fieldOffset = getFieldOffset(fieldId); + if (fieldOffset == -1) { + throw new RuntimeException("Invalid Field Access: " + fieldId); + } + return bb.address() + relativePos + fieldOffset; + } + + @Override + public boolean contains(int fieldid) { + return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public boolean isNull(int fieldid) { + return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public boolean isNotNull(int fieldid) { + return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public void clear() { + // nothing to do + } + + @Override + public void put(int fieldId, Datum value) { + throw new UnsupportedException("UnSafeTuple does not support put(int, Datum)."); + } + + @Override + public void put(int fieldId, Datum[] values) { + throw new UnsupportedException("UnSafeTuple does not support put(int, Datum [])."); + } + + @Override + public void put(int fieldId, Tuple tuple) { + throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple)."); + } + + @Override + public void put(Datum[] values) { + throw new UnsupportedException("UnSafeTuple does not support put(Datum [])."); + } + + @Override + public Datum get(int fieldId) { + if (isNull(fieldId)) { + return NullDatum.get(); + } 
+ + switch (types[fieldId].getType()) { + case BOOLEAN: + return DatumFactory.createBool(getBool(fieldId)); + case INT1: + case INT2: + return DatumFactory.createInt2(getInt2(fieldId)); + case INT4: + return DatumFactory.createInt4(getInt4(fieldId)); + case INT8: + return DatumFactory.createInt8(getInt4(fieldId)); + case FLOAT4: + return DatumFactory.createFloat4(getFloat4(fieldId)); + case FLOAT8: + return DatumFactory.createFloat8(getFloat8(fieldId)); + case TEXT: + return DatumFactory.createText(getText(fieldId)); + case TIMESTAMP: + return DatumFactory.createTimestamp(getInt8(fieldId)); + case DATE: + return DatumFactory.createDate(getInt4(fieldId)); + case TIME: + return DatumFactory.createTime(getInt8(fieldId)); + case INTERVAL: + return getInterval(fieldId); + case INET4: + return DatumFactory.createInet4(getInt4(fieldId)); + case PROTOBUF: + return getProtobufDatum(fieldId); + default: + throw new UnsupportedException("Unknown type: " + types[fieldId]); + } + } + + @Override + public void setOffset(long offset) { + } + + @Override + public long getOffset() { + return 0; + } + + @Override + public boolean getBool(int fieldId) { + return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01; + } + + @Override + public byte getByte(int fieldId) { + return UNSAFE.getByte(getFieldAddr(fieldId)); + } + + @Override + public char getChar(int fieldId) { + return UNSAFE.getChar(getFieldAddr(fieldId)); + } + + @Override + public byte[] getBytes(int fieldId) { + long pos = getFieldAddr(fieldId); + int len = UNSAFE.getInt(pos); + pos += SizeOf.SIZE_OF_INT; + + byte [] bytes = new byte[len]; + UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); + return bytes; + } + + @Override + public short getInt2(int fieldId) { + long addr = getFieldAddr(fieldId); + return UNSAFE.getShort(addr); + } + + @Override + public int getInt4(int fieldId) { + return UNSAFE.getInt(getFieldAddr(fieldId)); + } + + @Override + public long getInt8(int fieldId) { + return 
UNSAFE.getLong(getFieldAddr(fieldId)); + } + + @Override + public float getFloat4(int fieldId) { + return UNSAFE.getFloat(getFieldAddr(fieldId)); + } + + @Override + public double getFloat8(int fieldId) { + return UNSAFE.getDouble(getFieldAddr(fieldId)); + } + + @Override + public String getText(int fieldId) { + long pos = getFieldAddr(fieldId); + int len = UNSAFE.getInt(pos); + pos += SizeOf.SIZE_OF_INT; + + byte [] bytes = new byte[len]; + UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); + return new String(bytes); + } + + public IntervalDatum getInterval(int fieldId) { + long pos = getFieldAddr(fieldId); + int months = UNSAFE.getInt(pos); + pos += SizeOf.SIZE_OF_INT; + long millisecs = UNSAFE.getLong(pos); + return new IntervalDatum(months, millisecs); + } + + @Override + public Datum getProtobufDatum(int fieldId) { + byte [] bytes = getBytes(fieldId); + + ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode()); + Message.Builder builder = factory.newBuilder(); + try { + builder.mergeFrom(bytes); + } catch (InvalidProtocolBufferException e) { + return NullDatum.get(); + } + + return new ProtobufDatum(builder.build()); + } + + @Override + public char[] getUnicodeChars(int fieldId) { + long pos = getFieldAddr(fieldId); + int len = UNSAFE.getInt(pos); + pos += SizeOf.SIZE_OF_INT; + + byte [] bytes = new byte[len]; + UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); + return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8")); + } + + @Override + public Tuple clone() throws CloneNotSupportedException { + return toHeapTuple(); + } + + @Override + public Datum[] getValues() { + Datum [] datums = new Datum[size()]; + for (int i = 0; i < size(); i++) { + if (contains(i)) { + datums[i] = get(i); + } else { + datums[i] = NullDatum.get(); + } + } + return datums; + } + + @Override + public String toString() { + return VTuple.toDisplayString(getValues()); + } + + public abstract 
void release(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java new file mode 100644 index 0000000..73e1e2f --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java @@ -0,0 +1,99 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +import com.google.common.primitives.Longs; +import com.google.common.primitives.UnsignedLongs; +import org.apache.tajo.util.SizeOf; +import org.apache.tajo.util.UnsafeUtil; +import sun.misc.Unsafe; + +import java.nio.ByteOrder; + +/** + * It directly access UTF bytes in UnSafeTuple without any copy. It is used by compiled TupleComparator. 
 */
public class UnSafeTupleBytesComparator {
  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;

  // Cached once: whether the platform is little-endian; decides how a differing word is compared.
  static final boolean littleEndian =
      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);

  /**
   * Lexicographically compares two length-prefixed byte sequences stored off-heap.
   * Each pointer addresses an int length prefix immediately followed by that many bytes
   * (the layout written for TEXT/BLOB fields). Returns a negative, zero, or positive
   * value like a standard comparator.
   *
   * This is the word-at-a-time technique from Guava's UnsignedBytes lexicographical
   * comparator, adapted to raw addresses.
   */
  public static int compare(long ptr1, long ptr2) {
    int lstrLen = UNSAFE.getInt(ptr1);
    int rstrLen = UNSAFE.getInt(ptr2);

    // Advance both pointers past the int length prefixes to the actual bytes.
    ptr1 += SizeOf.SIZE_OF_INT;
    ptr2 += SizeOf.SIZE_OF_INT;

    int minLength = Math.min(lstrLen, rstrLen);
    int minWords = minLength / Longs.BYTES;

    /*
     * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
     * time is no slower than comparing 4 bytes at a time even on 32-bit.
     * On the other hand, it is substantially faster on 64-bit.
     */
    for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
      long lw = UNSAFE.getLong(ptr1);
      long rw = UNSAFE.getLong(ptr2);
      // XOR is non-zero iff the words differ; its lowest set byte is the first differing byte.
      long diff = lw ^ rw;

      if (diff != 0) {
        if (!littleEndian) {
          // Big-endian: byte order matches lexicographic order, so compare words directly.
          return UnsignedLongs.compare(lw, rw);
        }

        // Little-endian: locate the lowest-order differing byte.
        // Use binary search
        // n accumulates the bit position (multiple of 8) of that byte within the word.
        int n = 0;
        int y;
        int x = (int) diff;
        if (x == 0) {
          // All differences are in the high 32 bits.
          x = (int) (diff >>> 32);
          n = 32;
        }

        // If the low 16 bits of x are zero, the differing byte is higher; otherwise
        // shift it up so the next stage keeps inspecting the low end.
        y = x << 16;
        if (y == 0) {
          n += 16;
        } else {
          x = y;
        }

        // Final stage: decide between the two remaining candidate bytes.
        y = x << 8;
        if (y == 0) {
          n += 8;
        }
        // Compare the single differing byte as unsigned values.
        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
      }

      ptr1 += SizeOf.SIZE_OF_LONG;
      ptr2 += SizeOf.SIZE_OF_LONG;
    }

    // The epilogue to cover the last (minLength % 8) elements.
    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
      int result = UNSAFE.getByte(ptr1++) - UNSAFE.getByte(ptr2++);
      if (result != 0) {
        return result;
      }
    }
    // All shared bytes equal: the shorter sequence sorts first.
    return lstrLen - rstrLen;
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.tuple.offheap;

import java.nio.ByteBuffer;

import static org.apache.tajo.common.TajoDataTypes.DataType;

/**
 * A concrete UnSafeTuple that is a pure view over externally owned memory:
 * it exposes the parent's protected set() publicly so callers can repoint it
 * at a row region, and release() is a no-op because this tuple never owns
 * the underlying buffer.
 */
public class ZeroCopyTuple extends UnSafeTuple {

  /** Points this tuple at the given row region of the buffer (no copy). */
  public void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
    super.set(bb, relativePos, length, types);
  }

  @Override
  public void release() {
    // nothing to do: the backing buffer is owned elsewhere
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

option java_package = "org.apache.tajo.index";
option java_outer_classname = "IndexProtos";
option optimize_for = SPEED;
option java_generic_services = false;
option java_generate_equals_and_hash = true;

import "CatalogProtos.proto";

// Serialized form of a tuple comparator: the schema the sort keys refer to,
// the sort specifications (key column, order, null ordering), and the
// per-key comparison specs used to rebuild the comparator.
message TupleComparatorProto {
  // Schema that the sort spec column references resolve against.
  required SchemaProto schema = 1;
  // One entry per sort key.
  repeated SortSpecProto sortSpecs = 2;
  // One comparison spec per sort key.
  repeated TupleComparatorSpecProto compSpecs = 3;
}
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!--
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
+ --> + +<configuration> + <!-- Storage Manager Configuration --> + <property> + <name>tajo.storage.manager.hdfs.class</name> + <value>org.apache.tajo.storage.FileStorageManager</value> + </property> + <property> + <name>tajo.storage.manager.hbase.class</name> + <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value> + </property> + + <property> + <name>tajo.storage.manager.concurrency.perDisk</name> + <value>1</value> + <description></description> + </property> + + <!--- Registered Scanner Handler --> + <property> + <name>tajo.storage.scanner-handler</name> + <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value> + </property> + + <!--- Fragment Class Configurations --> + <property> + <name>tajo.storage.fragment.textfile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.csv.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.raw.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.rcfile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.row.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.parquet.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.sequencefile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.avro.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.hbase.class</name> + <value>org.apache.tajo.storage.hbase.HBaseFragment</value> + </property> + + <!--- Scanner Handler --> + 
<property> + <name>tajo.storage.scanner-handler.textfile.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.csv.class</name> + <value>org.apache.tajo.storage.CSVFile$CSVScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.raw.class</name> + <value>org.apache.tajo.storage.RawFile$RawFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.rcfile.class</name> + <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.rowfile.class</name> + <value>org.apache.tajo.storage.RowFile$RowFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.parquet.class</name> + <value>org.apache.tajo.storage.parquet.ParquetScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.sequencefile.class</name> + <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.avro.class</name> + <value>org.apache.tajo.storage.avro.AvroScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.hbase.class</name> + <value>org.apache.tajo.storage.hbase.HBaseScanner</value> + </property> + + <!--- Appender Handler --> + <property> + <name>tajo.storage.appender-handler</name> + <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value> + </property> + + <property> + <name>tajo.storage.appender-handler.textfile.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.csv.class</name> + <value>org.apache.tajo.storage.CSVFile$CSVAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.raw.class</name> + 
<value>org.apache.tajo.storage.RawFile$RawFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.rcfile.class</name> + <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.rowfile.class</name> + <value>org.apache.tajo.storage.RowFile$RowFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.parquet.class</name> + <value>org.apache.tajo.storage.parquet.ParquetAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.sequencefile.class</name> + <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.avro.class</name> + <value>org.apache.tajo.storage.avro.AvroAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.hbase.class</name> + <value>org.apache.tajo.storage.hbase.HFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.hfile.class</name> + <value>org.apache.tajo.storage.hbase.HFileAppender</value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java new file mode 100644 index 0000000..0251dc7 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestFrameTuple { + private Tuple tuple1; + private Tuple tuple2; + + @Before + public void setUp() throws Exception { + tuple1 = new VTuple(11); + tuple1.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar('9'), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("hyunsik"), + DatumFactory.createBlob("hyunsik".getBytes()), + DatumFactory.createInet4("192.168.0.1") + }); + + tuple2 = new VTuple(11); + tuple2.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar('9'), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("hyunsik"), + DatumFactory.createBlob("hyunsik".getBytes()), + DatumFactory.createInet4("192.168.0.1") + }); + } + + @After + public void tearDown() throws 
Exception { + } + + @Test + public final void testFrameTuple() { + Tuple frame = new FrameTuple(tuple1, tuple2); + assertEquals(22, frame.size()); + for (int i = 0; i < 22; i++) { + assertTrue(frame.contains(i)); + } + + assertEquals(DatumFactory.createInt8(23l), frame.get(5)); + assertEquals(DatumFactory.createInt8(23l), frame.get(16)); + assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10)); + assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(21)); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java new file mode 100644 index 0000000..c6149f7 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.util.BytesUtils; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestLazyTuple { + + Schema schema; + byte[][] textRow; + byte[] nullbytes; + SerializerDeserializer serde; + + @Before + public void setUp() { + nullbytes = "\\N".getBytes(); + + schema = new Schema(); + schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN); + schema.addColumn("col2", TajoDataTypes.Type.BIT); + schema.addColumn("col3", TajoDataTypes.Type.CHAR, 7); + schema.addColumn("col4", TajoDataTypes.Type.INT2); + schema.addColumn("col5", TajoDataTypes.Type.INT4); + schema.addColumn("col6", TajoDataTypes.Type.INT8); + schema.addColumn("col7", TajoDataTypes.Type.FLOAT4); + schema.addColumn("col8", TajoDataTypes.Type.FLOAT8); + schema.addColumn("col9", TajoDataTypes.Type.TEXT); + schema.addColumn("col10", TajoDataTypes.Type.BLOB); + schema.addColumn("col11", TajoDataTypes.Type.INET4); + schema.addColumn("col12", TajoDataTypes.Type.INT4); + schema.addColumn("col13", TajoDataTypes.Type.NULL_TYPE); + + StringBuilder sb = new StringBuilder(); + sb.append(DatumFactory.createBool(true)).append('|'); + sb.append(new String(DatumFactory.createBit((byte) 0x99).asTextBytes())).append('|'); + sb.append(DatumFactory.createChar("str")).append('|'); + sb.append(DatumFactory.createInt2((short) 17)).append('|'); + sb.append(DatumFactory.createInt4(59)).append('|'); + sb.append(DatumFactory.createInt8(23l)).append('|'); + sb.append(DatumFactory.createFloat4(77.9f)).append('|'); + sb.append(DatumFactory.createFloat8(271.9f)).append('|'); + sb.append(DatumFactory.createText("str2")).append('|'); + sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|'); + sb.append(DatumFactory.createInet4("192.168.0.1")).append('|'); 
+ sb.append(new String(nullbytes)).append('|'); + sb.append(NullDatum.get()); + textRow = BytesUtils.splitPreserveAllTokens(sb.toString().getBytes(), '|'); + serde = new TextSerializerDeserializer(); + } + + @Test + public void testGetDatum() { + + LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes, serde); + assertEquals(DatumFactory.createBool(true), t1.get(0)); + assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1)); + assertEquals(DatumFactory.createChar("str"), t1.get(2)); + assertEquals(DatumFactory.createInt2((short) 17), t1.get(3)); + assertEquals(DatumFactory.createInt4(59), t1.get(4)); + assertEquals(DatumFactory.createInt8(23l), t1.get(5)); + assertEquals(DatumFactory.createFloat4(77.9f), t1.get(6)); + assertEquals(DatumFactory.createFloat8(271.9f), t1.get(7)); + assertEquals(DatumFactory.createText("str2"), t1.get(8)); + assertEquals(DatumFactory.createBlob("jinho".getBytes()), t1.get(9)); + assertEquals(DatumFactory.createInet4("192.168.0.1"), t1.get(10)); + assertEquals(NullDatum.get(), t1.get(11)); + assertEquals(NullDatum.get(), t1.get(12)); + } + + @Test + public void testContain() { + int colNum = schema.size(); + + LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); + t1.put(0, DatumFactory.createInt4(1)); + t1.put(3, DatumFactory.createInt4(1)); + t1.put(7, DatumFactory.createInt4(1)); + + assertTrue(t1.contains(0)); + assertFalse(t1.contains(1)); + assertFalse(t1.contains(2)); + assertTrue(t1.contains(3)); + assertFalse(t1.contains(4)); + assertFalse(t1.contains(5)); + assertFalse(t1.contains(6)); + assertTrue(t1.contains(7)); + assertFalse(t1.contains(8)); + assertFalse(t1.contains(9)); + assertFalse(t1.contains(10)); + assertFalse(t1.contains(11)); + assertFalse(t1.contains(12)); + } + + @Test + public void testPut() { + int colNum = schema.size(); + LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); + t1.put(0, DatumFactory.createText("str")); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(11, 
DatumFactory.createFloat4(0.76f)); + + assertTrue(t1.contains(0)); + assertTrue(t1.contains(1)); + + assertEquals(t1.getText(0), "str"); + assertEquals(t1.get(1).asInt4(), 2); + assertTrue(t1.get(11).asFloat4() == 0.76f); + } + + @Test + public void testEquals() { + int colNum = schema.size(); + LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); + LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1); + + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(3, DatumFactory.createInt4(2)); + + t2.put(0, DatumFactory.createInt4(1)); + t2.put(1, DatumFactory.createInt4(2)); + t2.put(3, DatumFactory.createInt4(2)); + + assertEquals(t1, t2); + + Tuple t3 = new VTuple(colNum); + t3.put(0, DatumFactory.createInt4(1)); + t3.put(1, DatumFactory.createInt4(2)); + t3.put(3, DatumFactory.createInt4(2)); + assertEquals(t1, t3); + assertEquals(t2, t3); + + LazyTuple t4 = new LazyTuple(schema, new byte[colNum][], -1); + assertNotSame(t1, t4); + } + + @Test + public void testHashCode() { + int colNum = schema.size(); + LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); + LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1); + + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(3, DatumFactory.createInt4(2)); + t1.put(4, DatumFactory.createText("str")); + + t2.put(0, DatumFactory.createInt4(1)); + t2.put(1, DatumFactory.createInt4(2)); + t2.put(3, DatumFactory.createInt4(2)); + t2.put(4, DatumFactory.createText("str")); + + assertEquals(t1.hashCode(), t2.hashCode()); + + Tuple t3 = new VTuple(colNum); + t3.put(0, DatumFactory.createInt4(1)); + t3.put(1, DatumFactory.createInt4(2)); + t3.put(3, DatumFactory.createInt4(2)); + t3.put(4, DatumFactory.createText("str")); + assertEquals(t1.hashCode(), t3.hashCode()); + assertEquals(t2.hashCode(), t3.hashCode()); + + Tuple t4 = new VTuple(5); + t4.put(0, DatumFactory.createInt4(1)); + t4.put(1, DatumFactory.createInt4(2)); + 
t4.put(4, DatumFactory.createInt4(2)); + + assertNotSame(t1.hashCode(), t4.hashCode()); + } + + @Test + public void testPutTuple() { + int colNum = schema.size(); + LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); + + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(2, DatumFactory.createInt4(3)); + + + Schema schema2 = new Schema(); + schema2.addColumn("col1", TajoDataTypes.Type.INT8); + schema2.addColumn("col2", TajoDataTypes.Type.INT8); + + LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.size()][], -1); + t2.put(0, DatumFactory.createInt4(4)); + t2.put(1, DatumFactory.createInt4(5)); + + t1.put(3, t2); + + for (int i = 0; i < 5; i++) { + assertEquals(i + 1, t1.get(i).asInt4()); + } + } + + @Test + public void testInvalidNumber() { + byte[][] bytes = BytesUtils.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|'); + Schema schema = new Schema(); + schema.addColumn("col1", TajoDataTypes.Type.INT2); + schema.addColumn("col2", TajoDataTypes.Type.INT4); + schema.addColumn("col3", TajoDataTypes.Type.INT8); + schema.addColumn("col4", TajoDataTypes.Type.FLOAT4); + schema.addColumn("col5", TajoDataTypes.Type.FLOAT8); + + LazyTuple tuple = new LazyTuple(schema, bytes, 0); + assertEquals(bytes.length, tuple.size()); + + for (int i = 0; i < tuple.size(); i++){ + assertEquals(NullDatum.get(), tuple.get(i)); + } + } + + @Test + public void testClone() throws CloneNotSupportedException { + int colNum = schema.size(); + LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); + + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(3, DatumFactory.createInt4(2)); + t1.put(4, DatumFactory.createText("str")); + + LazyTuple t2 = (LazyTuple) t1.clone(); + assertNotSame(t1, t2); + assertEquals(t1, t2); + + assertSame(t1.get(4), t2.get(4)); + + t1.clear(); + assertFalse(t1.equals(t2)); + } +} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java new file mode 100644 index 0000000..639ca04 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestTupleComparator { + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + } + + @Test + public final void testCompare() { + Schema schema = new Schema(); + schema.addColumn("col1", Type.INT4); + schema.addColumn("col2", Type.INT4); + schema.addColumn("col3", Type.INT4); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.TEXT); + + Tuple tuple1 = new VTuple(5); + Tuple tuple2 = new VTuple(5); + + tuple1.put( + new Datum[] { + DatumFactory.createInt4(9), + DatumFactory.createInt4(3), + DatumFactory.createInt4(33), + DatumFactory.createInt4(4), + DatumFactory.createText("abc")}); + tuple2.put( + new Datum[] { + DatumFactory.createInt4(1), + DatumFactory.createInt4(25), + DatumFactory.createInt4(109), + DatumFactory.createInt4(4), + DatumFactory.createText("abd")}); + + SortSpec sortKey1 = new SortSpec(schema.getColumn("col4"), true, false); + SortSpec sortKey2 = new SortSpec(schema.getColumn("col5"), true, false); + + BaseTupleComparator tc = new BaseTupleComparator(schema, + new SortSpec[] {sortKey1, sortKey2}); + assertEquals(-1, tc.compare(tuple1, tuple2)); + assertEquals(1, tc.compare(tuple2, tuple1)); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java 
b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java new file mode 100644 index 0000000..1bbd9ec --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + + +import org.apache.tajo.datum.DatumFactory; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestVTuple { + + /** + * @throws Exception + */ + @Before + public void setUp() throws Exception { + + } + + @Test + public void testContain() { + VTuple t1 = new VTuple(260); + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(1)); + t1.put(27, DatumFactory.createInt4(1)); + t1.put(96, DatumFactory.createInt4(1)); + t1.put(257, DatumFactory.createInt4(1)); + + assertTrue(t1.contains(0)); + assertTrue(t1.contains(1)); + assertFalse(t1.contains(2)); + assertFalse(t1.contains(3)); + assertFalse(t1.contains(4)); + assertTrue(t1.contains(27)); + assertFalse(t1.contains(28)); + assertFalse(t1.contains(95)); + assertTrue(t1.contains(96)); + assertFalse(t1.contains(97)); + assertTrue(t1.contains(257)); + } + + @Test + public void testPut() { + VTuple t1 = new VTuple(260); + t1.put(0, DatumFactory.createText("str")); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(257, DatumFactory.createFloat4(0.76f)); + + assertTrue(t1.contains(0)); + assertTrue(t1.contains(1)); + + assertEquals(t1.getText(0),"str"); + assertEquals(t1.get(1).asInt4(),2); + assertTrue(t1.get(257).asFloat4() == 0.76f); + } + + @Test + public void testEquals() { + Tuple t1 = new VTuple(5); + Tuple t2 = new VTuple(5); + + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(3, DatumFactory.createInt4(2)); + + t2.put(0, DatumFactory.createInt4(1)); + t2.put(1, DatumFactory.createInt4(2)); + t2.put(3, DatumFactory.createInt4(2)); + + assertEquals(t1,t2); + + Tuple t3 = new VTuple(5); + t2.put(0, DatumFactory.createInt4(1)); + t2.put(1, DatumFactory.createInt4(2)); + t2.put(4, DatumFactory.createInt4(2)); + + assertNotSame(t1,t3); + } + + @Test + public void testHashCode() { + Tuple t1 = new VTuple(5); + Tuple t2 = new VTuple(5); + + t1.put(0, 
DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(3, DatumFactory.createInt4(2)); + t1.put(4, DatumFactory.createText("hyunsik")); + + t2.put(0, DatumFactory.createInt4(1)); + t2.put(1, DatumFactory.createInt4(2)); + t2.put(3, DatumFactory.createInt4(2)); + t2.put(4, DatumFactory.createText("hyunsik")); + + assertEquals(t1.hashCode(),t2.hashCode()); + + Tuple t3 = new VTuple(5); + t3.put(0, DatumFactory.createInt4(1)); + t3.put(1, DatumFactory.createInt4(2)); + t3.put(4, DatumFactory.createInt4(2)); + + assertNotSame(t1.hashCode(),t3.hashCode()); + } + + @Test + public void testPutTuple() { + Tuple t1 = new VTuple(5); + + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(2, DatumFactory.createInt4(3)); + + Tuple t2 = new VTuple(2); + t2.put(0, DatumFactory.createInt4(4)); + t2.put(1, DatumFactory.createInt4(5)); + + t1.put(3, t2); + + for (int i = 0; i < 5; i++) { + assertEquals(i+1, t1.get(i).asInt4()); + } + } + + @Test + public void testClone() throws CloneNotSupportedException { + Tuple t1 = new VTuple(5); + + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(3, DatumFactory.createInt4(2)); + t1.put(4, DatumFactory.createText("str")); + + VTuple t2 = (VTuple) t1.clone(); + assertNotSame(t1, t2); + assertEquals(t1, t2); + + assertSame(t1.get(4), t2.get(4)); + + t1.clear(); + assertFalse(t1.equals(t2)); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml new file mode 100644 index 0000000..d1c561b --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml @@ -0,0 +1,164 @@ +<?xml version="1.0"?> +<?xml-stylesheet 
type="text/xsl" href="configuration.xsl"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<configuration> + <property> + <name>fs.s3.impl</name> + <value>org.apache.tajo.storage.s3.SmallBlockS3FileSystem</value> + </property> + + <!-- Storage Manager Configuration --> + <property> + <name>tajo.storage.manager.hdfs.class</name> + <value>org.apache.tajo.storage.FileStorageManager</value> + </property> + <property> + <name>tajo.storage.manager.hbase.class</name> + <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value> + </property> + + <!--- Registered Scanner Handler --> + <property> + <name>tajo.storage.scanner-handler</name> + <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value> + </property> + + <!--- Fragment Class Configurations --> + <property> + <name>tajo.storage.fragment.csv.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.raw.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.rcfile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.row.class</name> + 
<value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.trevni.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.parquet.class</name> + <value>org.apache.tajo.storage.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.sequencefile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.avro.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + + <!--- Scanner Handler --> + <property> + <name>tajo.storage.scanner-handler.csv.class</name> + <value>org.apache.tajo.storage.CSVFile$CSVScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.raw.class</name> + <value>org.apache.tajo.storage.RawFile$RawFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.rcfile.class</name> + <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.rowfile.class</name> + <value>org.apache.tajo.storage.RowFile$RowFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.trevni.class</name> + <value>org.apache.tajo.storage.trevni.TrevniScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.parquet.class</name> + <value>org.apache.tajo.storage.parquet.ParquetScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.sequencefile.class</name> + <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.avro.class</name> + <value>org.apache.tajo.storage.avro.AvroScanner</value> + </property> + + <!--- Appender Handler --> + <property> + <name>tajo.storage.appender-handler</name> + 
<value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value> + </property> + + <property> + <name>tajo.storage.appender-handler.csv.class</name> + <value>org.apache.tajo.storage.CSVFile$CSVAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.raw.class</name> + <value>org.apache.tajo.storage.RawFile$RawFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.rcfile.class</name> + <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.rowfile.class</name> + <value>org.apache.tajo.storage.RowFile$RowFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.trevni.class</name> + <value>org.apache.tajo.storage.trevni.TrevniAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.parquet.class</name> + <value>org.apache.tajo.storage.parquet.ParquetAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.sequencefile.class</name> + <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.avro.class</name> + <value>org.apache.tajo.storage.avro.AvroAppender</value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/pom.xml b/tajo-storage/tajo-storage-hbase/pom.xml new file mode 100644 index 0000000..e37149d --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/pom.xml @@ -0,0 +1,349 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Copyright 2012 Database Lab., Korea Univ. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>tajo-project</artifactId> + <groupId>org.apache.tajo</groupId> + <version>0.9.1-SNAPSHOT</version> + <relativePath>../../tajo-project</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>tajo-storage-hbase</artifactId> + <packaging>jar</packaging> + <name>Tajo HBase Storage</name> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + </properties> + + <repositories> + <repository> + <id>repository.jboss.org</id> + <url>https://repository.jboss.org/nexus/content/repositories/releases/ + </url> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + </repositories> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.6</source> + <target>1.6</target> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <executions> + <execution> + <phase>verify</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> 
+ <systemProperties> + <tajo.test>TRUE</tajo.test> + </systemProperties> + <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>create-protobuf-generated-sources-directory</id> + <phase>initialize</phase> + <configuration> + <target> + <mkdir dir="target/generated-sources/proto" /> + </target> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2</version> + <executions> + <execution> + <id>generate-sources</id> + <phase>generate-sources</phase> + <configuration> + <executable>protoc</executable> + <arguments> + <argument>-Isrc/main/proto/</argument> + <argument>--proto_path=../../tajo-common/src/main/proto</argument> + <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument> + <argument>--java_out=target/generated-sources/proto</argument> + <argument>src/main/proto/StorageFragmentProtos.proto</argument> + </arguments> + </configuration> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.5</version> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/proto</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + 
<groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + </plugin> + </plugins> + </build> + + + <dependencies> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-catalog-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-plan</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-common</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>zookeeper</artifactId> + <groupId>org.apache.zookeeper</groupId> + </exclusion> + <exclusion> + <artifactId>slf4j-api</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>jersey-json</artifactId> + <groupId>com.sun.jersey</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey.jersey-test-framework</groupId> + <artifactId>jersey-test-framework-grizzly2</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> 
+ <scope>test</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey.jersey-test-framework</groupId> + <artifactId>jersey-test-framework-grizzly2</artifactId> + </exclusion> + <exclusion> + <artifactId>hadoop-yarn-server-tests</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-mapreduce-client-app</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-yarn-api</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-mapreduce-client-hs</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + 
<artifactId>hbase-client</artifactId> + <version>${hbase.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>docs</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <execution> + <!-- build javadoc jars per jar for publishing to maven --> + <id>module-javadocs</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <destDir>${project.build.directory}</destDir> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <reporting> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + <version>2.15</version> + </plugin> + </plugins> + </reporting> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java new file mode 100644 index 0000000..8615235 --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.storage.Appender;
import org.apache.tajo.storage.TableStatistics;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.util.TUtil;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * An abstract class for HBase appenders. It translates a Tajo {@link Tuple}
 * into an HBase row key plus a set of {@link KeyValue}s according to the
 * column mapping declared in the table meta (see {@link ColumnMapping}).
 * Concrete subclasses decide how the produced KeyValues are written out.
 */
public abstract class AbstractHBaseAppender implements Appender {
  protected Configuration conf;
  protected Schema schema;
  protected TableMeta meta;
  protected QueryUnitAttemptId taskAttemptId;
  protected Path stagingDir;
  protected boolean inited = false;

  protected ColumnMapping columnMapping;
  protected TableStatistics stats;
  protected boolean enabledStats;

  // number of columns in the output schema, cached in init()
  protected int columnNum;

  // per-column mapping info, all indexed by schema column position
  protected byte[][][] mappingColumnFamilies;
  protected boolean[] isBinaryColumns;
  protected boolean[] isRowKeyMappings;
  protected boolean[] isColumnKeys;
  protected boolean[] isColumnValues;
  protected int[] rowKeyFieldIndexes;
  protected int[] rowkeyColumnIndexes;
  protected char rowKeyDelimiter;

  // The following four variables are used for '<cfname>:key:' or
  // '<cfname>:value:' mappings: columns sharing a column family contribute
  // the qualifier (key) and value of a single KeyValue.
  protected int[] columnKeyValueDataIndexes;
  protected byte[][] columnKeyDatas;
  protected byte[][] columnValueDatas;
  protected byte[][] columnKeyCfNames;

  // reused buffer for the KeyValues of the current tuple, sized in init()
  protected KeyValue[] keyValues;

  public AbstractHBaseAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
                               Schema schema, TableMeta meta, Path stagingDir) {
    this.conf = conf;
    this.schema = schema;
    this.meta = meta;
    this.stagingDir = stagingDir;
    this.taskAttemptId = taskAttemptId;
  }

  /**
   * Resolves the column mapping and precomputes all per-column index tables.
   * Must be called exactly once before any tuple is appended.
   *
   * @throws IllegalStateException if called more than once
   * @throws IOException if the column mapping cannot be built
   */
  @Override
  public void init() throws IOException {
    if (inited) {
      // Fixed message: the original said "FileAppender", a copy-paste error —
      // this class is not a FileAppender.
      throw new IllegalStateException("AbstractHBaseAppender is already initialized.");
    }
    inited = true;
    if (enabledStats) {
      stats = new TableStatistics(this.schema);
    }
    columnMapping = new ColumnMapping(schema, meta);

    mappingColumnFamilies = columnMapping.getMappingColumns();

    // collect the schema positions of the columns composing the row key
    isRowKeyMappings = columnMapping.getIsRowKeyMappings();
    List<Integer> rowkeyColumnIndexList = new ArrayList<Integer>();
    for (int i = 0; i < isRowKeyMappings.length; i++) {
      if (isRowKeyMappings[i]) {
        rowkeyColumnIndexList.add(i);
      }
    }
    rowkeyColumnIndexes = TUtil.toArray(rowkeyColumnIndexList);

    isBinaryColumns = columnMapping.getIsBinaryColumns();
    isColumnKeys = columnMapping.getIsColumnKeys();
    isColumnValues = columnMapping.getIsColumnValues();
    rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
    rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();

    this.columnNum = schema.size();

    // In the case of '<cfname>:key:' or '<cfname>:value:', the KeyValue object
    // should be set with the qualifier and value which are mapped to the same
    // column family, so all such columns that share a column family are folded
    // into one slot. Plain column mappings each get a KeyValue of their own.
    columnKeyValueDataIndexes = new int[isColumnKeys.length];
    int index = 0;
    int numKeyValues = 0;
    Map<String, Integer> cfNameIndexMap = new HashMap<String, Integer>();
    for (int i = 0; i < isColumnKeys.length; i++) {
      if (isRowKeyMappings[i]) {
        continue;   // row-key columns do not produce KeyValues
      }
      if (isColumnKeys[i] || isColumnValues[i]) {
        // NOTE(review): new String(byte[]) uses the platform default charset
        // here and getBytes() below does the same, so the round-trip is
        // consistent on one JVM; an explicit charset would be more portable.
        String cfName = new String(mappingColumnFamilies[i][0]);
        if (!cfNameIndexMap.containsKey(cfName)) {
          // first time we see this column family: allocate a new slot
          cfNameIndexMap.put(cfName, index);
          columnKeyValueDataIndexes[i] = index;
          index++;
          numKeyValues++;
        } else {
          // reuse the slot of the column family seen earlier
          columnKeyValueDataIndexes[i] = cfNameIndexMap.get(cfName);
        }
      } else {
        numKeyValues++;   // ordinary mapped column: one KeyValue each
      }
    }
    columnKeyCfNames = new byte[cfNameIndexMap.size()][];
    for (Map.Entry<String, Integer> entry : cfNameIndexMap.entrySet()) {
      columnKeyCfNames[entry.getValue()] = entry.getKey().getBytes();
    }
    columnKeyDatas = new byte[cfNameIndexMap.size()][];
    columnValueDatas = new byte[cfNameIndexMap.size()][];

    keyValues = new KeyValue[numKeyValues];
  }

  // scratch buffer reused across getRowKeyBytes() calls to avoid reallocation
  private ByteArrayOutputStream bout = new ByteArrayOutputStream();

  /**
   * Serializes the row-key columns of the given tuple into the HBase row key.
   * Multiple row-key columns are joined with {@code rowKeyDelimiter}; a single
   * row-key column is serialized directly without the buffer.
   *
   * @param tuple the tuple to extract the row key from
   * @return the serialized row key bytes
   * @throws IOException if serialization fails
   */
  protected byte[] getRowKeyBytes(Tuple tuple) throws IOException {
    Datum datum;
    byte[] rowkey;
    if (rowkeyColumnIndexes.length > 1) {
      bout.reset();
      for (int i = 0; i < rowkeyColumnIndexes.length; i++) {
        datum = tuple.get(rowkeyColumnIndexes[i]);
        if (isBinaryColumns[rowkeyColumnIndexes[i]]) {
          rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum);
        } else {
          rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum);
        }
        bout.write(rowkey);
        if (i < rowkeyColumnIndexes.length - 1) {
          bout.write(rowKeyDelimiter);   // no trailing delimiter after the last part
        }
      }
      rowkey = bout.toByteArray();
    } else {
      int index = rowkeyColumnIndexes[0];
      datum = tuple.get(index);
      if (isBinaryColumns[index]) {
        rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(index), datum);
      } else {
        rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(index), datum);
      }
    }

    return rowkey;
  }

  /**
   * Fills {@link #keyValues} with the KeyValues for one tuple: one per plain
   * mapped column, plus one per column family for '<cfname>:key:'/'<cfname>:value:'
   * pairs (assembled from {@code columnKeyDatas}/{@code columnValueDatas}).
   *
   * @param tuple  the tuple being appended
   * @param rowkey the row key produced by {@link #getRowKeyBytes(Tuple)}
   * @throws IOException if serialization fails
   */
  protected void readKeyValues(Tuple tuple, byte[] rowkey) throws IOException {
    int keyValIndex = 0;
    for (int i = 0; i < columnNum; i++) {
      if (isRowKeyMappings[i]) {
        continue;   // row-key columns were consumed by getRowKeyBytes()
      }
      Datum datum = tuple.get(i);
      byte[] value;
      if (isBinaryColumns[i]) {
        value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
      } else {
        value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
      }

      if (isColumnKeys[i]) {
        // stash the qualifier; the KeyValue is built after this loop
        columnKeyDatas[columnKeyValueDataIndexes[i]] = value;
      } else if (isColumnValues[i]) {
        // stash the cell value; the KeyValue is built after this loop
        columnValueDatas[columnKeyValueDataIndexes[i]] = value;
      } else {
        keyValues[keyValIndex] = new KeyValue(rowkey, mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value);
        keyValIndex++;
      }
    }

    // assemble one KeyValue per '<cfname>:key:'/'<cfname>:value:' column family
    for (int i = 0; i < columnKeyDatas.length; i++) {
      keyValues[keyValIndex++] = new KeyValue(rowkey, columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]);
    }
  }

  @Override
  public void enableStats() {
    enabledStats = true;
  }

  /**
   * @return collected table statistics, or {@code null} when stats were never enabled
   */
  @Override
  public TableStats getStats() {
    if (enabledStats) {
      return stats.getTableStat();
    } else {
      return null;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
----------------------------------------------------------------------
diff --git
a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java new file mode 100644 index 0000000..79161cc --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.hbase; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.PlanningException; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.LogicalRootNode; +import org.apache.tajo.plan.logical.SortNode; +import org.apache.tajo.plan.logical.SortNode.SortPurpose; +import org.apache.tajo.plan.logical.UnaryNode; +import org.apache.tajo.plan.rewrite.RewriteRule; +import org.apache.tajo.plan.util.PlannerUtil; + +public class AddSortForInsertRewriter implements RewriteRule { + private int[] sortColumnIndexes; + private Column[] sortColumns; + public AddSortForInsertRewriter(TableDesc tableDesc, Column[] sortColumns) { + this.sortColumns = sortColumns; + this.sortColumnIndexes = new int[sortColumns.length]; + + Schema tableSchema = tableDesc.getSchema(); + for (int i = 0; i < sortColumns.length; i++) { + sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName()); + } + } + + @Override + public String getName() { + return "AddSortForInsertRewriter"; + } + + @Override + public boolean isEligible(LogicalPlan plan) { + StoreType storeType = PlannerUtil.getStoreType(plan); + return storeType != null; + } + + @Override + public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException { + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + UnaryNode insertNode = rootNode.getChild(); + LogicalNode childNode = insertNode.getChild(); + + Schema sortSchema = childNode.getOutSchema(); + SortNode sortNode = plan.createNode(SortNode.class); + sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED); + sortNode.setInSchema(sortSchema); + sortNode.setOutSchema(sortSchema); + + SortSpec[] sortSpecs = new SortSpec[sortColumns.length]; + int index 
= 0; + + for (int i = 0; i < sortColumnIndexes.length; i++) { + Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]); + if (sortColumn == null) { + throw new PlanningException("Can't fine proper sort column:" + sortColumns[i]); + } + sortSpecs[index++] = new SortSpec(sortColumn, true, true); + } + sortNode.setSortSpecs(sortSpecs); + + sortNode.setChild(insertNode.getChild()); + insertNode.setChild(sortNode); + plan.getRootBlock().registerNode(sortNode); + + return plan; + } +}
