http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java index cba9ee0..b25b2fd 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java @@ -110,8 +110,8 @@ public class BaseTupleComparator extends TupleComparator implements ProtoObject< @Override public int compare(Tuple tuple1, Tuple tuple2) { for (int i = 0; i < sortKeyIds.length; i++) { - left = tuple1.get(sortKeyIds[i]); - right = tuple2.get(sortKeyIds[i]); + left = tuple1.asDatum(sortKeyIds[i]); + right = tuple2.asDatum(sortKeyIds[i]); if (left.isNull() || right.isNull()) { if (!left.equals(right)) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java index a3b8da8..2cccb69 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java @@ -21,6 +21,7 @@ package org.apache.tajo.storage; import com.google.common.base.Preconditions; import com.google.protobuf.Message; import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.datum.*; import org.apache.tajo.exception.ValueTooLongForTypeCharactersException; import org.apache.tajo.util.Bytes; @@ -33,50 +34,58 @@ public class BinarySerializerDeserializer implements SerializerDeserializer { static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)}; + private Schema schema; + @Override - public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) + public void init(Schema schema) { + this.schema = schema; + } + + @Override + public int serialize(int index, Tuple tuple, OutputStream out, byte[] nullCharacters) throws IOException { - byte[] bytes; - int length = 0; - if (datum == null || datum instanceof NullDatum) { + if (tuple.isBlankOrNull(index)) { return 0; } - switch (col.getDataType().getType()) { + byte[] bytes; + int length = 0; + Column column = schema.getColumn(index); + switch (column.getDataType().getType()) { case BOOLEAN: case BIT: - bytes = datum.asByteArray(); + bytes = tuple.getBytes(index); length = bytes.length; out.write(bytes, 0, length); break; case CHAR: - bytes = datum.asByteArray(); + bytes = tuple.getBytes(index); length = bytes.length; - if (length > col.getDataType().getLength()) { - throw new ValueTooLongForTypeCharactersException(col.getDataType().getLength()); + if (length > column.getDataType().getLength()) { + throw new ValueTooLongForTypeCharactersException(column.getDataType().getLength()); } out.write(bytes, 0, length); break; case INT2: - length = writeShort(out, datum.asInt2()); + length = writeShort(out, tuple.getInt2(index)); break; case INT4: - length = writeVLong(out, datum.asInt4()); + length = writeVLong(out, tuple.getInt4(index)); break; case INT8: - length = writeVLong(out, datum.asInt8()); + length = writeVLong(out, tuple.getInt8(index)); break; case FLOAT4: - length = writeFloat(out, datum.asFloat4()); + length = writeFloat(out, tuple.getFloat4(index)); break; case FLOAT8: - length = writeDouble(out, datum.asFloat8()); + length = writeDouble(out, tuple.getFloat8(index)); break; case TEXT: { - bytes = datum.asTextBytes(); - length = datum.size(); + bytes = tuple.getTextBytes(index); + length = bytes.length; if (length == 0) { bytes = INVALID_UTF__SINGLE_BYTE; length = INVALID_UTF__SINGLE_BYTE.length; @@ -87,12 +96,12 @@ public class BinarySerializerDeserializer implements SerializerDeserializer { case BLOB: case INET4: case INET6: - bytes = datum.asByteArray(); + bytes = tuple.getBytes(index); length = bytes.length; out.write(bytes, 0, length); break; case PROTOBUF: - ProtobufDatum protobufDatum = (ProtobufDatum) datum; + ProtobufDatum protobufDatum = (ProtobufDatum) tuple.getProtobufDatum(index); bytes = protobufDatum.asByteArray(); length = bytes.length; out.write(bytes, 0, length); @@ -106,11 +115,12 @@ public class BinarySerializerDeserializer implements SerializerDeserializer { } @Override - public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException { + public Datum deserialize(int index, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException { if (length == 0) return NullDatum.get(); Datum datum; - switch (col.getDataType().getType()) { + Column column = schema.getColumn(index); + switch (column.getDataType().getType()) { case BOOLEAN: datum = DatumFactory.createBool(bytes[offset]); break; @@ -150,7 +160,7 @@ public class BinarySerializerDeserializer implements SerializerDeserializer { break; } case PROTOBUF: { - ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode()); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(column.getDataType().getCode()); Message.Builder builder = factory.newBuilder(); builder.mergeFrom(bytes, offset, length); datum = factory.createDatum(builder); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java index a5561ed..ed53832 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java @@ -22,10 +22,12 @@ package org.apache.tajo.storage; import com.google.common.base.Preconditions; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.IntervalDatum; import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.util.datetime.TimeMeta; /** * An instance of FrameTuple is an immutable tuple. @@ -82,13 +84,19 @@ public class FrameTuple implements Tuple, Cloneable { } @Override - public boolean isNull(int fieldid) { - return get(fieldid).isNull(); + public boolean isBlank(int fieldid) { + return asDatum(fieldid) == null; } @Override - public boolean isNotNull(int fieldid) { - return !isNull(fieldid); + public boolean isBlankOrNull(int fieldid) { + Datum datum = asDatum(fieldid); + return datum == null || datum.isNull(); + } + + @Override + public void put(int fieldId, Tuple tuple) { + throw new UnsupportedException(); } @Override @@ -102,13 +110,18 @@ public class FrameTuple implements Tuple, Cloneable { } @Override - public void put(int fieldId, Datum[] values) { + public void put(Datum[] values) { throw new UnsupportedException(); } @Override - public void put(int fieldId, Tuple tuple) { - throw new UnsupportedException(); + public TajoDataTypes.Type type(int fieldId) { + return null; + } + + @Override + public int size(int fieldId) { + return 0; } @Override @@ -122,85 +135,90 @@ public class FrameTuple implements Tuple, Cloneable { } @Override - public void put(Datum [] values) { - throw new UnsupportedException(); - } - - @Override - public Datum get(int fieldId) { + public Datum asDatum(int fieldId) { Preconditions.checkArgument(fieldId < size, "Out of field access: " + fieldId); if (fieldId < leftSize) { - return left.get(fieldId); + return left.asDatum(fieldId); } else { - return right.get(fieldId - leftSize); + return right.asDatum(fieldId - leftSize); } } @Override public boolean getBool(int fieldId) { - return get(fieldId).asBool(); + return asDatum(fieldId).asBool(); } @Override public byte getByte(int fieldId) { - return get(fieldId).asByte(); + return asDatum(fieldId).asByte(); } @Override public char getChar(int fieldId) { - return get(fieldId).asChar(); + return asDatum(fieldId).asChar(); } @Override public byte [] getBytes(int fieldId) { - return get(fieldId).asByteArray(); + return asDatum(fieldId).asByteArray(); + } + + @Override + public byte[] getTextBytes(int fieldId) { + return asDatum(fieldId).asTextBytes(); } @Override public short getInt2(int fieldId) { - return get(fieldId).asInt2(); + return asDatum(fieldId).asInt2(); } @Override public int getInt4(int fieldId) { - return get(fieldId).asInt4(); + return asDatum(fieldId).asInt4(); } @Override public long getInt8(int fieldId) { - return get(fieldId).asInt8(); + return asDatum(fieldId).asInt8(); } @Override public float getFloat4(int fieldId) { - return get(fieldId).asFloat4(); + return asDatum(fieldId).asFloat4(); } @Override public double getFloat8(int fieldId) { - return get(fieldId).asFloat8(); + return asDatum(fieldId).asFloat8(); } @Override public String getText(int fieldId) { - return get(fieldId).asChars(); + return asDatum(fieldId).asChars(); + } + + @Override + public TimeMeta getTimeDate(int fieldId) { + return null; } @Override public ProtobufDatum getProtobufDatum(int fieldId) { - return (ProtobufDatum) get(fieldId); + return (ProtobufDatum) asDatum(fieldId); } @Override public IntervalDatum getInterval(int fieldId) { - return (IntervalDatum) get(fieldId); + return (IntervalDatum) asDatum(fieldId); } @Override public char [] getUnicodeChars(int fieldId) { - return get(fieldId).asUnicodeChars(); + return asDatum(fieldId).asUnicodeChars(); } @Override @@ -228,7 +246,7 @@ public class FrameTuple implements Tuple, Cloneable { } str.append(i) .append("=>") - .append(get(i)); + .append(asDatum(i)); } } str.append(")"); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java index bfbe478..7bfc166 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java @@ -19,11 +19,13 @@ package org.apache.tajo.storage; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.IntervalDatum; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.util.datetime.TimeMeta; import java.util.Arrays; @@ -36,7 +38,7 @@ public class LazyTuple implements Tuple, Cloneable { private SerializerDeserializer serializeDeserialize; public LazyTuple(Schema schema, byte[][] textBytes, long offset) { - this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer()); + this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer(schema)); } public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) { @@ -68,13 +70,19 @@ public class LazyTuple implements Tuple, Cloneable { } @Override - public boolean isNull(int fieldid) { - return get(fieldid).isNull(); + public boolean isBlank(int fieldid) { + return get(fieldid) == null; } @Override - public boolean isNotNull(int fieldid) { - return !isNull(fieldid); + public boolean isBlankOrNull(int fieldid) { + Datum datum = get(fieldid); + return datum == null || datum.isNull(); + } + + @Override + public void put(int fieldId, Tuple tuple) { + this.put(fieldId, tuple.asDatum(fieldId)); } @Override @@ -95,31 +103,28 @@ public class LazyTuple implements Tuple, Cloneable { } @Override - public void put(int fieldId, Datum[] values) { - for (int i = fieldId, j = 0; j < values.length; i++, j++) { - this.values[i] = values[j]; - } - this.textBytes = new byte[values.length][]; + public void put(Datum[] values) { + System.arraycopy(values, 0, this.values, 0, this.values.length); } @Override - public void put(int fieldId, Tuple tuple) { - for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) { - values[i] = tuple.get(j); - textBytes[i] = null; - } + public Datum asDatum(int fieldId) { + return get(fieldId); } @Override - public void put(Datum[] values) { - System.arraycopy(values, 0, this.values, 0, size()); - this.textBytes = new byte[values.length][]; + public TajoDataTypes.Type type(int fieldId) { + return get(fieldId).type(); + } + + @Override + public int size(int fieldId) { + return get(fieldId).size(); } ////////////////////////////////////////////////////// // Getter ////////////////////////////////////////////////////// - @Override public Datum get(int fieldId) { if (values[fieldId] != null) return values[fieldId]; @@ -127,7 +132,7 @@ public class LazyTuple implements Tuple, Cloneable { values[fieldId] = NullDatum.get(); // split error. (col : 3, separator: ',', row text: "a,") } else if (textBytes[fieldId] != null) { try { - values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId), + values[fieldId] = serializeDeserialize.deserialize(fieldId, textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes); } catch (Exception e) { values[fieldId] = NullDatum.get(); @@ -170,6 +175,11 @@ public class LazyTuple implements Tuple, Cloneable { } @Override + public byte[] getTextBytes(int fieldId) { + return get(fieldId).asTextBytes(); + } + + @Override public short getInt2(int fieldId) { return get(fieldId).asInt2(); } @@ -200,6 +210,11 @@ public class LazyTuple implements Tuple, Cloneable { } @Override + public TimeMeta getTimeDate(int fieldId) { + return get(fieldId).asTimeMeta(); + } + + @Override public ProtobufDatum getProtobufDatum(int fieldId) { throw new UnsupportedException(); } @@ -214,7 +229,9 @@ public class LazyTuple implements Tuple, Cloneable { return get(fieldId).asUnicodeChars(); } + @Override public String toString() { + // todo this changes internal state, which causes funny result in GUI debugging boolean first = true; StringBuilder str = new StringBuilder(); str.append("("); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java index 256bc78..6643d45 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java @@ -47,7 +47,7 @@ public class RowStoreUtil { public static Tuple project(Tuple in, Tuple out, int[] targetIds) { out.clear(); for (int idx = 0; idx < targetIds.length; idx++) { - out.put(idx, in.get(targetIds[idx])); + out.put(idx, in.asDatum(targetIds[idx])); } return out; } @@ -76,7 +76,7 @@ public class RowStoreUtil { public Tuple toTuple(byte [] bytes) { nullFlags.clear(); ByteBuffer bb = ByteBuffer.wrap(bytes); - Tuple tuple = new VTuple(schema.size()); + VTuple tuple = new VTuple(schema.size()); Column col; TajoDataTypes.DataType type; @@ -189,7 +189,7 @@ public class RowStoreUtil { bb.position(headerSize); Column col; for (int i = 0; i < schema.size(); i++) { - if (tuple.isNull(i)) { + if (tuple.isBlankOrNull(i)) { nullFlags.set(i); continue; } @@ -200,15 +200,15 @@ public class RowStoreUtil { nullFlags.set(i); break; case BOOLEAN: - bb.put(tuple.get(i).asByte()); + bb.put(tuple.getByte(i)); break; case BIT: - bb.put(tuple.get(i).asByte()); + bb.put(tuple.getByte(i)); break; case CHAR: int charSize = col.getDataType().getLength(); byte [] _char = new byte[charSize]; - byte [] src = tuple.get(i).asByteArray(); + byte [] src = tuple.getBytes(i); if (charSize < src.length) { throw new ValueTooLongForTypeCharactersException(charSize); } @@ -217,48 +217,48 @@ public class RowStoreUtil { bb.put(_char); break; case INT2: - bb.putShort(tuple.get(i).asInt2()); + bb.putShort(tuple.getInt2(i)); break; case INT4: - bb.putInt(tuple.get(i).asInt4()); + bb.putInt(tuple.getInt4(i)); break; case INT8: - bb.putLong(tuple.get(i).asInt8()); + bb.putLong(tuple.getInt8(i)); break; case FLOAT4: - bb.putFloat(tuple.get(i).asFloat4()); + bb.putFloat(tuple.getFloat4(i)); break; case FLOAT8: - bb.putDouble(tuple.get(i).asFloat8()); + bb.putDouble(tuple.getFloat8(i)); break; case TEXT: - byte[] _string = tuple.get(i).asByteArray(); + byte[] _string = tuple.getBytes(i); bb.putInt(_string.length); bb.put(_string); break; case DATE: - bb.putInt(tuple.get(i).asInt4()); + bb.putInt(tuple.getInt4(i)); break; case TIME: case TIMESTAMP: - bb.putLong(tuple.get(i).asInt8()); + bb.putLong(tuple.getInt8(i)); break; case INTERVAL: - IntervalDatum interval = (IntervalDatum) tuple.get(i); + IntervalDatum interval = (IntervalDatum) tuple.getInterval(i); bb.putInt(interval.getMonths()); bb.putLong(interval.getMilliSeconds()); break; case BLOB: - byte[] bytes = tuple.get(i).asByteArray(); + byte[] bytes = tuple.getBytes(i); bb.putInt(bytes.length); bb.put(bytes); break; case INET4: - byte[] ipBytes = tuple.get(i).asByteArray(); + byte[] ipBytes = tuple.getBytes(i); bb.put(ipBytes); break; case INET6: - bb.put(tuple.get(i).asByteArray()); + bb.put(tuple.getBytes(i)); break; default: throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); @@ -283,7 +283,7 @@ public class RowStoreUtil { Column col; for (int i = 0; i < schema.size(); i++) { - if (tuple.isNull(i)) { + if (tuple.isBlankOrNull(i)) { continue; } @@ -315,11 +315,11 @@ public class RowStoreUtil { break; case TEXT: case BLOB: - size += (4 + tuple.get(i).asByteArray().length); + size += (4 + tuple.getBytes(i).length); break; case INET4: case INET6: - size += tuple.get(i).asByteArray().length; + size += tuple.getBytes(i).length; break; default: throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); @@ -340,7 +340,7 @@ public class RowStoreUtil { writer.startRow(); for (int i = 0; i < writer.dataTypes().length; i++) { - if (tuple.isNull(i)) { + if (tuple.isBlankOrNull(i)) { writer.skipField(); continue; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java index 564a9f5..44cd214 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java @@ -19,6 +19,7 @@ package org.apache.tajo.storage; import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.datum.Datum; import java.io.IOException; @@ -27,8 +28,10 @@ import java.io.OutputStream; @Deprecated public interface SerializerDeserializer { - public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException; + public void init(Schema schema); - public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException; + public int serialize(int index, Tuple tuple, OutputStream out, byte[] nullCharacters) throws IOException; + + public Datum deserialize(int index, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java index a2c08de..c101b0b 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java @@ -20,13 +20,13 @@ package org.apache.tajo.storage; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.NullDatum; /** * This class is not thread-safe. @@ -34,8 +34,8 @@ import org.apache.tajo.datum.NullDatum; public class TableStatistics { private static final Log LOG = LogFactory.getLog(TableStatistics.class); private Schema schema; - private Tuple minValues; - private Tuple maxValues; + private VTuple minValues; + private VTuple maxValues; private long [] numNulls; private long numRows = 0; private long numBytes = 0; @@ -81,12 +81,17 @@ public class TableStatistics { return this.numBytes; } - public void analyzeField(int idx, Datum datum) { - if (datum instanceof NullDatum) { + public void analyzeNull(int idx) { + numNulls[idx]++; + } + + public void analyzeField(int idx, Tuple tuple) { + if (tuple.isBlankOrNull(idx)) { numNulls[idx]++; return; } + Datum datum = tuple.asDatum(idx); if (comparable[idx]) { if (!maxValues.contains(idx) || maxValues.get(idx).compareTo(datum) < 0) { @@ -102,21 +107,21 @@ public class TableStatistics { public TableStats getTableStat() { TableStats stat = new TableStats(); - ColumnStats columnStats; for (int i = 0; i < schema.size(); i++) { - columnStats = new ColumnStats(schema.getColumn(i)); + Column column = schema.getColumn(i); + ColumnStats columnStats = new ColumnStats(column); columnStats.setNumNulls(numNulls[i]); - if (minValues.get(i) == null || schema.getColumn(i).getDataType().getType() == minValues.get(i).type()) { + if (minValues.isBlank(i) || column.getDataType().getType() == minValues.type(i)) { columnStats.setMinValue(minValues.get(i)); } else { - LOG.warn("Wrong statistics column type (" + minValues.get(i).type() + - ", expected=" + schema.getColumn(i).getDataType().getType() + ")"); + LOG.warn("Wrong statistics column type (" + minValues.type(i) + + ", expected=" + column.getDataType().getType() + ")"); } - if (maxValues.get(i) == null || schema.getColumn(i).getDataType().getType() == maxValues.get(i).type()) { + if (minValues.isBlank(i) || column.getDataType().getType() == maxValues.type(i)) { columnStats.setMaxValue(maxValues.get(i)); } else { - LOG.warn("Wrong statistics column type (" + maxValues.get(i).type() + - ", expected=" + schema.getColumn(i).getDataType().getType() + ")"); + LOG.warn("Wrong statistics column type (" + maxValues.type(i) + + ", expected=" + column.getDataType().getType() + ")"); } stat.addColumnStat(columnStats); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java index 954b62d..1ec13bc 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java @@ -21,6 +21,7 @@ package org.apache.tajo.storage; import com.google.protobuf.Message; import org.apache.commons.codec.binary.Base64; import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.*; import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; @@ -39,39 +40,48 @@ public class TextSerializerDeserializer implements SerializerDeserializer { public static final byte[] falseBytes = "false".getBytes(); private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); + public TextSerializerDeserializer() {} + + public TextSerializerDeserializer(Schema schema) { + init(schema); + } + + private Schema schema; + @Override - public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException { + public void init(Schema schema) { + this.schema = schema; + } - byte[] bytes; - int length = 0; - TajoDataTypes.DataType dataType = col.getDataType(); - - if (datum == null || datum instanceof NullDatum) { - switch (dataType.getType()) { - case CHAR: - case TEXT: - length = nullCharacters.length; - out.write(nullCharacters); - break; - default: - break; + @Override + public int serialize(int index, Tuple tuple, OutputStream out, byte[] nullCharacters) + throws IOException { + + Column col = schema.getColumn(index); + TajoDataTypes.Type type = col.getDataType().getType(); + if (tuple.isBlankOrNull(index)) { + if (type == TajoDataTypes.Type.CHAR || type == TajoDataTypes.Type.TEXT) { + out.write(nullCharacters); + return nullCharacters.length; } - return length; + return 0; } - switch (dataType.getType()) { + byte[] bytes; + int length = 0; + switch (type) { case BOOLEAN: - out.write(datum.asBool() ? trueBytes : falseBytes); + out.write(tuple.getBool(index) ? trueBytes : falseBytes); length = trueBytes.length; break; case CHAR: - int size = dataType.getLength() - datum.size(); + int size = col.getDataType().getLength() - tuple.size(index); if (size < 0){ - throw new ValueTooLongForTypeCharactersException(dataType.getLength()); + throw new ValueTooLongForTypeCharactersException(col.getDataType().getLength()); } byte[] pad = new byte[size]; - bytes = datum.asTextBytes(); + bytes = tuple.getTextBytes(index); out.write(bytes); out.write(pad); length = bytes.length + pad.length; @@ -86,28 +96,28 @@ public class TextSerializerDeserializer implements SerializerDeserializer { case INET4: case DATE: case INTERVAL: - bytes = datum.asTextBytes(); + bytes = tuple.getTextBytes(index); length = bytes.length; out.write(bytes); break; case TIME: - bytes = ((TimeDatum)datum).asChars(TimeZone.getDefault(), true).getBytes(); + bytes = TimeDatum.asChars(tuple.getTimeDate(index), TimeZone.getDefault(), true).getBytes(); length = bytes.length; out.write(bytes); break; case TIMESTAMP: - bytes = ((TimestampDatum)datum).asChars(TimeZone.getDefault(), true).getBytes(); + bytes = TimestampDatum.asChars(tuple.getTimeDate(index), TimeZone.getDefault(), true).getBytes(); length = bytes.length; out.write(bytes); break; case INET6: case BLOB: - bytes = Base64.encodeBase64(datum.asByteArray(), false); + bytes = Base64.encodeBase64(tuple.getBytes(index), false); length = bytes.length; out.write(bytes, 0, length); break; case PROTOBUF: - ProtobufDatum protobuf = (ProtobufDatum) datum; + ProtobufDatum protobuf = (ProtobufDatum) tuple.getProtobufDatum(index); byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes(); length = protoBytes.length; out.write(protoBytes, 0, protoBytes.length); @@ -120,10 +130,13 @@ public class TextSerializerDeserializer implements SerializerDeserializer { } @Override - public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException { + public Datum deserialize(int index, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException { + + Column col = schema.getColumn(index); + TajoDataTypes.Type type = col.getDataType().getType(); Datum datum; - switch (col.getDataType().getType()) { + switch (type) { case BOOLEAN: datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() : DatumFactory.createBool(bytes[offset] == 't' || bytes[offset] == 'T'); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java index 043409a..c42cdd6 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java @@ -22,8 +22,6 @@ import com.google.common.base.Objects; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; -import java.util.Comparator; - /** * It represents a pair of start and end tuples. */ http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java index 33f9f1c..1f43ef8 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java @@ -21,6 +21,7 @@ package org.apache.tajo.tuple.offheap; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.*; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.Tuple; @@ -29,6 +30,7 @@ import org.apache.tajo.util.SizeOf; import org.apache.tajo.util.StringUtils; import org.apache.tajo.util.UnsafeUtil; +import org.apache.tajo.util.datetime.TimeMeta; import sun.misc.Unsafe; import java.nio.ByteBuffer; @@ -53,6 +55,16 @@ public class HeapTuple implements Tuple { return data.length; } + @Override + public TajoDataTypes.Type type(int fieldId) { + return types[fieldId].getType(); + } + + @Override + public int size(int fieldId) { + return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); + } + public ByteBuffer nioBuffer() { return ByteBuffer.wrap(data); } @@ -75,13 +87,18 @@ public class HeapTuple implements Tuple { } @Override - public boolean isNull(int fieldid) { + public boolean isBlank(int fieldid) { return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET; } @Override - public boolean isNotNull(int fieldid) { - return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; + public boolean isBlankOrNull(int fieldid) { + return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET; + } + + @Override + public void put(int fieldId, Tuple tuple) { + throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple)."); } @Override @@ -95,23 +112,13 @@ public class HeapTuple implements Tuple { } @Override - public void put(int fieldId, Datum[] values) { - throw new UnsupportedException("UnSafeTuple does not support put(int, Datum [])."); - } - - @Override - public void put(int fieldId, Tuple tuple) { - throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple)."); - } - - @Override public void put(Datum[] values) { - throw new UnsupportedException("UnSafeTuple does not support put(Datum [])."); + throw new UnsupportedException("UnSafeTuple does not support put(Datum[])."); } @Override - public Datum get(int fieldId) { - if (isNull(fieldId)) { + public Datum asDatum(int fieldId) { + if (isBlankOrNull(fieldId)) { return NullDatum.get(); } @@ -184,6 +191,11 @@ public class HeapTuple implements Tuple { } @Override + public byte[] getTextBytes(int fieldId) { + return getText(fieldId).getBytes(); + } + + @Override public short getInt2(int fieldId) { return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); } @@ -213,6 +225,11 @@ public class HeapTuple implements Tuple { return new String(getBytes(fieldId)); } + @Override + public TimeMeta getTimeDate(int fieldId) { + return asDatum(fieldId).asTimeMeta(); + } + public IntervalDatum getInterval(int fieldId) { long pos = checkNullAndGetOffset(fieldId); int months = UNSAFE.getInt(data, BASE_OFFSET + pos); @@ -257,7 +274,7 @@ public class HeapTuple implements Tuple { Datum [] datums = new Datum[size()]; for (int i = 0; i < size(); i++) { if (contains(i)) { - datums[i] = get(i); + datums[i] = asDatum(i); } else { datums[i] = NullDatum.get(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java index b742e6d..e7bd2aa 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.*; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.Tuple; @@ -30,6 +31,7 @@ import org.apache.tajo.util.SizeOf; import org.apache.tajo.util.StringUtils; import org.apache.tajo.util.UnsafeUtil; +import org.apache.tajo.util.datetime.TimeMeta; import sun.misc.Unsafe; import sun.nio.ch.DirectBuffer; @@ -63,6 +65,16 @@ public abstract class UnSafeTuple implements Tuple { return types.length; } + @Override + public TajoDataTypes.Type type(int fieldId) { + return types[fieldId].getType(); + } + + @Override + public int size(int fieldId) { + return UNSAFE.getInt(getFieldAddr(fieldId)); + } + public ByteBuffer nioBuffer() { return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice(); } @@ -105,13 +117,13 @@ public abstract class UnSafeTuple implements Tuple { } @Override - public boolean isNull(int fieldid) { + public boolean isBlank(int fieldid) { return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET; } @Override - public boolean isNotNull(int fieldid) { - return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; + public boolean isBlankOrNull(int fieldid) { + return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET; } @Override @@ -125,23 +137,18 @@ public abstract class UnSafeTuple implements Tuple { } @Override - public void put(int fieldId, Datum[] values) { - throw new UnsupportedException("UnSafeTuple does not support put(int, Datum [])."); - } - - @Override public void put(int fieldId, Tuple tuple) { throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple)."); } @Override public void put(Datum[] values) { - throw new UnsupportedException("UnSafeTuple does not support put(Datum [])."); + throw new UnsupportedException("UnSafeTuple does not support put(Datum[])."); } @Override - public Datum get(int fieldId) { - if (isNull(fieldId)) { + public Datum asDatum(int fieldId) { + if (isBlankOrNull(fieldId)) { return NullDatum.get(); } @@ -214,6 +221,17 @@ public abstract class UnSafeTuple implements Tuple { } @Override + public byte[] getTextBytes(int fieldId) { + long pos = getFieldAddr(fieldId); + int len = UNSAFE.getInt(pos); + pos += SizeOf.SIZE_OF_INT; + + byte[] bytes = new byte[len]; + UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); + return bytes; + } + + @Override public short getInt2(int fieldId) { long addr = getFieldAddr(fieldId); return UNSAFE.getShort(addr); @@ -241,13 +259,7 @@ public abstract class UnSafeTuple implements Tuple { @Override public String getText(int fieldId) { - long pos = getFieldAddr(fieldId); - int len = UNSAFE.getInt(pos); - pos += SizeOf.SIZE_OF_INT; - - byte [] bytes = new byte[len]; - UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); - return new String(bytes); + return new String(getTextBytes(fieldId)); } public IntervalDatum getInterval(int fieldId) { @@ -285,6 +297,11 @@ public abstract class UnSafeTuple implements Tuple { } @Override + public TimeMeta getTimeDate(int fieldId) { + return null; + } + + @Override public Tuple clone() throws CloneNotSupportedException { return toHeapTuple(); } @@ -294,7 +311,7 @@ public abstract class UnSafeTuple implements Tuple { Datum [] datums = new Datum[size()]; for (int i = 0; i < size(); i++) { if (contains(i)) { - datums[i] = get(i); + datums[i] = asDatum(i); } else { datums[i] = NullDatum.get(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java index 0251dc7..d0ee8e0 100644 --- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java @@ -33,8 +33,7 @@ public class TestFrameTuple { @Before public void setUp() throws Exception { - tuple1 = new VTuple(11); - tuple1.put(new Datum[] { + tuple1 = new VTuple(new Datum[] { DatumFactory.createBool(true), DatumFactory.createBit((byte) 0x99), DatumFactory.createChar('9'), @@ -48,8 +47,7 @@ public class TestFrameTuple { DatumFactory.createInet4("192.168.0.1") }); - tuple2 = new VTuple(11); - tuple2.put(new Datum[] { + tuple2 = new VTuple(new Datum[] { DatumFactory.createBool(true), DatumFactory.createBit((byte) 0x99), DatumFactory.createChar('9'), @@ -76,9 +74,9 @@ public class TestFrameTuple { assertTrue(frame.contains(i)); } - assertEquals(DatumFactory.createInt8(23l), frame.get(5)); - assertEquals(DatumFactory.createInt8(23l), frame.get(16)); - assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10)); - assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(21)); + assertEquals(23l, frame.getInt8(5)); + assertEquals(23l, frame.getInt8(16)); + assertEquals("192.168.0.1", frame.getText(10)); + assertEquals("192.168.0.1", frame.getText(21)); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java index 9e7f334..96f90e7 100644 --- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java @@ -71,6 +71,7 @@ public class TestLazyTuple { sb.append(NullDatum.get()); textRow = BytesUtils.splitPreserveAllTokens(sb.toString().getBytes(), '|', 13); serde = new TextSerializerDeserializer(); + serde.init(schema); } @Test @@ -194,31 +195,6 @@ public class TestLazyTuple { } @Test - public void testPutTuple() { - int colNum = schema.size(); - LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); - - t1.put(0, DatumFactory.createInt4(1)); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(2, DatumFactory.createInt4(3)); - - - Schema schema2 = new Schema(); - schema2.addColumn("col1", TajoDataTypes.Type.INT8); - schema2.addColumn("col2", TajoDataTypes.Type.INT8); - - LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.size()][], -1); - t2.put(0, DatumFactory.createInt4(4)); - t2.put(1, DatumFactory.createInt4(5)); - - t1.put(3, t2); - - for (int i = 0; i < 5; i++) { - assertEquals(i + 1, t1.get(i).asInt4()); - } - } - - @Test public void testInvalidNumber() { byte[][] bytes = BytesUtils.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|', 5); Schema schema = new Schema(); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java index 5e94531..9eec96f 100644 --- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java @@ -51,17 +51,14 @@ public class TestTupleComparator { schema.addColumn("col4", Type.INT4); schema.addColumn("col5", Type.TEXT); - Tuple tuple1 = new VTuple(5); - Tuple tuple2 = new VTuple(5); - - tuple1.put( + VTuple tuple1 = new VTuple( new Datum[] { DatumFactory.createInt4(9), DatumFactory.createInt4(3), DatumFactory.createInt4(33), DatumFactory.createInt4(4), DatumFactory.createText("abc")}); - tuple2.put( + VTuple tuple2 = new VTuple( new Datum[] { DatumFactory.createInt4(1), DatumFactory.createInt4(25), http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java index 1bbd9ec..4ef595c 100644 --- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java @@ -121,34 +121,15 @@ public class TestVTuple { } @Test - public void testPutTuple() { - Tuple t1 = new VTuple(5); - - t1.put(0, DatumFactory.createInt4(1)); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(2, DatumFactory.createInt4(3)); - - Tuple t2 = new VTuple(2); - t2.put(0, DatumFactory.createInt4(4)); - t2.put(1, DatumFactory.createInt4(5)); - - t1.put(3, t2); - - for (int i = 0; i < 5; i++) { - assertEquals(i+1, t1.get(i).asInt4()); - } - } - - @Test public void testClone() throws CloneNotSupportedException { - Tuple t1 = new VTuple(5); + VTuple t1 = new VTuple(5); t1.put(0, DatumFactory.createInt4(1)); t1.put(1, DatumFactory.createInt4(2)); t1.put(3, DatumFactory.createInt4(2)); t1.put(4, DatumFactory.createText("str")); - VTuple t2 = (VTuple) t1.clone(); + VTuple t2 = t1.clone(); assertNotSame(t1, t2); assertEquals(t1, t2); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java index c43ba38..278d733 100644 --- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java @@ -497,79 +497,79 @@ public class TestOffHeapRowBlock { public static void validateNullity(int j, Tuple tuple) { if (j == 0) { - tuple.isNull(0); + tuple.isBlankOrNull(0); } else { assertTrue((j % 1 == 0) == tuple.getBool(0)); } if (j % 1 == 0) { - tuple.isNull(1); + tuple.isBlankOrNull(1); } else { assertTrue(1 == tuple.getInt2(1)); } if (j % 2 == 0) { - tuple.isNull(2); + tuple.isBlankOrNull(2); } else { assertEquals(j, tuple.getInt4(2)); } if (j % 3 == 0) { - tuple.isNull(3); + tuple.isBlankOrNull(3); } else { assertEquals(j, tuple.getInt8(3)); } if (j % 4 == 0) { - tuple.isNull(4); + tuple.isBlankOrNull(4); } else { assertTrue(j == tuple.getFloat4(4)); } if (j % 5 == 0) { - tuple.isNull(5); + tuple.isBlankOrNull(5); } else { assertTrue(j == tuple.getFloat8(5)); } if (j % 6 == 0) { - tuple.isNull(6); + tuple.isBlankOrNull(6); } else { assertEquals(new String(UNICODE_FIELD_PREFIX + j), tuple.getText(6)); } if (j % 7 == 0) { - tuple.isNull(7); + tuple.isBlankOrNull(7); } else { assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7)); } if (j % 8 == 0) { - tuple.isNull(8); + tuple.isBlankOrNull(8); } else { assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8)); } if (j % 9 == 0) { - tuple.isNull(9); + tuple.isBlankOrNull(9); } else { assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9)); } if (j % 10 == 0) { - tuple.isNull(10); + tuple.isBlankOrNull(10); } else { assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10)); } if (j % 11 == 0) { - tuple.isNull(11); + tuple.isBlankOrNull(11); } else { assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11)); } if (j % 12 == 0) { - tuple.isNull(12); + tuple.isBlankOrNull(12); } else { assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12)); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java index 0e3441b..425f392 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java @@ -151,11 +151,10 @@ public abstract class AbstractHBaseAppender implements Appender { if (rowkeyColumnIndexes.length > 1) { bout.reset(); for (int i = 0; i < rowkeyColumnIndexes.length; i++) { - datum = tuple.get(rowkeyColumnIndexes[i]); if (isBinaryColumns[rowkeyColumnIndexes[i]]) { - rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum); + rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), tuple, i); } else { - rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum); + rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), tuple, i); } bout.write(rowkey); if (i < rowkeyColumnIndexes.length - 1) { @@ -165,11 +164,10 @@ public abstract class AbstractHBaseAppender implements Appender { rowkey = bout.toByteArray(); } else { int index = rowkeyColumnIndexes[0]; - datum = tuple.get(index); if (isBinaryColumns[index]) { - rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(index), datum); + rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(index), tuple, index); } else { - rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(index), datum); + rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(index), tuple, index); } } @@ -182,12 +180,11 @@ public abstract class AbstractHBaseAppender implements Appender { if (isRowKeyMappings[i]) { continue; } - Datum datum = tuple.get(i); byte[] value; if (isBinaryColumns[i]) { - value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum); + value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), tuple, i); } else { - value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum); + value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), tuple, i); } if (isColumnKeys[i]) { http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java index 53ff9dc..40c4aea 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java @@ -22,6 +22,7 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.storage.Tuple; import org.apache.tajo.util.Bytes; import java.io.IOException; @@ -98,4 +99,38 @@ public class HBaseBinarySerializerDeserializer { return bytes; } + + public static byte[] serialize(Column col, Tuple tuple, int index) throws IOException { + if (tuple.isBlankOrNull(index)) { + return null; + } + + byte[] bytes; + switch (col.getDataType().getType()) { + case INT1: + case INT2: + bytes = Bytes.toBytes(tuple.getInt2(index)); + break; + case INT4: + bytes = Bytes.toBytes(tuple.getInt4(index)); + break; + case INT8: + bytes = Bytes.toBytes(tuple.getInt8(index)); + break; + case FLOAT4: + bytes = Bytes.toBytes(tuple.getFloat4(index)); + break; + case FLOAT8: + bytes = Bytes.toBytes(tuple.getFloat8(index)); + break; + case TEXT: + bytes = Bytes.toBytes(tuple.getText(index)); + break; + default: + bytes = null; + break; + } + + return bytes; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java index b1a2d59..19fdf80 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java @@ -27,7 +27,6 @@ import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; import org.apache.tajo.storage.TableSpaceManager; import org.apache.tajo.storage.Tuple; @@ -65,12 +64,11 @@ public class HBasePutAppender extends AbstractHBaseAppender { if (isRowKeyMappings[i]) { continue; } - Datum datum = tuple.get(i); byte[] value; if (isBinaryColumns[i]) { - value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum); + value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), tuple, i); } else { - value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum); + value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), tuple, i); } if (isColumnKeys[i]) { http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index 670f87e..845c2d7 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -1008,7 +1008,7 @@ public class HBaseTablespace extends Tablespace { Tuple previousTuple = dataRange.getStart(); for (byte[] eachEndKey : endKeys) { - Tuple endTuple = new VTuple(sortSpecs.length); + VTuple endTuple = new VTuple(sortSpecs.length); byte[][] rowKeyFields; if (sortSpecs.length > 1) { byte[][] splitValues = BytesUtils.splitPreserveAllTokens( http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java index a0ad492..ea5d0b0 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java @@ -22,6 +22,7 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.storage.Tuple; import org.apache.tajo.util.NumberUtil; import java.io.IOException; @@ -68,4 +69,12 @@ public class HBaseTextSerializerDeserializer { return datum.asChars().getBytes(); } + + public static byte[] serialize(Column col, Tuple tuple, int index) throws IOException { + if (tuple.isBlankOrNull(index)) { + return null; + } + + return tuple.getBytes(index); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java index 5fc96f1..ee3095c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java @@ -33,7 +33,6 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.compress.CodecPool; @@ -149,6 +148,7 @@ public class CSVFile { String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName()); serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); + serde.init(schema); } catch (Exception e) { LOG.error(e.getMessage(), e); throw new IOException(e); @@ -163,12 +163,10 @@ public class CSVFile { @Override public void addTuple(Tuple tuple) throws IOException { - Datum datum; int rowBytes = 0; for (int i = 0; i < columnNum; i++) { - datum = tuple.get(i); - rowBytes += serde.serialize(schema.getColumn(i), datum, os, nullChars); + rowBytes += serde.serialize(i, tuple, os, nullChars); if(columnNum - 1 > i){ os.write(delimiter); @@ -176,7 +174,7 @@ public class CSVFile { } if (isShuffle) { // it is to calculate min/max values, and it is only used for the intermediate file. - stats.analyzeField(i, datum); + stats.analyzeField(i, tuple); } } os.write(LF); @@ -358,6 +356,7 @@ public class CSVFile { String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName()); serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); + serde.init(schema); } catch (Exception e) { LOG.error(e.getMessage(), e); throw new IOException(e); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java index 0b3755d..cfd5a79 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java @@ -20,6 +20,7 @@ package org.apache.tajo.storage; import io.netty.buffer.ByteBuf; import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.datum.Datum; import org.apache.tajo.storage.text.TextLineParsingError; @@ -29,9 +30,11 @@ import java.io.OutputStream; public interface FieldSerializerDeserializer { - public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException; + void init(Schema schema); - public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) + int serialize(int columnIndex, Tuple datum, OutputStream out, byte[] nullChars) throws IOException; + + Datum deserialize(int columnIndex, ByteBuf buf, ByteBuf nullChars) throws IOException, TextLineParsingError; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java index 5213ba0..4e9bcda 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java @@ -634,10 +634,10 @@ public class RawFile { nullFlags.clear(); for (int i = 0; i < schema.size(); i++) { if (enabledStats) { - stats.analyzeField(i, t.get(i)); + stats.analyzeField(i, t); } - if (t.isNull(i)) { + if (t.isBlankOrNull(i)) { nullFlags.set(i); continue; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java index 1ff6c4f..0e628d4 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java @@ -376,56 +376,56 @@ public class RowFile { for (int i = 0; i < schema.size(); i++) { if (enabledStats) { - stats.analyzeField(i, t.get(i)); + stats.analyzeField(i, t); } - if (t.isNull(i)) { + if (t.isBlankOrNull(i)) { nullFlags.set(i); } else { col = schema.getColumn(i); switch (col.getDataType().getType()) { case BOOLEAN: - buffer.put(t.get(i).asByte()); + buffer.put(t.getByte(i)); break; case BIT: - buffer.put(t.get(i).asByte()); + buffer.put(t.getByte(i)); break; case CHAR: - byte[] src = t.get(i).asByteArray(); + byte[] src = t.getBytes(i); byte[] dst = Arrays.copyOf(src, col.getDataType().getLength()); buffer.putInt(src.length); buffer.put(dst); break; case TEXT: - byte [] strbytes = t.get(i).asByteArray(); + byte [] strbytes = t.getBytes(i); buffer.putShort((short)strbytes.length); buffer.put(strbytes, 0, strbytes.length); break; case INT2: - buffer.putShort(t.get(i).asInt2()); + buffer.putShort(t.getInt2(i)); break; case INT4: - buffer.putInt(t.get(i).asInt4()); + buffer.putInt(t.getInt4(i)); break; case INT8: - buffer.putLong(t.get(i).asInt8()); + buffer.putLong(t.getInt8(i)); break; case FLOAT4: - buffer.putFloat(t.get(i).asFloat4()); + buffer.putFloat(t.getFloat4(i)); break; case FLOAT8: - buffer.putDouble(t.get(i).asFloat8()); + buffer.putDouble(t.getFloat8(i)); break; case BLOB: - byte [] bytes = t.get(i).asByteArray(); + byte [] bytes = t.getBytes(i); buffer.putShort((short)bytes.length); buffer.put(bytes); break; case INET4: - buffer.put(t.get(i).asByteArray()); + buffer.put(t.getBytes(i)); break; case INET6: - buffer.put(t.get(i).asByteArray()); + buffer.put(t.getBytes(i)); break; case NULL_TYPE: nullFlags.set(i); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java index da426ea..2782955 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java @@ -33,7 +33,6 @@ import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.datum.NullDatum; import org.apache.tajo.storage.FileAppender; import org.apache.tajo.storage.TableStatistics; import org.apache.tajo.storage.Tuple; @@ -104,7 +103,7 @@ public class AvroAppender extends FileAppender { } private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) { - if (tuple.get(i) instanceof NullDatum) { + if (tuple.isBlankOrNull(i)) { return null; } switch (avroType) { @@ -141,7 +140,7 @@ public class AvroAppender extends FileAppender { for (int i = 0; i < schema.size(); ++i) { Column column = schema.getColumn(i); if (enabledStats) { - stats.analyzeField(i, tuple.get(i)); + stats.analyzeField(i, tuple); } Object value; Schema.Field avroField = avroFields.get(i); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java index 34e9661..60c32a7 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java @@ -56,7 +56,7 @@ public class JsonLineSerializer extends TextLineSerializer { JSONObject jsonObject = new JSONObject(); for (int i = 0; i < columnNum; i++) { - if (input.isNull(i)) { + if (input.isBlankOrNull(i)) { continue; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java index 415c338..45960aa 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java @@ -108,7 +108,7 @@ public class ParquetAppender extends FileAppender { public void addTuple(Tuple tuple) throws IOException { if (enabledStats) { for (int i = 0; i < schema.size(); ++i) { - stats.analyzeField(i, tuple.get(i)); + stats.analyzeField(i, tuple); } } writer.write(tuple); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java index 4c675a4..7f236b6 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java @@ -158,7 +158,7 @@ public class TajoRecordConverter extends GroupConverter { final int projectionIndex = projectionMap[i]; Column column = tajoReadSchema.getColumn(projectionIndex); if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE - || currentTuple.get(i) == null) { + || currentTuple.isBlankOrNull(i)) { set(projectionIndex, NullDatum.get()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java index dd951e1..de2a1e3 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java @@ -22,7 +22,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.datum.Datum; import org.apache.tajo.exception.ValueTooLongForTypeCharactersException; import org.apache.tajo.storage.Tuple; import parquet.hadoop.api.WriteSupport; @@ -99,11 +98,10 @@ public class TajoWriteSupport extends WriteSupport<Tuple> { if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) { continue; } - Datum datum = tuple.get(tajoIndex); Type fieldType = fields.get(index); - if (!tuple.isNull(tajoIndex)) { + if (!tuple.isBlankOrNull(tajoIndex)) { recordConsumer.startField(fieldType.getName(), index); - writeValue(fieldType, column, datum); + writeValue(fieldType, column, tuple, tajoIndex); recordConsumer.endField(fieldType.getName(), index); } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) { throw new RuntimeException("Null-value for required field: " + @@ -113,40 +111,40 @@ public class TajoWriteSupport extends WriteSupport<Tuple> { } } - private void writeValue(Type fieldType, Column column, Datum datum) { + private void writeValue(Type fieldType, Column column, Tuple tuple, int index) { switch (column.getDataType().getType()) { case BOOLEAN: - recordConsumer.addBoolean(datum.asBool()); + recordConsumer.addBoolean(tuple.getBool(index)); break; case BIT: case INT2: case INT4: - recordConsumer.addInteger(datum.asInt4()); + recordConsumer.addInteger(tuple.getInt4(index)); break; case INT8: - recordConsumer.addLong(datum.asInt8()); + recordConsumer.addLong(tuple.getInt8(index)); break; case FLOAT4: - recordConsumer.addFloat(datum.asFloat4()); + recordConsumer.addFloat(tuple.getFloat4(index)); break; case FLOAT8: - recordConsumer.addDouble(datum.asFloat8()); + recordConsumer.addDouble(tuple.getFloat8(index)); break; case CHAR: - if (datum.size() > column.getDataType().getLength()) { + if (tuple.size(index) > column.getDataType().getLength()) { throw new ValueTooLongForTypeCharactersException(column.getDataType().getLength()); } - recordConsumer.addBinary(Binary.fromByteArray(datum.asTextBytes())); + recordConsumer.addBinary(Binary.fromByteArray(tuple.getTextBytes(index))); break; case TEXT: - recordConsumer.addBinary(Binary.fromByteArray(datum.asTextBytes())); + recordConsumer.addBinary(Binary.fromByteArray(tuple.getBytes(index))); break; case PROTOBUF: case BLOB: case INET4: case INET6: - recordConsumer.addBinary(Binary.fromByteArray(datum.asByteArray())); + recordConsumer.addBinary(Binary.fromByteArray(tuple.getBytes(index))); break; default: break; http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java index af260b4..1dcec5f 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java @@ -30,7 +30,6 @@ import org.apache.hadoop.io.SequenceFile.Metadata; import org.apache.hadoop.io.compress.*; import org.apache.hadoop.util.ReflectionUtils; import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; @@ -646,8 +645,12 @@ public class RCFile { valLenBuffer = new NonSyncByteArrayOutputStream(); } - public int append(Column column, Datum datum) throws IOException { - int currentLen = serde.serialize(column, datum, columnValBuffer, nullChars); + public int append(NullDatum nill) { + return nullChars.length; + } + + public int append(Tuple tuple, int i) throws IOException { + int currentLen = serde.serialize(i, tuple, columnValBuffer, nullChars); columnValueLength += currentLen; uncompressedColumnValueLength += currentLen; @@ -765,6 +768,7 @@ public class RCFile { BinarySerializerDeserializer.class.getName()); try { serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); + serde.init(schema); } catch (Exception e) { LOG.error(e.getMessage(), e); throw new IOException(e); @@ -892,20 +896,19 @@ public class RCFile { int size = schema.size(); for (int i = 0; i < size; i++) { - Datum datum = tuple.get(i); - int length = columnBuffers[i].append(schema.getColumn(i), datum); + int length = columnBuffers[i].append(tuple, i); columnBufferSize += length; if (isShuffle) { // it is to calculate min/max values, and it is only used for the intermediate file. - stats.analyzeField(i, datum); + stats.analyzeField(i, tuple); } } if (size < columnNumber) { for (int i = size; i < columnNumber; i++) { - columnBuffers[i].append(schema.getColumn(i), NullDatum.get()); + columnBuffers[i].append(NullDatum.get()); if (isShuffle) { - stats.analyzeField(i, NullDatum.get()); + stats.analyzeNull(i); } } } @@ -1377,6 +1380,7 @@ public class RCFile { serdeClass = this.meta.getOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName()); } serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); + serde.init(schema); } catch (Exception e) { LOG.error(e.getMessage(), e); throw new IOException(e); @@ -1712,7 +1716,7 @@ public class RCFile { } else { colAdvanceRow(j, col); - Datum datum = serde.deserialize(schema.getColumn(actualColumnIdx), + Datum datum = serde.deserialize(actualColumnIdx, currentValue.loadedColumnsValueBuffer[j].getData(), col.rowReadIndex, col.prvLength, nullChars); tuple.put(j, datum); col.rowReadIndex += col.prvLength; http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java index 404352c..9b09d78 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java @@ -35,7 +35,6 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.storage.*; @@ -125,6 +124,7 @@ public class SequenceFileAppender extends FileAppender { String serdeClass = this.meta.getOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName()); serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); + serde.init(schema); } catch (Exception e) { LOG.error(e.getMessage(), e); throw new IOException(e); @@ -159,16 +159,13 @@ public class SequenceFileAppender extends FileAppender { @Override public void addTuple(Tuple tuple) throws IOException { - Datum datum; if (serde instanceof BinarySerializerDeserializer) { byte nullByte = 0; int lasti = 0; for (int i = 0; i < columnNum; i++) { - datum = tuple.get(i); - // set bit to 1 if a field is not null - if (null != datum) { + if (!tuple.isBlank(i)) { nullByte |= 1 << (i % 8); } @@ -179,29 +176,28 @@ public class SequenceFileAppender extends FileAppender { os.write(nullByte); for (int j = lasti; j <= i; j++) { - datum = tuple.get(j); switch (schema.getColumn(j).getDataType().getType()) { case TEXT: - BytesUtils.writeVLong(os, datum.asTextBytes().length); + BytesUtils.writeVLong(os, tuple.getTextBytes(j).length); break; case PROTOBUF: - ProtobufDatum protobufDatum = (ProtobufDatum) datum; + ProtobufDatum protobufDatum = (ProtobufDatum) tuple.getProtobufDatum(j); BytesUtils.writeVLong(os, protobufDatum.asByteArray().length); break; case CHAR: case INET4: case BLOB: - BytesUtils.writeVLong(os, datum.asByteArray().length); + BytesUtils.writeVLong(os, tuple.getBytes(j).length); break; default: } - serde.serialize(schema.getColumn(j), datum, os, nullChars); + serde.serialize(j, tuple, os, nullChars); if (isShuffle) { // it is to calculate min/max values, and it is only used for the intermediate file. - stats.analyzeField(j, datum); + stats.analyzeField(j, tuple); } } lasti = i + 1; @@ -215,8 +211,7 @@ public class SequenceFileAppender extends FileAppender { } else { for (int i = 0; i < columnNum; i++) { - datum = tuple.get(i); - serde.serialize(schema.getColumn(i), datum, os, nullChars); + serde.serialize(i, tuple, os, nullChars); if (columnNum -1 > i) { os.write((byte) delimiter); @@ -224,7 +219,7 @@ public class SequenceFileAppender extends FileAppender { if (isShuffle) { // it is to calculate min/max values, and it is only used for the intermediate file. - stats.analyzeField(i, datum); + stats.analyzeField(i, tuple); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java index af0973e..ff73a1c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java @@ -117,6 +117,7 @@ public class SequenceFileScanner extends FileScanner { try { String serdeClass = this.meta.getOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName()); serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); + serde.init(schema); if (serde instanceof BinarySerializerDeserializer) { hasBinarySerDe = true; @@ -225,7 +226,7 @@ public class SequenceFileScanner extends FileScanner { for (int j = 0; j < projectionMap.length; j++) { if (projectionMap[j] == i) { - Datum datum = serde.deserialize(schema.getColumn(i), bytes, fieldStart[i], fieldLength[i], nullChars); + Datum datum = serde.deserialize(i, bytes, fieldStart[i], fieldLength[i], nullChars); tuple.put(i, datum); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java index 0901c0b..eabab22 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java @@ -19,8 +19,8 @@ package org.apache.tajo.storage.text; import io.netty.buffer.ByteBuf; -import org.apache.tajo.catalog.Column; import io.netty.buffer.ByteBufProcessor; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.datum.Datum; @@ -38,12 +38,10 @@ public class CSVLineDeserializer extends TextLineDeserializer { private int delimiterCompensation; private int [] targetColumnIndexes; - private Column [] projected; public CSVLineDeserializer(Schema schema, TableMeta meta, Column [] projected) { super(schema, meta); - this.projected = projected; targetColumnIndexes = new int[projected.length]; for (int i = 0; i < projected.length; i++) { targetColumnIndexes[i] = schema.getColumnId(projected[i].getQualifiedName()); @@ -67,13 +65,14 @@ public class CSVLineDeserializer extends TextLineDeserializer { nullChars = TextLineSerDe.getNullChars(meta); fieldSerDer = new TextFieldSerializerDeserializer(meta); + fieldSerDer.init(schema); } public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError { - int[] projection = targetColumnIndexes; if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) { return; } + int[] projection = targetColumnIndexes; final int rowLength = lineBuf.readableBytes(); int start = 0, fieldLength = 0, end = 0; @@ -93,14 +92,12 @@ public class CSVLineDeserializer extends TextLineDeserializer { if (projection.length > currentTarget && currentIndex == projection[currentTarget]) { lineBuf.setIndex(start, start + fieldLength); - try { - Datum datum = fieldSerDer.deserialize(lineBuf, projected[currentTarget], currentIndex, nullChars); + Datum datum = fieldSerDer.deserialize(currentIndex, lineBuf, nullChars); output.put(currentTarget, datum); } catch (Exception e) { output.put(currentTarget, NullDatum.get()); } - currentTarget++; }
