Repository: tajo Updated Branches: refs/heads/master 68ecc0985 -> 2056aaa81
TAJO-1742: Remove hadoop dependency in DatumFactory. Closes #676 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2056aaa8 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2056aaa8 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2056aaa8 Branch: refs/heads/master Commit: 2056aaa8111cf2ef81fc4c71fd7c085a59a043bd Parents: 68ecc09 Author: Jinho Kim <[email protected]> Authored: Wed Aug 5 16:19:12 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Wed Aug 5 16:19:12 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/datum/DatumFactory.java | 80 +-- .../java/org/apache/tajo/util/BytesUtils.java | 19 +- .../java/org/apache/tajo/util/NumberUtil.java | 565 +++++++++++++++++++ 4 files changed, 623 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/2056aaa8/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 0d35d8a..3757f7f 100644 --- a/CHANGES +++ b/CHANGES @@ -210,6 +210,8 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1742: Remove hadoop dependency in DatumFactory. (jinho) + TAJO-1733: Finished query occasionally does not appear in Web-UI. (jinho) TAJO-1731: With a task failure, query processing is hanged after first retry. http://git-wip-us.apache.org/repos/asf/tajo/blob/2056aaa8/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java index 480582a..7a042fa 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java @@ -23,7 +23,7 @@ import org.apache.commons.codec.binary.Base64; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.exception.InvalidCastException; -import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.NumberUtil; import org.apache.tajo.util.datetime.DateTimeFormat; import org.apache.tajo.util.datetime.DateTimeUtil; import org.apache.tajo.util.datetime.TimeMeta; @@ -114,45 +114,45 @@ public class DatumFactory { public static Datum createFromBytes(DataType dataType, byte[] bytes) { switch (dataType.getType()) { - case BOOLEAN: - return createBool(bytes[0]); - case INT2: - return createInt2(Bytes.toShort(bytes)); - case INT4: - return createInt4(Bytes.toInt(bytes)); - case INT8: - return createInt8(Bytes.toLong(bytes)); - case FLOAT4: - return createFloat4(Bytes.toFloat(bytes)); - case FLOAT8: - return createFloat8(Bytes.toDouble(bytes)); - case CHAR: - return createChar(bytes); - case TEXT: - return createText(bytes); - case DATE: - return new DateDatum(Bytes.toInt(bytes)); - case TIME: - return new TimeDatum(Bytes.toLong(bytes)); - case TIMESTAMP: - return new TimestampDatum(Bytes.toLong(bytes)); - case BIT: - return createBit(bytes[0]); - case BLOB: - return createBlob(bytes); - case INET4: - return createInet4(bytes); - case PROTOBUF: - ProtobufDatumFactory factory = ProtobufDatumFactory.get(dataType); - Message.Builder builder = factory.newBuilder(); - try { - builder.mergeFrom(bytes); - return factory.createDatum(builder.build()); - } catch (IOException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - default: + case BOOLEAN: + return createBool(bytes[0]); + case INT2: + return createInt2(NumberUtil.toShort(bytes)); + case INT4: + return createInt4(NumberUtil.toInt(bytes)); + case INT8: + return createInt8(NumberUtil.toLong(bytes)); + case FLOAT4: + return createFloat4(NumberUtil.toFloat(bytes)); + case FLOAT8: + return createFloat8(NumberUtil.toDouble(bytes)); + case CHAR: + return createChar(bytes); + case TEXT: + return createText(bytes); + case DATE: + return new DateDatum(NumberUtil.toInt(bytes)); + case TIME: + return new TimeDatum(NumberUtil.toLong(bytes)); + case TIMESTAMP: + return new TimestampDatum(NumberUtil.toLong(bytes)); + case BIT: + return createBit(bytes[0]); + case BLOB: + return createBlob(bytes); + case INET4: + return createInet4(bytes); + case PROTOBUF: + ProtobufDatumFactory factory = ProtobufDatumFactory.get(dataType); + Message.Builder builder = factory.newBuilder(); + try { + builder.mergeFrom(bytes); + return factory.createDatum(builder.build()); + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + default: throw new UnsupportedOperationException(dataType.toString()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2056aaa8/tajo-common/src/main/java/org/apache/tajo/util/BytesUtils.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/BytesUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/BytesUtils.java index 725301c..5df924b 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/BytesUtils.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/BytesUtils.java @@ -18,8 +18,6 @@ package org.apache.tajo.util; -import org.apache.hadoop.io.WritableUtils; - import java.io.ByteArrayOutputStream; import java.util.ArrayList; import java.util.Arrays; @@ -29,6 +27,21 @@ import java.util.List; * Extra utilities for bytes */ public class BytesUtils { + + /** + * Parse the first byte of a vint/vlong to determine the number of bytes + * @param value the first byte of the vint/vlong + * @return the total number of bytes (1 to 9) + */ + public static int decodeVIntSize(byte value) { + if (value >= -112) { + return 1; + } else if (value < -120) { + return -119 - value; + } + return -111 - value; + } + /** * @param n Long to make a VLong of. * @return VLong as bytes array. @@ -54,7 +67,7 @@ public class BytesUtils { len--; } - int size = WritableUtils.decodeVIntSize((byte) len); + int size = decodeVIntSize((byte) len); result = new byte[size]; result[offset++] = (byte) len; http://git-wip-us.apache.org/repos/asf/tajo/blob/2056aaa8/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java index 32e086c..51dd565 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java @@ -24,6 +24,9 @@ import io.netty.util.internal.PlatformDependent; import java.io.IOException; import java.io.OutputStream; import java.lang.reflect.Constructor; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; import java.nio.charset.Charset; // this is an implementation copied from LazyPrimitives in hive @@ -46,6 +49,52 @@ public class NumberUtil { * no need to worry about additional digits. */ + /** When we encode strings, we always specify UTF8 encoding */ + public static final String UTF8_ENCODING = "UTF-8"; + + /** When we encode strings, we always specify UTF8 encoding */ + public static final Charset UTF8_CHARSET = Charset.forName(UTF8_ENCODING); + + /** + * Size of boolean in bytes + */ + public static final int SIZEOF_BOOLEAN = Byte.SIZE / Byte.SIZE; + + /** + * Size of byte in bytes + */ + public static final int SIZEOF_BYTE = SIZEOF_BOOLEAN; + + /** + * Size of char in bytes + */ + public static final int SIZEOF_CHAR = Character.SIZE / Byte.SIZE; + + /** + * Size of double in bytes + */ + public static final int SIZEOF_DOUBLE = Double.SIZE / Byte.SIZE; + + /** + * Size of float in bytes + */ + public static final int SIZEOF_FLOAT = Float.SIZE / Byte.SIZE; + + /** + * Size of int in bytes + */ + public static final int SIZEOF_INT = Integer.SIZE / Byte.SIZE; + + /** + * Size of long in bytes + */ + public static final int SIZEOF_LONG = Long.SIZE / Byte.SIZE; + + /** + * Size of short in bytes + */ + public static final int SIZEOF_SHORT = Short.SIZE / Byte.SIZE; + public static long unsigned32(int n) { return n & 0xFFFFFFFFL; } @@ -1054,4 +1103,520 @@ public class NumberUtil { public static int compare(long x, long y) { return (x < y) ? -1 : ((x == y) ? 0 : 1); } + + /** + * Put bytes at the specified byte array position. + * @param tgtBytes the byte array + * @param tgtOffset position in the array + * @param srcBytes array to write out + * @param srcOffset source offset + * @param srcLength source length + * @return incremented offset + */ + public static int putBytes(byte[] tgtBytes, int tgtOffset, byte[] srcBytes, + int srcOffset, int srcLength) { + System.arraycopy(srcBytes, srcOffset, tgtBytes, tgtOffset, srcLength); + return tgtOffset + srcLength; + } + + /** + * Returns a new byte array, copied from the given {@code buf}, + * from the index 0 (inclusive) to the limit (exclusive), + * regardless of the current position. + * The position and the other index parameters are not changed. + * + * @param buf a byte buffer + * @return the byte array + * @see #getBytes(ByteBuffer) + */ + public static byte[] toBytes(ByteBuffer buf) { + ByteBuffer dup = buf.duplicate(); + dup.position(0); + return readBytes(dup); + } + + private static byte[] readBytes(ByteBuffer buf) { + byte [] result = new byte[buf.remaining()]; + buf.get(result); + return result; + } + + /** + * Converts a string to a UTF-8 byte array. + * @param s string + * @return the byte array + */ + public static byte[] toBytes(String s) { + return s.getBytes(UTF8_CHARSET); + } + + /** + * Convert a boolean to a byte array. True becomes -1 + * and false becomes 0. + * + * @param b value + * @return <code>b</code> encoded in a byte array. + */ + public static byte [] toBytes(final boolean b) { + return new byte[] { b ? (byte) -1 : (byte) 0 }; + } + + /** + * Reverses {@link #toBytes(boolean)} + * @param b array + * @return True or false. + */ + public static boolean toBoolean(final byte [] b) { + if (b.length != 1) { + throw new IllegalArgumentException("Array has wrong size: " + b.length); + } + return b[0] != (byte) 0; + } + + /** + * Convert a long value to a byte array using big-endian. + * + * @param val value to convert + * @return the byte array + */ + public static byte[] toBytes(long val) { + byte [] b = new byte[8]; + for (int i = 7; i > 0; i--) { + b[i] = (byte) val; + val >>>= 8; + } + b[0] = (byte) val; + return b; + } + + /** + * Converts a byte array to a long value. Reverses + * {@link #toBytes(long)} + * @param bytes array + * @return the long value + */ + public static long toLong(byte[] bytes) { + return toLong(bytes, 0, SIZEOF_LONG); + } + + /** + * Converts a byte array to a long value. Assumes there will be + * {@link #SIZEOF_LONG} bytes available. + * + * @param bytes bytes + * @param offset offset + * @return the long value + */ + public static long toLong(byte[] bytes, int offset) { + return toLong(bytes, offset, SIZEOF_LONG); + } + + /** + * Converts a byte array to a long value. + * + * @param bytes array of bytes + * @param offset offset into array + * @param length length of data (must be {@link #SIZEOF_LONG}) + * @return the long value + * @throws IllegalArgumentException if length is not {@link #SIZEOF_LONG} or + * if there's not enough room in the array at the offset indicated. + */ + public static long toLong(byte[] bytes, int offset, final int length) { + if (length != SIZEOF_LONG || offset + length > bytes.length) { + throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_LONG); + } + long l = 0; + for(int i = offset; i < offset + length; i++) { + l <<= 8; + l ^= bytes[i] & 0xFF; + } + return l; + } + + private static IllegalArgumentException + explainWrongLengthOrOffset(final byte[] bytes, + final int offset, + final int length, + final int expectedLength) { + String reason; + if (length != expectedLength) { + reason = "Wrong length: " + length + ", expected " + expectedLength; + } else { + reason = "offset (" + offset + ") + length (" + length + ") exceed the" + + " capacity of the array: " + bytes.length; + } + return new IllegalArgumentException(reason); + } + + /** + * Put a long value out to the specified byte array position. + * @param bytes the byte array + * @param offset position in the array + * @param val long to write out + * @return incremented offset + * @throws IllegalArgumentException if the byte array given doesn't have + * enough room at the offset specified. + */ + public static int putLong(byte[] bytes, int offset, long val) { + if (bytes.length - offset < SIZEOF_LONG) { + throw new IllegalArgumentException("Not enough room to put a long at" + + " offset " + offset + " in a " + bytes.length + " byte array"); + } + for(int i = offset + 7; i > offset; i--) { + bytes[i] = (byte) val; + val >>>= 8; + } + bytes[offset] = (byte) val; + return offset + SIZEOF_LONG; + } + + /** + * Presumes float encoded as IEEE 754 floating-point "single format" + * @param bytes byte array + * @return Float made from passed byte array. + */ + public static float toFloat(byte [] bytes) { + return toFloat(bytes, 0); + } + + /** + * Presumes float encoded as IEEE 754 floating-point "single format" + * @param bytes array to convert + * @param offset offset into array + * @return Float made from passed byte array. + */ + public static float toFloat(byte [] bytes, int offset) { + return Float.intBitsToFloat(toInt(bytes, offset, SIZEOF_INT)); + } + + /** + * @param bytes byte array + * @param offset offset to write to + * @param f float value + * @return New offset in <code>bytes</code> + */ + public static int putFloat(byte [] bytes, int offset, float f) { + return putInt(bytes, offset, Float.floatToRawIntBits(f)); + } + + /** + * @param f float value + * @return the float represented as byte [] + */ + public static byte [] toBytes(final float f) { + // Encode it as int + return Bytes.toBytes(Float.floatToRawIntBits(f)); + } + + /** + * @param bytes byte array + * @return Return double made from passed bytes. + */ + public static double toDouble(final byte [] bytes) { + return toDouble(bytes, 0); + } + + /** + * @param bytes byte array + * @param offset offset where double is + * @return Return double made from passed bytes. + */ + public static double toDouble(final byte [] bytes, final int offset) { + return Double.longBitsToDouble(toLong(bytes, offset, SIZEOF_LONG)); + } + + /** + * @param bytes byte array + * @param offset offset to write to + * @param d value + * @return New offset into array <code>bytes</code> + */ + public static int putDouble(byte [] bytes, int offset, double d) { + return putLong(bytes, offset, Double.doubleToLongBits(d)); + } + + /** + * Serialize a double as the IEEE 754 double format output. The resultant + * array will be 8 bytes long. + * + * @param d value + * @return the double represented as byte [] + */ + public static byte [] toBytes(final double d) { + // Encode it as a long + return Bytes.toBytes(Double.doubleToRawLongBits(d)); + } + + /** + * Convert an int value to a byte array. Big-endian. Same as what DataOutputStream.writeInt + * does. + * + * @param val value + * @return the byte array + */ + public static byte[] toBytes(int val) { + byte [] b = new byte[4]; + for(int i = 3; i > 0; i--) { + b[i] = (byte) val; + val >>>= 8; + } + b[0] = (byte) val; + return b; + } + + /** + * Converts a byte array to an int value + * @param bytes byte array + * @return the int value + */ + public static int toInt(byte[] bytes) { + return toInt(bytes, 0, SIZEOF_INT); + } + + /** + * Converts a byte array to an int value + * @param bytes byte array + * @param offset offset into array + * @return the int value + */ + public static int toInt(byte[] bytes, int offset) { + return toInt(bytes, offset, SIZEOF_INT); + } + + /** + * Converts a byte array to an int value + * @param bytes byte array + * @param offset offset into array + * @param length length of int (has to be {@link #SIZEOF_INT}) + * @return the int value + * @throws IllegalArgumentException if length is not {@link #SIZEOF_INT} or + * if there's not enough room in the array at the offset indicated. + */ + public static int toInt(byte[] bytes, int offset, final int length) { + if (length != SIZEOF_INT || offset + length > bytes.length) { + throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_INT); + } + int n = 0; + for(int i = offset; i < (offset + length); i++) { + n <<= 8; + n ^= bytes[i] & 0xFF; + } + return n; + } + + /** + * Converts a byte array to an int value + * @param bytes byte array + * @param offset offset into array + * @param length how many bytes should be considered for creating int + * @return the int value + * @throws IllegalArgumentException if there's not enough room in the array at the offset + * indicated. + */ + public static int readAsInt(byte[] bytes, int offset, final int length) { + if (offset + length > bytes.length) { + throw new IllegalArgumentException("offset (" + offset + ") + length (" + length + + ") exceed the" + " capacity of the array: " + bytes.length); + } + int n = 0; + for(int i = offset; i < (offset + length); i++) { + n <<= 8; + n ^= bytes[i] & 0xFF; + } + return n; + } + + /** + * Put an int value out to the specified byte array position. + * @param bytes the byte array + * @param offset position in the array + * @param val int to write out + * @return incremented offset + * @throws IllegalArgumentException if the byte array given doesn't have + * enough room at the offset specified. + */ + public static int putInt(byte[] bytes, int offset, int val) { + if (bytes.length - offset < SIZEOF_INT) { + throw new IllegalArgumentException("Not enough room to put an int at" + + " offset " + offset + " in a " + bytes.length + " byte array"); + } + for(int i= offset + 3; i > offset; i--) { + bytes[i] = (byte) val; + val >>>= 8; + } + bytes[offset] = (byte) val; + return offset + SIZEOF_INT; + } + + /** + * Convert a short value to a byte array of {@link #SIZEOF_SHORT} bytes long. + * @param val value + * @return the byte array + */ + public static byte[] toBytes(short val) { + byte[] b = new byte[SIZEOF_SHORT]; + b[1] = (byte) val; + val >>= 8; + b[0] = (byte) val; + return b; + } + + /** + * Converts a byte array to a short value + * @param bytes byte array + * @return the short value + */ + public static short toShort(byte[] bytes) { + return toShort(bytes, 0, SIZEOF_SHORT); + } + + /** + * Converts a byte array to a short value + * @param bytes byte array + * @param offset offset into array + * @return the short value + */ + public static short toShort(byte[] bytes, int offset) { + return toShort(bytes, offset, SIZEOF_SHORT); + } + + /** + * Converts a byte array to a short value + * @param bytes byte array + * @param offset offset into array + * @param length length, has to be {@link #SIZEOF_SHORT} + * @return the short value + * @throws IllegalArgumentException if length is not {@link #SIZEOF_SHORT} + * or if there's not enough room in the array at the offset indicated. + */ + public static short toShort(byte[] bytes, int offset, final int length) { + if (length != SIZEOF_SHORT || offset + length > bytes.length) { + throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_SHORT); + } + short n = 0; + n ^= bytes[offset] & 0xFF; + n <<= 8; + n ^= bytes[offset+1] & 0xFF; + return n; + } + + /** + * Returns a new byte array, copied from the given {@code buf}, + * from the position (inclusive) to the limit (exclusive). + * The position and the other index parameters are not changed. + * + * @param buf a byte buffer + * @return the byte array + * @see #toBytes(ByteBuffer) + */ + public static byte[] getBytes(ByteBuffer buf) { + return readBytes(buf.duplicate()); + } + + /** + * Put a short value out to the specified byte array position. + * @param bytes the byte array + * @param offset position in the array + * @param val short to write out + * @return incremented offset + * @throws IllegalArgumentException if the byte array given doesn't have + * enough room at the offset specified. + */ + public static int putShort(byte[] bytes, int offset, short val) { + if (bytes.length - offset < SIZEOF_SHORT) { + throw new IllegalArgumentException("Not enough room to put a short at" + + " offset " + offset + " in a " + bytes.length + " byte array"); + } + bytes[offset+1] = (byte) val; + val >>= 8; + bytes[offset] = (byte) val; + return offset + SIZEOF_SHORT; + } + + /** + * Put an int value as short out to the specified byte array position. Only the lower 2 bytes of + * the short will be put into the array. The caller of the API need to make sure they will not + * loose the value by doing so. This is useful to store an unsigned short which is represented as + * int in other parts. + * @param bytes the byte array + * @param offset position in the array + * @param val value to write out + * @return incremented offset + * @throws IllegalArgumentException if the byte array given doesn't have + * enough room at the offset specified. + */ + public static int putAsShort(byte[] bytes, int offset, int val) { + if (bytes.length - offset < SIZEOF_SHORT) { + throw new IllegalArgumentException("Not enough room to put a short at" + + " offset " + offset + " in a " + bytes.length + " byte array"); + } + bytes[offset+1] = (byte) val; + val >>= 8; + bytes[offset] = (byte) val; + return offset + SIZEOF_SHORT; + } + + /** + * Convert a BigDecimal value to a byte array + * + * @param val + * @return the byte array + */ + public static byte[] toBytes(BigDecimal val) { + byte[] valueBytes = val.unscaledValue().toByteArray(); + byte[] result = new byte[valueBytes.length + SIZEOF_INT]; + int offset = putInt(result, 0, val.scale()); + putBytes(result, offset, valueBytes, 0, valueBytes.length); + return result; + } + + + /** + * Converts a byte array to a BigDecimal + * + * @param bytes + * @return the char value + */ + public static BigDecimal toBigDecimal(byte[] bytes) { + return toBigDecimal(bytes, 0, bytes.length); + } + + /** + * Converts a byte array to a BigDecimal value + * + * @param bytes + * @param offset + * @param length + * @return the char value + */ + public static BigDecimal toBigDecimal(byte[] bytes, int offset, final int length) { + if (bytes == null || length < SIZEOF_INT + 1 || + (offset + length > bytes.length)) { + return null; + } + + int scale = toInt(bytes, offset); + byte[] tcBytes = new byte[length - SIZEOF_INT]; + System.arraycopy(bytes, offset + SIZEOF_INT, tcBytes, 0, length - SIZEOF_INT); + return new BigDecimal(new BigInteger(tcBytes), scale); + } + + /** + * Put a BigDecimal value out to the specified byte array position. + * + * @param bytes the byte array + * @param offset position in the array + * @param val BigDecimal to write out + * @return incremented offset + */ + public static int putBigDecimal(byte[] bytes, int offset, BigDecimal val) { + if (bytes == null) { + return offset; + } + + byte[] valueBytes = val.unscaledValue().toByteArray(); + byte[] result = new byte[valueBytes.length + SIZEOF_INT]; + offset = putInt(result, offset, val.scale()); + return putBytes(result, offset, valueBytes, 0, valueBytes.length); + } }
