Repository: tajo
Updated Branches:
  refs/heads/master 9fcc9fd3a -> 682635852
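[Editor's note] This commit moves Tajo's bundled third-party ORC writer off the Hive ObjectInspector/shims APIs and onto the Apache ORC library proper (org.apache.orc): WriterImpl now takes its schema from OrcFile.WriterOptions as a TypeDescription, the local ZlibCodec.java and orc_proto.proto copies are deleted in favor of the upstream ones, and a ZeroCopyAdapter is added for zero-copy HDFS reads. As a rough illustration of the new schema API (an editor's sketch, not part of the commit; the table layout is made up):

    import org.apache.orc.TypeDescription;

    public class SchemaSketch {
      public static void main(String[] args) {
        // Hypothetical three-column table; with TypeDescription the writer no
        // longer needs an ObjectInspector or a comma-separated "columns"
        // property to learn its column names and types.
        TypeDescription schema = TypeDescription.createStruct()
            .addField("id", TypeDescription.createLong())
            .addField("name", TypeDescription.createChar().withMaxLength(7))
            .addField("score", TypeDescription.createDouble());

        // Column ids come from the schema itself; the new WriterImpl uses this
        // count to size compression buffers (roughly stripeSize / (20 * cols),
        // snapped to a standard buffer size and capped at the configured max).
        int numColumns = schema.getMaximumId() + 1;
        System.out.println(schema + " -> " + numColumns + " columns");
      }
    }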
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java index 833d102..e0ad3d7 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java @@ -19,7 +19,6 @@ package org.apache.tajo.storage.thirdparty.orc; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.primitives.Longs; import com.google.protobuf.ByteString; @@ -30,21 +29,20 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.io.IOConstants; -import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.tajo.datum.*; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.thirdparty.orc.CompressionCodec.Modifier; -import org.apache.tajo.storage.thirdparty.orc.OrcProto.RowIndexEntry; -import org.apache.tajo.storage.thirdparty.orc.OrcProto.StripeStatistics; -import org.apache.tajo.storage.thirdparty.orc.OrcProto.Type; -import org.apache.tajo.storage.thirdparty.orc.OrcProto.UserMetadataItem; import org.apache.hadoop.hive.ql.util.JavaDataModel; -import org.apache.hadoop.hive.serde2.objectinspector.*; -import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import org.apache.hadoop.io.Text; +import org.apache.orc.*; +import org.apache.orc.CompressionCodec.Modifier; +import org.apache.orc.OrcProto.RowIndexEntry; +import org.apache.orc.OrcUtils; +import org.apache.orc.impl.*; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.Inet4Datum; +import org.apache.tajo.datum.Int4Datum; +import org.apache.tajo.datum.Int8Datum; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.thirdparty.orc.OrcFile.*; +import org.apache.tajo.util.datetime.DateTimeConstants; import org.apache.tajo.util.datetime.DateTimeUtil; import java.io.IOException; @@ -94,10 +92,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private final boolean addBlockPadding; private final int bufferSize; private final long blockSize; - private final float paddingTolerance; + private final double paddingTolerance; + private final TypeDescription schema; + // the streams that make up the current stripe - private final Map<StreamName, BufferedStream> streams = - new TreeMap<>(); + private final Map<StreamName, BufferedStream> streams = new TreeMap<>(); private FSDataOutputStream rawWriter = null; // the compressed metadata information outStream @@ -111,47 +110,32 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private long rawDataSize = 0; private int rowsInIndex = 0; private int stripesAtLastFlush = -1; - private final List<OrcProto.StripeInformation> stripes = - new ArrayList<>(); - private final Map<String, ByteString> userMetadata = - new TreeMap<>(); + private final 
List<OrcProto.StripeInformation> stripes = new ArrayList<>(); + private final Map<String, ByteString> userMetadata = new TreeMap<>(); + private final StreamFactory streamFactory = new StreamFactory(); private final TreeWriter treeWriter; private final boolean buildIndex; private final MemoryManager memoryManager; - private final OrcFile.Version version; + private final Version version; private final Configuration conf; - private final OrcFile.WriterCallback callback; - private final OrcFile.WriterContext callbackContext; - private final OrcFile.EncodingStrategy encodingStrategy; - private final OrcFile.CompressionStrategy compressionStrategy; + private final WriterCallback callback; + private final WriterContext callbackContext; + private final EncodingStrategy encodingStrategy; + private final CompressionStrategy compressionStrategy; private final boolean[] bloomFilterColumns; private final double bloomFilterFpp; private boolean writeTimeZone; private TimeZone timeZone; - WriterImpl(FileSystem fs, - Path path, - Configuration conf, - ObjectInspector inspector, - long stripeSize, - CompressionKind compress, - int bufferSize, - int rowIndexStride, - MemoryManager memoryManager, - boolean addBlockPadding, - OrcFile.Version version, - OrcFile.WriterCallback callback, - OrcFile.EncodingStrategy encodingStrategy, - OrcFile.CompressionStrategy compressionStrategy, - float paddingTolerance, - long blockSizeValue, - String bloomFilterColumnNames, - double bloomFilterFpp, - TimeZone timeZone) throws IOException { + public WriterImpl(FileSystem fs, + Path path, + OrcFile.WriterOptions opts, + TimeZone timeZone) throws IOException { this.fs = fs; this.path = path; - this.conf = conf; - this.callback = callback; + this.conf = opts.getConfiguration(); + this.callback = opts.getCallback(); + this.schema = opts.getSchema(); if (callback != null) { callbackContext = new OrcFile.WriterContext(){ @@ -163,100 +147,60 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } else { callbackContext = null; } - this.adjustedStripeSize = stripeSize; - this.defaultStripeSize = stripeSize; - this.version = version; - this.encodingStrategy = encodingStrategy; - this.compressionStrategy = compressionStrategy; - this.addBlockPadding = addBlockPadding; - this.blockSize = blockSizeValue; - this.paddingTolerance = paddingTolerance; - this.compress = compress; - this.rowIndexStride = rowIndexStride; - this.memoryManager = memoryManager; - this.timeZone = timeZone; + this.adjustedStripeSize = opts.getStripeSize(); + this.defaultStripeSize = opts.getStripeSize(); + this.version = opts.getVersion(); + this.encodingStrategy = opts.getEncodingStrategy(); + this.compressionStrategy = opts.getCompressionStrategy(); + this.addBlockPadding = opts.getBlockPadding(); + this.blockSize = opts.getBlockSize(); + this.paddingTolerance = opts.getPaddingTolerance(); + this.compress = opts.getCompress(); + this.rowIndexStride = opts.getRowIndexStride(); + this.memoryManager = opts.getMemoryManager(); buildIndex = rowIndexStride > 0; codec = createCodec(compress); - String allColumns = conf.get(IOConstants.COLUMNS); - if (allColumns == null) { - allColumns = getColumnNamesFromInspector(inspector); - } - this.bufferSize = getEstimatedBufferSize(allColumns, bufferSize); + int numColumns = schema.getMaximumId() + 1; + this.bufferSize = getEstimatedBufferSize(defaultStripeSize, + numColumns, opts.getBufferSize()); if (version == OrcFile.Version.V_0_11) { /* do not write bloom filters for ORC v11 */ - this.bloomFilterColumns = - 
OrcUtils.includeColumns(null, allColumns, inspector); + this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1]; } else { this.bloomFilterColumns = - OrcUtils.includeColumns(bloomFilterColumnNames, allColumns, inspector); + OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema); } - this.bloomFilterFpp = bloomFilterFpp; - treeWriter = createTreeWriter(inspector, new StreamFactory(), false); + this.bloomFilterFpp = opts.getBloomFilterFpp(); + this.timeZone = timeZone; + treeWriter = createTreeWriter(schema, streamFactory, false); if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) { throw new IllegalArgumentException("Row stride must be at least " + MIN_ROW_INDEX_STRIDE); } // ensure that we are able to handle callbacks before we register ourselves - memoryManager.addWriter(path, stripeSize, this); - } - - private String getColumnNamesFromInspector(ObjectInspector inspector) { - List<String> fieldNames = Lists.newArrayList(); - Joiner joiner = Joiner.on(","); - if (inspector instanceof StructObjectInspector) { - StructObjectInspector soi = (StructObjectInspector) inspector; - List<? extends StructField> fields = soi.getAllStructFieldRefs(); - for(StructField sf : fields) { - fieldNames.add(sf.getFieldName()); - } - } - return joiner.join(fieldNames); + memoryManager.addWriter(path, opts.getStripeSize(), this); } @VisibleForTesting - int getEstimatedBufferSize(int bs) { - return getEstimatedBufferSize(conf.get(IOConstants.COLUMNS), bs); - } - - int getEstimatedBufferSize(String colNames, int bs) { - long availableMem = getMemoryAvailableForORC(); - if (colNames != null) { - final int numCols = colNames.split(",").length; - if (numCols > COLUMN_COUNT_THRESHOLD) { - // In BufferedStream, there are 3 outstream buffers (compressed, - // uncompressed and overflow) and list of previously compressed buffers. - // Since overflow buffer is rarely used, lets consider only 2 allocation. - // Also, initially, the list of compression buffers will be empty. - final int outStreamBuffers = codec == null ? 1 : 2; - - // max possible streams per column is 5. For string columns, there is - // ROW_INDEX, PRESENT, DATA, LENGTH, DICTIONARY_DATA streams. - final int maxStreams = 5; - - // Lets assume 10% memory for holding dictionary in memory and other - // object allocations - final long miscAllocation = (long) (0.1f * availableMem); - - // compute the available memory - final long remainingMem = availableMem - miscAllocation; - - int estBufferSize = (int) (remainingMem / - (maxStreams * outStreamBuffers * numCols)); - estBufferSize = getClosestBufferSize(estBufferSize, bs); - if (estBufferSize > bs) { - estBufferSize = bs; - } - - LOG.info("WIDE TABLE - Number of columns: " + numCols + - " Chosen compression buffer size: " + estBufferSize); - return estBufferSize; - } + public static int getEstimatedBufferSize(long stripeSize, int numColumns, + int bs) { + // The worst case is that there are 2 big streams per column and + // we want to guarantee that each stream gets ~10 buffers. + // This keeps buffers small enough that we don't get really small stripe + // sizes.
+ int estBufferSize = (int) (stripeSize / (20 * numColumns)); + estBufferSize = getClosestBufferSize(estBufferSize); + if (estBufferSize > bs) { + estBufferSize = bs; + } else { + LOG.info("WIDE TABLE - Number of columns: " + numColumns + + " Chosen compression buffer size: " + estBufferSize); } - return bs; + return estBufferSize; } - private int getClosestBufferSize(int estBufferSize, int bs) { + private static int getClosestBufferSize(int estBufferSize) { final int kb4 = 4 * 1024; final int kb8 = 8 * 1024; final int kb16 = 16 * 1024; @@ -616,8 +560,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { */ private abstract static class TreeWriter { protected final int id; - protected final ObjectInspector inspector; - private final BitFieldWriter isPresent; + protected final BitFieldWriter isPresent; private final boolean isCompressed; protected final ColumnStatisticsImpl indexStatistics; protected final ColumnStatisticsImpl stripeColStatistics; @@ -634,24 +577,24 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private final OrcProto.BloomFilter.Builder bloomFilterEntry; private boolean foundNulls; private OutStream isPresentOutStream; - private final List<StripeStatistics.Builder> stripeStatsBuilders; + private final List<OrcProto.StripeStatistics.Builder> stripeStatsBuilders; private final StreamFactory streamFactory; /** * Create a tree writer. * @param columnId the column id of the column to write - * @param inspector the object inspector to use + * @param schema the row schema * @param streamFactory limited access to the Writer's data. * @param nullable can the value be null? * @throws IOException */ - TreeWriter(int columnId, ObjectInspector inspector, + TreeWriter(int columnId, + TypeDescription schema, StreamFactory streamFactory, boolean nullable) throws IOException { this.streamFactory = streamFactory; this.isCompressed = streamFactory.isCompressed(); this.id = columnId; - this.inspector = inspector; if (nullable) { isPresentOutStream = streamFactory.createStream(id, OrcProto.Stream.Kind.PRESENT); @@ -661,9 +604,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } this.foundNulls = false; createBloomFilter = streamFactory.getBloomFilterColumns()[columnId]; - indexStatistics = ColumnStatisticsImpl.create(inspector); - stripeColStatistics = ColumnStatisticsImpl.create(inspector); - fileStatistics = ColumnStatisticsImpl.create(inspector); + indexStatistics = ColumnStatisticsImpl.create(schema); + stripeColStatistics = ColumnStatisticsImpl.create(schema); + fileStatistics = ColumnStatisticsImpl.create(schema); childrenWriters = new TreeWriter[0]; rowIndex = OrcProto.RowIndex.newBuilder(); rowIndexEntry = OrcProto.RowIndexEntry.newBuilder(); @@ -912,10 +855,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private final BitFieldWriter writer; BooleanTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); PositionedOutputStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.writer = new BitFieldWriter(out, 1); @@ -927,7 +870,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { super.write(datum); if (datum != null && datum.isNotNull()) { boolean val = datum.asBool(); - indexStatistics.updateBoolean(val); + indexStatistics.updateBoolean(val, 1); writer.write(val ? 
1 : 0); } } @@ -951,10 +894,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private final RunLengthByteWriter writer; ByteTreeWriter(int columnId, - ObjectInspector inspector, - StreamFactory writer, - boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + TypeDescription schema, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, schema, writer, nullable); this.writer = new RunLengthByteWriter(writer.createStream(id, OrcProto.Stream.Kind.DATA)); recordPosition(rowIndexPosition); @@ -965,7 +908,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { super.write(datum); if (datum != null && datum.isNotNull()) { byte val = datum.asByte(); - indexStatistics.updateInteger(val); + indexStatistics.updateInteger(val, 1); if (createBloomFilter) { bloomFilter.addLong(val); } @@ -993,10 +936,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private boolean isDirectV2 = true; IntegerTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); OutStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); @@ -1026,7 +969,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } else { val = datum.asInt2(); } - indexStatistics.updateInteger(val); + indexStatistics.updateInteger(val, 1); if (createBloomFilter) { // integers are converted to longs in column statistics and during SARG evaluation bloomFilter.addLong(val); @@ -1055,10 +998,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private final SerializationUtils utils; FloatTreeWriter(int columnId, - ObjectInspector inspector, - StreamFactory writer, - boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + TypeDescription schema, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, schema, writer, nullable); this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.utils = new SerializationUtils(); @@ -1099,10 +1042,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private final SerializationUtils utils; DoubleTreeWriter(int columnId, - ObjectInspector inspector, - StreamFactory writer, - boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + TypeDescription schema, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, schema, writer, nullable); this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.utils = new SerializationUtils(); @@ -1137,33 +1080,33 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } } - private static class StringTreeWriter extends TreeWriter { + private static abstract class StringBaseTreeWriter extends TreeWriter { private static final int INITIAL_DICTIONARY_SIZE = 4096; private final OutStream stringOutput; private final IntegerWriter lengthOutput; private final IntegerWriter rowOutput; - private final StringRedBlackTree dictionary = + protected final StringRedBlackTree dictionary = new StringRedBlackTree(INITIAL_DICTIONARY_SIZE); - private final DynamicIntArray rows = new DynamicIntArray(); - private final PositionedOutputStream directStreamOutput; - private final IntegerWriter directLengthOutput; - private final List<RowIndexEntry> 
savedRowIndex = - new ArrayList<>(); + protected final DynamicIntArray rows = new DynamicIntArray(); + protected final PositionedOutputStream directStreamOutput; + protected final IntegerWriter directLengthOutput; + private final List<OrcProto.RowIndexEntry> savedRowIndex = + new ArrayList<OrcProto.RowIndexEntry>(); private final boolean buildIndex; - private final List<Long> rowIndexValueCount = new ArrayList<>(); + private final List<Long> rowIndexValueCount = new ArrayList<Long>(); // If the number of keys in a dictionary is greater than this fraction of //the total number of non-null rows, turn off dictionary encoding - private final float dictionaryKeySizeThreshold; - private boolean useDictionaryEncoding = true; + private final double dictionaryKeySizeThreshold; + protected boolean useDictionaryEncoding = true; private boolean isDirectV2 = true; private boolean doneDictionaryCheck; - private final boolean strideDictionaryCheck; + protected final boolean strideDictionaryCheck; - StringTreeWriter(int columnId, - ObjectInspector inspector, - StreamFactory writer, - boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + StringBaseTreeWriter(int columnId, + TypeDescription schema, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, schema, writer, nullable); this.isDirectV2 = isNewWriteFormat(writer); stringOutput = writer.createStream(id, OrcProto.Stream.Kind.DICTIONARY_DATA); @@ -1177,33 +1120,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback { directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA); directLengthOutput = createIntegerWriter(writer.createStream(id, OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); - dictionaryKeySizeThreshold = writer.getConfiguration().getFloat( - OrcConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname, - OrcConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.defaultFloatVal); - strideDictionaryCheck = writer.getConfiguration().getBoolean( - OrcConf.ConfVars.HIVE_ORC_ROW_INDEX_STRIDE_DICTIONARY_CHECK.varname, - OrcConf.ConfVars.HIVE_ORC_ROW_INDEX_STRIDE_DICTIONARY_CHECK.defaultBoolVal); + Configuration conf = writer.getConfiguration(); + dictionaryKeySizeThreshold = + org.apache.orc.OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf); + strideDictionaryCheck = + org.apache.orc.OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf); doneDictionaryCheck = false; } - @Override - void write(Datum datum) throws IOException { - super.write(datum); - if (datum != null && datum.isNotNull()) { - if (useDictionaryEncoding || !strideDictionaryCheck) { - rows.add(dictionary.add(datum.toString())); - } else { - // write data and length - directStreamOutput.write(datum.asByteArray(), 0, datum.size()); - directLengthOutput.write(datum.size()); - } - indexStatistics.updateString(datum.toString()); - if (createBloomFilter) { - bloomFilter.addBytes(datum.asByteArray(), datum.size()); - } - } - } - private boolean checkDictionaryEncoding() { if (!doneDictionaryCheck) { // Set the flag indicating whether or not to use dictionary encoding @@ -1269,7 +1193,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private int currentId = 0; @Override public void visit(StringRedBlackTree.VisitorContext context - ) throws IOException { + ) throws IOException { context.writeBytes(stringOutput); lengthOutput.write(context.getLength()); dumpOrder[context.getOriginalPosition()] = currentId++; @@ -1383,29 +1307,76 @@ public class WriterImpl 
implements Writer, MemoryManager.Callback { } } + private static class StringTreeWriter extends StringBaseTreeWriter { + StringTreeWriter(int columnId, + TypeDescription schema, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, schema, writer, nullable); + } + + @Override + void write(Datum datum) throws IOException { + super.write(datum); + if (datum != null && datum.isNotNull()) { + if (useDictionaryEncoding || !strideDictionaryCheck) { + rows.add(dictionary.add(datum.toString())); + } else { + // write data and length + directStreamOutput.write(datum.asByteArray(), 0, datum.size()); + directLengthOutput.write(datum.size()); + } + byte[] buf = datum.asByteArray(); + indexStatistics.updateString(buf, 0, buf.length, 1); + if (createBloomFilter) { + bloomFilter.addBytes(buf, 0, buf.length); + } + } + } + } + /** * Under the covers, char is written to ORC the same way as string. */ private static class CharTreeWriter extends StringTreeWriter { + private final int itemLength; + private final byte[] padding; CharTreeWriter(int columnId, - ObjectInspector inspector, - StreamFactory writer, - boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + TypeDescription schema, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, schema, writer, nullable); + itemLength = schema.getMaxLength(); + padding = new byte[itemLength]; } - } - /** - * Under the covers, varchar is written to ORC the same way as string. - */ - private static class VarcharTreeWriter extends StringTreeWriter { + @Override + void write(Datum datum) throws IOException { + super.write(datum); + if (datum != null && datum.isNotNull()) { + byte[] ptr; + byte[] buf = datum.asByteArray(); + if (buf.length >= itemLength) { + ptr = buf; + } else { + ptr = padding; + System.arraycopy(buf, 0, ptr, 0, buf.length); + Arrays.fill(ptr, buf.length, itemLength, (byte) ' '); + } + if (useDictionaryEncoding || !strideDictionaryCheck) { + rows.add(dictionary.add(ptr, 0, itemLength)); + } else { + // write data and length + directStreamOutput.write(ptr, 0, itemLength); + directLengthOutput.write(itemLength); + } - VarcharTreeWriter(int columnId, - ObjectInspector inspector, - StreamFactory writer, - boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + indexStatistics.updateString(ptr, 0, ptr.length, 1); + if (createBloomFilter) { + bloomFilter.addBytes(ptr, 0, ptr.length); + } + } } } @@ -1415,10 +1386,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private boolean isDirectV2 = true; BinaryTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); @@ -1441,11 +1412,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback { void write(Datum datum) throws IOException { super.write(datum); if (datum != null && datum.isNotNull()) { - stream.write(datum.asByteArray(), 0, datum.size()); + byte[] buf = datum.asByteArray(); + stream.write(buf, 0, buf.length); length.write(datum.size()); - indexStatistics.updateBinary(datum); + indexStatistics.updateBinary(buf, 0, buf.length, 1); if (createBloomFilter) { - bloomFilter.addBytes(datum.asByteArray(), datum.size()); + bloomFilter.addBytes(buf, 0, buf.length); } } } 
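[Editor's note] In the CharTreeWriter hunk above, CHAR values are forced to exactly the declared width (itemLength) before being fed to the dictionary or direct streams: shorter values are space-padded, and only the first itemLength bytes of longer values are written. A minimal standalone sketch of that rule (helper name is ours; the commit itself reuses a preallocated padding buffer rather than allocating per value):

    import java.util.Arrays;

    public class CharPadSketch {
      // Mirrors the fixed-width rule in CharTreeWriter.write(): truncate or
      // zero-extend to itemLength, then space-fill any tail left by a short value.
      static byte[] toFixedWidth(byte[] buf, int itemLength) {
        byte[] out = Arrays.copyOf(buf, itemLength);   // truncates or zero-pads
        if (buf.length < itemLength) {
          Arrays.fill(out, buf.length, itemLength, (byte) ' ');  // space-pad tail
        }
        return out;
      }

      public static void main(String[] args) {
        // "ab" in a CHAR(5) column is stored as "ab   " (three trailing spaces).
        System.out.println("[" + new String(toFixedWidth("ab".getBytes(), 5)) + "]");
      }
    }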
@@ -1467,7 +1439,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } } - static final int MILLIS_PER_SECOND = 1000; static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00"; private static class TimestampTreeWriter extends TreeWriter { @@ -1478,10 +1449,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private TimeZone timeZone; TimestampTreeWriter(int columnId, - ObjectInspector inspector, - StreamFactory writer, - boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + TypeDescription schema, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, schema, writer, nullable); this.isDirectV2 = isNewWriteFormat(writer); this.seconds = createIntegerWriter(writer.createStream(id, OrcProto.Stream.Kind.DATA), true, isDirectV2, writer); @@ -1489,7 +1460,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer); recordPosition(rowIndexPosition); // for unit tests to set different time zones - this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND; + this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / DateTimeConstants.MSECS_PER_SEC; writer.useWriterTimeZone(true); timeZone = writer.getTimeZone(); } @@ -1515,7 +1486,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { Timestamp val = new Timestamp(javaTimestamp); indexStatistics.updateTimestamp(val); - seconds.write((val.getTime() / MILLIS_PER_SECOND) - base_timestamp); + seconds.write((val.getTime() / DateTimeConstants.MSECS_PER_SEC) - base_timestamp); nanos.write(formatNanos(val.getNanos())); if (createBloomFilter) { bloomFilter.addLong(val.getTime()); @@ -1561,12 +1532,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private final boolean isDirectV2; DateTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); OutStream out = writer.createStream(id, - OrcProto.Stream.Kind.DATA); + OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); this.writer = createIntegerWriter(out, true, isDirectV2, writer); recordPosition(rowIndexPosition); @@ -1612,19 +1583,17 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } private static class StructTreeWriter extends TreeWriter { - private final List<? 
extends StructField> fields; StructTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); - StructObjectInspector structObjectInspector = - (StructObjectInspector) inspector; - fields = structObjectInspector.getAllStructFieldRefs(); - childrenWriters = new TreeWriter[fields.size()]; + super(columnId, schema, writer, nullable); + List<TypeDescription> children = schema.getChildren(); + childrenWriters = new TreeWriter[children.size()]; for(int i=0; i < childrenWriters.length; ++i) { childrenWriters[i] = createTreeWriter( - fields.get(i).getFieldObjectInspector(), writer, true); + children.get(i), writer, + true); } recordPosition(rowIndexPosition); } @@ -1636,9 +1605,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback { void writeTuple(Tuple tuple) throws IOException { super.write(tuple); if (tuple != null) { - for(int i = 0; i < fields.size(); ++i) { - TreeWriter writer = childrenWriters[i]; - writer.write(tuple.asDatum(i)); + for(int i = 0; i < childrenWriters.length; ++i) { + childrenWriters[i].write(tuple.asDatum(i)); } } } @@ -1654,159 +1622,136 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } } - private static TreeWriter createTreeWriter(ObjectInspector inspector, + private static TreeWriter createTreeWriter(TypeDescription schema, StreamFactory streamFactory, boolean nullable) throws IOException { - switch (inspector.getCategory()) { - case PRIMITIVE: - switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) { - case BOOLEAN: - case VOID: - return new BooleanTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case BYTE: - return new ByteTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case SHORT: - case INT: - case LONG: - return new IntegerTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case FLOAT: - return new FloatTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case DOUBLE: - return new DoubleTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case STRING: - return new StringTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case CHAR: - return new CharTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case VARCHAR: - return new VarcharTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case BINARY: - return new BinaryTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case TIMESTAMP: - return new TimestampTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case DATE: - return new DateTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - default: - throw new IllegalArgumentException("Bad primitive category " + - ((PrimitiveObjectInspector) inspector).getPrimitiveCategory()); - } + switch (schema.getCategory()) { + case BOOLEAN: + return new BooleanTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case BYTE: + return new ByteTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case SHORT: + case INT: + case LONG: + return new IntegerTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case FLOAT: + return new 
FloatTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case DOUBLE: + return new DoubleTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case STRING: + return new StringTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case CHAR: + return new CharTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case BINARY: + return new BinaryTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case TIMESTAMP: + return new TimestampTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case DATE: + return new DateTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); case STRUCT: - return new StructTreeWriter(streamFactory.getNextColumnId(), inspector, - streamFactory, nullable); + return new StructTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); default: throw new IllegalArgumentException("Bad category: " + - inspector.getCategory()); + schema.getCategory()); } } private static void writeTypes(OrcProto.Footer.Builder builder, - TreeWriter treeWriter) { + TypeDescription schema) { OrcProto.Type.Builder type = OrcProto.Type.newBuilder(); - switch (treeWriter.inspector.getCategory()) { - case PRIMITIVE: - switch (((PrimitiveObjectInspector) treeWriter.inspector). - getPrimitiveCategory()) { - case VOID: - case BOOLEAN: - type.setKind(OrcProto.Type.Kind.BOOLEAN); - break; - case BYTE: - type.setKind(OrcProto.Type.Kind.BYTE); - break; - case SHORT: - type.setKind(OrcProto.Type.Kind.SHORT); - break; - case INT: - type.setKind(OrcProto.Type.Kind.INT); - break; - case LONG: - type.setKind(OrcProto.Type.Kind.LONG); - break; - case FLOAT: - type.setKind(OrcProto.Type.Kind.FLOAT); - break; - case DOUBLE: - type.setKind(OrcProto.Type.Kind.DOUBLE); - break; - case STRING: - type.setKind(OrcProto.Type.Kind.STRING); - break; - case CHAR: - // The char length needs to be written to file and should be available - // from the object inspector - CharTypeInfo charTypeInfo = (CharTypeInfo) ((PrimitiveObjectInspector) treeWriter.inspector).getTypeInfo(); - type.setKind(Type.Kind.CHAR); - type.setMaximumLength(charTypeInfo.getLength()); - break; - case VARCHAR: - // The varchar length needs to be written to file and should be available - // from the object inspector - VarcharTypeInfo typeInfo = (VarcharTypeInfo) ((PrimitiveObjectInspector) treeWriter.inspector).getTypeInfo(); - type.setKind(Type.Kind.VARCHAR); - type.setMaximumLength(typeInfo.getLength()); - break; - case BINARY: - type.setKind(OrcProto.Type.Kind.BINARY); - break; - case TIMESTAMP: - type.setKind(OrcProto.Type.Kind.TIMESTAMP); - break; - case DATE: - type.setKind(OrcProto.Type.Kind.DATE); - break; - case DECIMAL: - DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)((PrimitiveObjectInspector)treeWriter.inspector).getTypeInfo(); - type.setKind(OrcProto.Type.Kind.DECIMAL); - type.setPrecision(decTypeInfo.precision()); - type.setScale(decTypeInfo.scale()); - break; - default: - throw new IllegalArgumentException("Unknown primitive category: " + - ((PrimitiveObjectInspector) treeWriter.inspector). 
- getPrimitiveCategory()); - } + List<TypeDescription> children = schema.getChildren(); + switch (schema.getCategory()) { + case BOOLEAN: + type.setKind(OrcProto.Type.Kind.BOOLEAN); + break; + case BYTE: + type.setKind(OrcProto.Type.Kind.BYTE); + break; + case SHORT: + type.setKind(OrcProto.Type.Kind.SHORT); + break; + case INT: + type.setKind(OrcProto.Type.Kind.INT); + break; + case LONG: + type.setKind(OrcProto.Type.Kind.LONG); + break; + case FLOAT: + type.setKind(OrcProto.Type.Kind.FLOAT); + break; + case DOUBLE: + type.setKind(OrcProto.Type.Kind.DOUBLE); + break; + case STRING: + type.setKind(OrcProto.Type.Kind.STRING); + break; + case CHAR: + type.setKind(OrcProto.Type.Kind.CHAR); + type.setMaximumLength(schema.getMaxLength()); + break; + case VARCHAR: + type.setKind(OrcProto.Type.Kind.VARCHAR); + type.setMaximumLength(schema.getMaxLength()); + break; + case BINARY: + type.setKind(OrcProto.Type.Kind.BINARY); + break; + case TIMESTAMP: + type.setKind(OrcProto.Type.Kind.TIMESTAMP); + break; + case DATE: + type.setKind(OrcProto.Type.Kind.DATE); + break; + case DECIMAL: + type.setKind(OrcProto.Type.Kind.DECIMAL); + type.setPrecision(schema.getPrecision()); + type.setScale(schema.getScale()); break; case LIST: type.setKind(OrcProto.Type.Kind.LIST); - type.addSubtypes(treeWriter.childrenWriters[0].id); + type.addSubtypes(children.get(0).getId()); break; case MAP: type.setKind(OrcProto.Type.Kind.MAP); - type.addSubtypes(treeWriter.childrenWriters[0].id); - type.addSubtypes(treeWriter.childrenWriters[1].id); + for(TypeDescription t: children) { + type.addSubtypes(t.getId()); + } break; case STRUCT: type.setKind(OrcProto.Type.Kind.STRUCT); - for(TreeWriter child: treeWriter.childrenWriters) { - type.addSubtypes(child.id); + for(TypeDescription t: children) { + type.addSubtypes(t.getId()); } - for(StructField field: ((StructTreeWriter) treeWriter).fields) { - type.addFieldNames(field.getFieldName()); + for(String field: schema.getFieldNames()) { + type.addFieldNames(field); } break; case UNION: type.setKind(OrcProto.Type.Kind.UNION); - for(TreeWriter child: treeWriter.childrenWriters) { - type.addSubtypes(child.id); + for(TypeDescription t: children) { + type.addSubtypes(t.getId()); } break; default: throw new IllegalArgumentException("Unknown category: " + - treeWriter.inspector.getCategory()); + schema.getCategory()); } builder.addTypes(type); - for(TreeWriter child: treeWriter.childrenWriters) { - writeTypes(builder, child); + if (children != null) { + for(TypeDescription child: children) { + writeTypes(builder, child); + } } } @@ -1853,9 +1798,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback { StreamName name = pair.getKey(); long streamSize = pair.getValue().getOutputSize(); builder.addStreams(OrcProto.Stream.newBuilder() - .setColumn(name.getColumn()) - .setKind(name.getKind()) - .setLength(streamSize)); + .setColumn(name.getColumn()) + .setKind(name.getKind()) + .setLength(streamSize)); if (StreamName.Area.INDEX == name.getArea()) { indexSize += streamSize; } else { @@ -1880,8 +1825,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback { // and user specified padding tolerance. Since stripe size can overflow // the default stripe size we should apply this correction to avoid // writing portion of last stripe to next hdfs block. - float correction = overflow > 0 ? (float) overflow - / (float) adjustedStripeSize : 0.0f; + double correction = overflow > 0 ? 
(double) overflow + / (double) adjustedStripeSize : 0.0; // correction should not be greater than user specified padding // tolerance @@ -1939,75 +1884,60 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } private long computeRawDataSize() { - long result = 0; - for (TreeWriter child : treeWriter.getChildrenWriters()) { - result += getRawDataSizeFromInspectors(child, child.inspector); - } - return result; + return getRawDataSize(treeWriter, schema); } - private long getRawDataSizeFromInspectors(TreeWriter child, ObjectInspector oi) { + private long getRawDataSize(TreeWriter child, + TypeDescription schema) { long total = 0; - switch (oi.getCategory()) { - case PRIMITIVE: - total += getRawDataSizeFromPrimitives(child, oi); - break; - case LIST: - case MAP: - case UNION: - case STRUCT: - for (TreeWriter tw : child.childrenWriters) { - total += getRawDataSizeFromInspectors(tw, tw.inspector); + long numVals = child.fileStatistics.getNumberOfValues(); + switch (schema.getCategory()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case FLOAT: + return numVals * JavaDataModel.get().primitive1(); + case LONG: + case DOUBLE: + return numVals * JavaDataModel.get().primitive2(); + case STRING: + case VARCHAR: + case CHAR: + // ORC strings are converted to java Strings. so use JavaDataModel to + // compute the overall size of strings + StringColumnStatistics scs = (StringColumnStatistics) child.fileStatistics; + numVals = numVals == 0 ? 1 : numVals; + int avgStringLen = (int) (scs.getSum() / numVals); + return numVals * JavaDataModel.get().lengthForStringOfLength(avgStringLen); + case DECIMAL: + return numVals * JavaDataModel.get().lengthOfDecimal(); + case DATE: + return numVals * JavaDataModel.get().lengthOfDate(); + case BINARY: + // get total length of binary blob + BinaryColumnStatistics bcs = (BinaryColumnStatistics) child.fileStatistics; + return bcs.getSum(); + case TIMESTAMP: + return numVals * JavaDataModel.get().lengthOfTimestamp(); + case LIST: + case MAP: + case UNION: + case STRUCT: { + TreeWriter[] childWriters = child.getChildrenWriters(); + List<TypeDescription> childTypes = schema.getChildren(); + for (int i=0; i < childWriters.length; ++i) { + total += getRawDataSize(childWriters[i], childTypes.get(i)); + } + break; } - break; - default: - LOG.debug("Unknown object inspector category."); - break; + default: + LOG.debug("Unknown object inspector category."); + break; } return total; } - private long getRawDataSizeFromPrimitives(TreeWriter child, ObjectInspector oi) { - long result = 0; - long numVals = child.fileStatistics.getNumberOfValues(); - switch (((PrimitiveObjectInspector) oi).getPrimitiveCategory()) { - case BOOLEAN: - case BYTE: - case SHORT: - case INT: - case FLOAT: - return numVals * JavaDataModel.get().primitive1(); - case LONG: - case DOUBLE: - return numVals * JavaDataModel.get().primitive2(); - case STRING: - case VARCHAR: - case CHAR: - // ORC strings are converted to java Strings. so use JavaDataModel to - // compute the overall size of strings - child = (StringTreeWriter) child; - StringColumnStatistics scs = (StringColumnStatistics) child.fileStatistics; - numVals = numVals == 0 ? 
1 : numVals; - int avgStringLen = (int) (scs.getSum() / numVals); - return numVals * JavaDataModel.get().lengthForStringOfLength(avgStringLen); - case DECIMAL: - return numVals * JavaDataModel.get().lengthOfDecimal(); - case DATE: - return numVals * JavaDataModel.get().lengthOfDate(); - case BINARY: - // get total length of binary blob - BinaryColumnStatistics bcs = (BinaryColumnStatistics) child.fileStatistics; - return bcs.getSum(); - case TIMESTAMP: - return numVals * JavaDataModel.get().lengthOfTimestamp(); - default: - LOG.debug("Unknown primitive category."); - break; - } - - return result; - } - private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) { switch (kind) { case NONE: return OrcProto.CompressionKind.NONE; @@ -2027,7 +1957,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } } - private int writeMetadata(long bodyLength) throws IOException { + private int writeMetadata() throws IOException { getStream(); OrcProto.Metadata.Builder builder = OrcProto.Metadata.newBuilder(); for(OrcProto.StripeStatistics.Builder ssb : treeWriter.stripeStatsBuilders) { @@ -2052,7 +1982,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { // populate raw data size rawDataSize = computeRawDataSize(); // serialize the types - writeTypes(builder, treeWriter); + writeTypes(builder, schema); // add the stripe information for(OrcProto.StripeInformation stripe: stripes) { builder.addStripes(stripe); @@ -2062,7 +1992,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { // add all of the user metadata for(Map.Entry<String, ByteString> entry: userMetadata.entrySet()) { builder.addMetadata(OrcProto.UserMetadataItem.newBuilder() - .setName(entry.getKey()).setValue(entry.getValue())); + .setName(entry.getKey()).setValue(entry.getValue())); } long startPosn = rawWriter.getPos(); OrcProto.Footer footer = builder.build(); @@ -2074,14 +2004,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private int writePostScript(int footerLength, int metadataLength) throws IOException { OrcProto.PostScript.Builder builder = - OrcProto.PostScript.newBuilder() - .setCompression(writeCompressionKind(compress)) - .setFooterLength(footerLength) - .setMetadataLength(metadataLength) - .setMagic(OrcFile.MAGIC) - .addVersion(version.getMajor()) - .addVersion(version.getMinor()) - .setWriterVersion(OrcFile.WriterVersion.HIVE_8732.getId()); + OrcProto.PostScript.newBuilder() + .setCompression(writeCompressionKind(compress)) + .setFooterLength(footerLength) + .setMetadataLength(metadataLength) + .setMagic(OrcFile.MAGIC) + .addVersion(version.getMajor()) + .addVersion(version.getMinor()) + .setWriterVersion(OrcFile.CURRENT_WRITER.getId()); if (compress != CompressionKind.NONE) { builder.setCompressionBlockSize(bufferSize); } @@ -2120,7 +2050,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { createRowIndexEntry(); } } - memoryManager.addedRow(); + memoryManager.addedRow(1); } @Override @@ -2132,7 +2062,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { memoryManager.removeWriter(path); // actually close the file flushStripe(); - int metadataLength = writeMetadata(rawWriter.getPos()); + int metadataLength = writeMetadata(); int footerLength = writeFooter(rawWriter.getPos() - metadataLength); rawWriter.writeByte(writePostScript(footerLength, metadataLength)); rawWriter.close(); @@ -2165,19 +2095,19 @@ public class WriterImpl implements Writer, MemoryManager.Callback { if (callback != null) { 
callback.preFooterWrite(callbackContext); } - int metaLength = writeMetadata(rawWriter.getPos()); + int metaLength = writeMetadata(); int footLength = writeFooter(rawWriter.getPos() - metaLength); rawWriter.writeByte(writePostScript(footLength, metaLength)); stripesAtLastFlush = stripes.size(); - ShimLoader.getHadoopShims().hflush(rawWriter); + rawWriter.hflush(); } return rawWriter.getPos(); } @Override public void appendStripe(byte[] stripe, int offset, int length, - StripeInformation stripeInfo, - OrcProto.StripeStatistics stripeStatistics) throws IOException { + StripeInformation stripeInfo, + OrcProto.StripeStatistics stripeStatistics) throws IOException { checkArgument(stripe != null, "Stripe must not be null"); checkArgument(length <= stripe.length, "Specified length must not be greater specified array length"); @@ -2187,12 +2117,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback { getStream(); long start = rawWriter.getPos(); - long stripeLen = length; long availBlockSpace = blockSize - (start % blockSize); // see if stripe can fit in the current hdfs block, else pad the remaining // space in the block - if (stripeLen < blockSize && stripeLen > availBlockSpace && + if (length < blockSize && length > availBlockSpace && addBlockPadding) { byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)]; LOG.info(String.format("Padding ORC by %d bytes while merging..", @@ -2245,7 +2174,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } private void getAllColumnTreeWritersImpl(TreeWriter tw, - List<TreeWriter> result) { + List<TreeWriter> result) { result.add(tw); for (TreeWriter child : tw.childrenWriters) { getAllColumnTreeWritersImpl(child, result); @@ -2253,9 +2182,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } @Override - public void appendUserMetadata(List<UserMetadataItem> userMetadata) { + public void appendUserMetadata(List<OrcProto.UserMetadataItem> userMetadata) { if (userMetadata != null) { - for (UserMetadataItem item : userMetadata) { + for (OrcProto.UserMetadataItem item : userMetadata) { this.userMetadata.put(item.getName(), item.getValue()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZeroCopyAdapter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZeroCopyAdapter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZeroCopyAdapter.java new file mode 100644 index 0000000..2886fe7 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZeroCopyAdapter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.thirdparty.orc; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.ReadOption; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.EnumSet; + +public class ZeroCopyAdapter { + private final FSDataInputStream in; + private final ByteBufferPoolAdapter pool; + private final static EnumSet<ReadOption> CHECK_SUM = EnumSet + .noneOf(ReadOption.class); + private final static EnumSet<ReadOption> NO_CHECK_SUM = EnumSet + .of(ReadOption.SKIP_CHECKSUMS); + + public ZeroCopyAdapter(FSDataInputStream in, ByteBufferAllocatorPool poolshim) { + this.in = in; + if (poolshim != null) { + pool = new ByteBufferPoolAdapter(poolshim); + } else { + pool = null; + } + } + + public final ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) + throws IOException { + EnumSet<ReadOption> options = NO_CHECK_SUM; + if (verifyChecksums) { + options = CHECK_SUM; + } + return this.in.read(this.pool, maxLength, options); + } + + public final void releaseBuffer(ByteBuffer buffer) { + this.in.releaseBuffer(buffer); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZlibCodec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZlibCodec.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZlibCodec.java deleted file mode 100644 index d0a8fa7..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZlibCodec.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.tajo.storage.thirdparty.orc; - -import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType; -import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim; -import org.apache.hadoop.hive.shims.ShimLoader; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.zip.DataFormatException; -import java.util.zip.Deflater; -import java.util.zip.Inflater; - -class ZlibCodec implements CompressionCodec, DirectDecompressionCodec { - - private Boolean direct = null; - - private final int level; - private final int strategy; - - public ZlibCodec() { - level = Deflater.DEFAULT_COMPRESSION; - strategy = Deflater.DEFAULT_STRATEGY; - } - - private ZlibCodec(int level, int strategy) { - this.level = level; - this.strategy = strategy; - } - - @Override - public boolean compress(ByteBuffer in, ByteBuffer out, - ByteBuffer overflow) throws IOException { - Deflater deflater = new Deflater(level, true); - deflater.setStrategy(strategy); - int length = in.remaining(); - deflater.setInput(in.array(), in.arrayOffset() + in.position(), length); - deflater.finish(); - int outSize = 0; - int offset = out.arrayOffset() + out.position(); - while (!deflater.finished() && (length > outSize)) { - int size = deflater.deflate(out.array(), offset, out.remaining()); - out.position(size + out.position()); - outSize += size; - offset += size; - // if we run out of space in the out buffer, use the overflow - if (out.remaining() == 0) { - if (overflow == null) { - deflater.end(); - return false; - } - out = overflow; - offset = out.arrayOffset() + out.position(); - } - } - deflater.end(); - return length > outSize; - } - - @Override - public void decompress(ByteBuffer in, ByteBuffer out) throws IOException { - - if(in.isDirect() && out.isDirect()) { - directDecompress(in, out); - return; - } - - Inflater inflater = new Inflater(true); - inflater.setInput(in.array(), in.arrayOffset() + in.position(), - in.remaining()); - while (!(inflater.finished() || inflater.needsDictionary() || - inflater.needsInput())) { - try { - int count = inflater.inflate(out.array(), - out.arrayOffset() + out.position(), - out.remaining()); - out.position(count + out.position()); - } catch (DataFormatException dfe) { - throw new IOException("Bad compression data", dfe); - } - } - out.flip(); - inflater.end(); - in.position(in.limit()); - } - - @Override - public boolean isAvailable() { - if (direct == null) { - // see nowrap option in new Inflater(boolean) which disables zlib headers - try { - if (ShimLoader.getHadoopShims().getDirectDecompressor( - DirectCompressionType.ZLIB_NOHEADER) != null) { - direct = Boolean.valueOf(true); - } else { - direct = Boolean.valueOf(false); - } - } catch (UnsatisfiedLinkError ule) { - direct = Boolean.valueOf(false); - } - } - return direct.booleanValue(); - } - - @Override - public void directDecompress(ByteBuffer in, ByteBuffer out) - throws IOException { - DirectDecompressorShim decompressShim = ShimLoader.getHadoopShims() - .getDirectDecompressor(DirectCompressionType.ZLIB_NOHEADER); - decompressShim.decompress(in, out); - out.flip(); // flip for read - } - - @Override - public CompressionCodec modify(@Nullable EnumSet<Modifier> modifiers) { - - if (modifiers == null) { - return this; - } - - int l = this.level; - int s = this.strategy; - - for (Modifier m : modifiers) { - switch (m) { - case BINARY: - /* filtered == less LZ77, more huffman */ - s = Deflater.FILTERED; - break; - case TEXT: 
- s = Deflater.DEFAULT_STRATEGY; - break; - case FASTEST: - // deflate_fast looking for 8 byte patterns - l = Deflater.BEST_SPEED; - break; - case FAST: - // deflate_fast looking for 16 byte patterns - l = Deflater.BEST_SPEED + 1; - break; - case DEFAULT: - // deflate_slow looking for 128 byte patterns - l = Deflater.DEFAULT_COMPRESSION; - break; - default: - break; - } - } - return new ZlibCodec(l, s); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/proto/orc_proto.proto ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/proto/orc_proto.proto b/tajo-storage/tajo-storage-hdfs/src/main/proto/orc_proto.proto deleted file mode 100644 index c80cf6c..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/proto/orc_proto.proto +++ /dev/null @@ -1,217 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.orc; - -message IntegerStatistics { - optional sint64 minimum = 1; - optional sint64 maximum = 2; - optional sint64 sum = 3; -} - -message DoubleStatistics { - optional double minimum = 1; - optional double maximum = 2; - optional double sum = 3; -} - -message StringStatistics { - optional string minimum = 1; - optional string maximum = 2; - // sum will store the total length of all strings in a stripe - optional sint64 sum = 3; -} - -message BucketStatistics { - repeated uint64 count = 1 [packed=true]; -} - -message DecimalStatistics { - optional string minimum = 1; - optional string maximum = 2; - optional string sum = 3; -} - -message DateStatistics { - // min,max values saved as days since epoch - optional sint32 minimum = 1; - optional sint32 maximum = 2; -} - -message TimestampStatistics { - // min,max values saved as milliseconds since epoch - optional sint64 minimum = 1; - optional sint64 maximum = 2; -} - -message BinaryStatistics { - // sum will store the total binary blob length in a stripe - optional sint64 sum = 1; -} - -message ColumnStatistics { - optional uint64 numberOfValues = 1; - optional IntegerStatistics intStatistics = 2; - optional DoubleStatistics doubleStatistics = 3; - optional StringStatistics stringStatistics = 4; - optional BucketStatistics bucketStatistics = 5; - optional DecimalStatistics decimalStatistics = 6; - optional DateStatistics dateStatistics = 7; - optional BinaryStatistics binaryStatistics = 8; - optional TimestampStatistics timestampStatistics = 9; - optional bool hasNull = 10; -} - -message RowIndexEntry { - repeated uint64 positions = 1 [packed=true]; - optional ColumnStatistics statistics = 2; -} - -message RowIndex { - repeated RowIndexEntry entry = 1; -} - -message BloomFilter { - optional uint32 numHashFunctions = 1; - 
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/proto/orc_proto.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/proto/orc_proto.proto b/tajo-storage/tajo-storage-hdfs/src/main/proto/orc_proto.proto
deleted file mode 100644
index c80cf6c..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/proto/orc_proto.proto
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.orc;
-
-message IntegerStatistics {
-  optional sint64 minimum = 1;
-  optional sint64 maximum = 2;
-  optional sint64 sum = 3;
-}
-
-message DoubleStatistics {
-  optional double minimum = 1;
-  optional double maximum = 2;
-  optional double sum = 3;
-}
-
-message StringStatistics {
-  optional string minimum = 1;
-  optional string maximum = 2;
-  // sum will store the total length of all strings in a stripe
-  optional sint64 sum = 3;
-}
-
-message BucketStatistics {
-  repeated uint64 count = 1 [packed=true];
-}
-
-message DecimalStatistics {
-  optional string minimum = 1;
-  optional string maximum = 2;
-  optional string sum = 3;
-}
-
-message DateStatistics {
-  // min,max values saved as days since epoch
-  optional sint32 minimum = 1;
-  optional sint32 maximum = 2;
-}
-
-message TimestampStatistics {
-  // min,max values saved as milliseconds since epoch
-  optional sint64 minimum = 1;
-  optional sint64 maximum = 2;
-}
-
-message BinaryStatistics {
-  // sum will store the total binary blob length in a stripe
-  optional sint64 sum = 1;
-}
-
-message ColumnStatistics {
-  optional uint64 numberOfValues = 1;
-  optional IntegerStatistics intStatistics = 2;
-  optional DoubleStatistics doubleStatistics = 3;
-  optional StringStatistics stringStatistics = 4;
-  optional BucketStatistics bucketStatistics = 5;
-  optional DecimalStatistics decimalStatistics = 6;
-  optional DateStatistics dateStatistics = 7;
-  optional BinaryStatistics binaryStatistics = 8;
-  optional TimestampStatistics timestampStatistics = 9;
-  optional bool hasNull = 10;
-}
-
-message RowIndexEntry {
-  repeated uint64 positions = 1 [packed=true];
-  optional ColumnStatistics statistics = 2;
-}
-
-message RowIndex {
-  repeated RowIndexEntry entry = 1;
-}
-
-message BloomFilter {
-  optional uint32 numHashFunctions = 1;
-  repeated fixed64 bitset = 2;
-}
-
-message BloomFilterIndex {
-  repeated BloomFilter bloomFilter = 1;
-}
-
-message Stream {
-  // if you add new index stream kinds, you need to make sure to update
-  // StreamName to ensure it is added to the stripe in the right area
-  enum Kind {
-    PRESENT = 0;
-    DATA = 1;
-    LENGTH = 2;
-    DICTIONARY_DATA = 3;
-    DICTIONARY_COUNT = 4;
-    SECONDARY = 5;
-    ROW_INDEX = 6;
-    BLOOM_FILTER = 7;
-  }
-  optional Kind kind = 1;
-  optional uint32 column = 2;
-  optional uint64 length = 3;
-}
-
-message ColumnEncoding {
-  enum Kind {
-    DIRECT = 0;
-    DICTIONARY = 1;
-    DIRECT_V2 = 2;
-    DICTIONARY_V2 = 3;
-  }
-  optional Kind kind = 1;
-  optional uint32 dictionarySize = 2;
-}
-
-message StripeFooter {
-  repeated Stream streams = 1;
-  repeated ColumnEncoding columns = 2;
-  optional string writerTimezone = 3;
-}
-
-message Type {
-  enum Kind {
-    BOOLEAN = 0;
-    BYTE = 1;
-    SHORT = 2;
-    INT = 3;
-    LONG = 4;
-    FLOAT = 5;
-    DOUBLE = 6;
-    STRING = 7;
-    BINARY = 8;
-    TIMESTAMP = 9;
-    LIST = 10;
-    MAP = 11;
-    STRUCT = 12;
-    UNION = 13;
-    DECIMAL = 14;
-    DATE = 15;
-    VARCHAR = 16;
-    CHAR = 17;
-  }
-  optional Kind kind = 1;
-  repeated uint32 subtypes = 2 [packed=true];
-  repeated string fieldNames = 3;
-  optional uint32 maximumLength = 4;
-  optional uint32 precision = 5;
-  optional uint32 scale = 6;
-}
-
-message StripeInformation {
-  optional uint64 offset = 1;
-  optional uint64 indexLength = 2;
-  optional uint64 dataLength = 3;
-  optional uint64 footerLength = 4;
-  optional uint64 numberOfRows = 5;
-}
-
-message UserMetadataItem {
-  optional string name = 1;
-  optional bytes value = 2;
-}
-
-message StripeStatistics {
-  repeated ColumnStatistics colStats = 1;
-}
-
-message Metadata {
-  repeated StripeStatistics stripeStats = 1;
-}
-
-message Footer {
-  optional uint64 headerLength = 1;
-  optional uint64 contentLength = 2;
-  repeated StripeInformation stripes = 3;
-  repeated Type types = 4;
-  repeated UserMetadataItem metadata = 5;
-  optional uint64 numberOfRows = 6;
-  repeated ColumnStatistics statistics = 7;
-  optional uint32 rowIndexStride = 8;
-}
-
-enum CompressionKind {
-  NONE = 0;
-  ZLIB = 1;
-  SNAPPY = 2;
-  LZO = 3;
-}
-
-// Serialized length must be less that 255 bytes
-message PostScript {
-  optional uint64 footerLength = 1;
-  optional CompressionKind compression = 2;
-  optional uint64 compressionBlockSize = 3;
-  // the version of the file format
-  //   [0, 11] = Hive 0.11
-  //   [0, 12] = Hive 0.12
-  repeated uint32 version = 4 [packed = true];
-  optional uint64 metadataLength = 5;
-  // Version of the writer:
-  //   0 (or missing) = original
-  //   1 = HIVE-8732 fixed
-  optional uint32 writerVersion = 6;
-  // Leave this last in the record
-  optional string magic = 8000;
-}
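With the bundled orc_proto.proto deleted, the generated protobuf classes now come from the upstream orc-core artifact (org.apache.orc.OrcProto). For orientation, a minimal sketch of writing a file through that public API; it assumes ORC 1.x with hive-storage-api on the classpath, and the path, schema, and values are made up for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.CompressionKind;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class OrcCoreWriteSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        TypeDescription schema = TypeDescription.fromString("struct<id:bigint,name:string>");

        // The writer options carry the schema and compression kind
        Writer writer = OrcFile.createWriter(new Path("/tmp/sketch.orc"),
            OrcFile.writerOptions(conf)
                .setSchema(schema)
                .compress(CompressionKind.ZLIB));

        // Rows are appended in columnar batches rather than one tuple at a time
        VectorizedRowBatch batch = schema.createRowBatch();
        LongColumnVector id = (LongColumnVector) batch.cols[0];
        BytesColumnVector name = (BytesColumnVector) batch.cols[1];

        int row = batch.size++;
        id.vector[row] = 42L;
        name.setVal(row, "tajo".getBytes(StandardCharsets.UTF_8));

        writer.addRowBatch(batch);
        writer.close();
      }
    }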
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index b63b497..608d066 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.*;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.orc.OrcConf;
 import org.apache.tajo.BuiltinStorages;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
@@ -61,6 +62,7 @@ public class TestCompressionStorages {
   public TestCompressionStorages(String type) throws IOException {
     this.dataFormat = type;
     conf = new TajoConf();
+    conf.setBoolean("hive.exec.orc.zerocopy", true);
 
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
@@ -71,7 +73,8 @@ public class TestCompressionStorages {
     return Arrays.asList(new Object[][]{
         {BuiltinStorages.TEXT},
         {BuiltinStorages.RCFILE},
-        {BuiltinStorages.SEQUENCE_FILE}
+        {BuiltinStorages.SEQUENCE_FILE},
+        {BuiltinStorages.ORC}
     });
   }
 
@@ -120,6 +123,14 @@ public class TestCompressionStorages {
     meta.putProperty("rcfile.serde", TextSerializerDeserializer.class.getName());
     meta.putProperty("sequencefile.serde", TextSerializerDeserializer.class.getName());
 
+    if (codec.equals(SnappyCodec.class)) {
+      meta.putProperty(OrcConf.COMPRESS.getAttribute(), "SNAPPY");
+    } else if (codec.equals(Lz4Codec.class)) {
+      meta.putProperty(OrcConf.COMPRESS.getAttribute(), "ZLIB");
+    } else {
+      meta.putProperty(OrcConf.COMPRESS.getAttribute(), "NONE");
+    }
+
     String fileName = "Compression_" + codec.getSimpleName();
     Path tablePath = new Path(testDir, fileName);
     Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
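OrcConf.COMPRESS.getAttribute() resolves to the table-level key "orc.compress", so the branch above simply picks a codec name per test parameter; Lz4Codec falls back to ZLIB, presumably because this ORC version ships no LZ4 writer codec. A tiny sketch of the enum's accessors (output shown for ORC 1.x; the exact default may vary by version):

    import org.apache.orc.OrcConf;

    public class OrcConfSketch {
      public static void main(String[] args) {
        // Attribute name used with meta.putProperty(...) in the test above
        System.out.println(OrcConf.COMPRESS.getAttribute());     // orc.compress
        // Library default applied when the property is absent
        System.out.println(OrcConf.COMPRESS.getDefaultValue());  // ZLIB
      }
    }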
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 552dc2e..a9d61d5 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -167,6 +167,21 @@ public class TestStorages {
     fs.delete(testDir, true);
   }
 
+  private boolean protoTypeSupport() {
+    return internalType;
+  }
+
+  private boolean timeTypeSupport() {
+    return internalType
+        || dataFormat.equalsIgnoreCase(BuiltinStorages.TEXT);
+  }
+
+  private boolean dateTypeSupport() {
+    return internalType
+        || dataFormat.equalsIgnoreCase(BuiltinStorages.TEXT)
+        || dataFormat.equalsIgnoreCase(BuiltinStorages.ORC);
+  }
+
   @Test
   public void testSplitable() throws IOException {
     if (splitable) {
@@ -385,8 +400,6 @@
 
   @Test
   public void testVariousTypes() throws IOException {
-    boolean handleProtobuf = !dataFormat.equalsIgnoreCase(BuiltinStorages.JSON);
-
     Schema schema = new Schema();
     schema.addColumn("col1", Type.BOOLEAN);
     schema.addColumn("col2", Type.CHAR, 7);
@@ -398,7 +411,7 @@
     schema.addColumn("col8", Type.TEXT);
     schema.addColumn("col9", Type.BLOB);
     schema.addColumn("col10", Type.INET4);
-    if (handleProtobuf) {
+    if (protoTypeSupport()) {
       schema.addColumn("col11", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
     }
 
@@ -418,7 +431,7 @@
 
     QueryId queryid = new QueryId("12345", 5);
     ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-    VTuple tuple = new VTuple(10 + (handleProtobuf ? 1 : 0));
+    VTuple tuple = new VTuple(10 + (protoTypeSupport() ? 1 : 0));
     tuple.put(new Datum[] {
         DatumFactory.createBool(true),
         DatumFactory.createChar("hyunsik"),
@@ -432,7 +445,7 @@
         DatumFactory.createInet4("192.168.0.1"),
     });
 
-    if (handleProtobuf) {
+    if (protoTypeSupport()) {
       tuple.put(10, factory.createDatum(queryid.getProto()));
     }
 
@@ -456,8 +469,6 @@
 
   @Test
   public void testNullHandlingTypes() throws IOException {
-    boolean handleProtobuf = !dataFormat.equalsIgnoreCase(BuiltinStorages.JSON);
-
     Schema schema = new Schema();
     schema.addColumn("col1", Type.BOOLEAN);
     schema.addColumn("col2", Type.CHAR, 7);
@@ -470,7 +481,7 @@
     schema.addColumn("col9", Type.BLOB);
     schema.addColumn("col10", Type.INET4);
 
-    if (handleProtobuf) {
+    if (protoTypeSupport()) {
       schema.addColumn("col11", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
     }
 
@@ -492,7 +503,7 @@
 
     QueryId queryid = new QueryId("12345", 5);
     ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-    int columnNum = 10 + (handleProtobuf ? 1 : 0);
+    int columnNum = 10 + (protoTypeSupport() ? 1 : 0);
     VTuple seedTuple = new VTuple(columnNum);
     seedTuple.put(new Datum[]{
         DatumFactory.createBool(true),               // 0
@@ -507,7 +518,7 @@
         DatumFactory.createInet4("192.168.0.1")      // 10
     });
 
-    if (handleProtobuf) {
+    if (protoTypeSupport()) {
       seedTuple.put(10, factory.createDatum(queryid.getProto())); // 11
     }
 
@@ -553,8 +564,6 @@
   public void testNullHandlingTypesWithProjection() throws IOException {
     if (internalType) return;
 
-    boolean handleProtobuf = !dataFormat.equalsIgnoreCase(BuiltinStorages.JSON);
-
     Schema schema = new Schema();
     schema.addColumn("col1", Type.BOOLEAN);
     schema.addColumn("col2", Type.CHAR, 7);
@@ -567,7 +576,7 @@
     schema.addColumn("col9", Type.BLOB);
     schema.addColumn("col10", Type.INET4);
 
-    if (handleProtobuf) {
+    if (protoTypeSupport()) {
       schema.addColumn("col11", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
     }
 
@@ -589,7 +598,7 @@
 
     QueryId queryid = new QueryId("12345", 5);
     ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-    int columnNum = 10 + (handleProtobuf ? 1 : 0);
+    int columnNum = 10 + (protoTypeSupport() ? 1 : 0);
     VTuple seedTuple = new VTuple(columnNum);
     seedTuple.put(new Datum[]{
         DatumFactory.createBool(true),               // 0
@@ -604,7 +613,7 @@
         DatumFactory.createInet4("192.168.0.1")      // 10
     });
 
-    if (handleProtobuf) {
+    if (protoTypeSupport()) {
       seedTuple.put(10, factory.createDatum(queryid.getProto())); // 11
     }
 
@@ -933,11 +942,17 @@
 
   @Test
   public void testTime() throws IOException {
-    if (dataFormat.equalsIgnoreCase(BuiltinStorages.TEXT) || internalType) {
+    if (dateTypeSupport() || timeTypeSupport()) {
+
+      int index = 2;
       Schema schema = new Schema();
-      schema.addColumn("col1", Type.DATE);
-      schema.addColumn("col2", Type.TIME);
-      schema.addColumn("col3", Type.TIMESTAMP);
+      schema.addColumn("col1", Type.TIMESTAMP);
+      if (dateTypeSupport()) {
+        schema.addColumn("col" + index++, Type.DATE);
+      }
+      if (timeTypeSupport()) {
+        schema.addColumn("col" + index++, Type.TIME);
+      }
 
       KeyValueSet options = new KeyValueSet();
       TableMeta meta = CatalogUtil.newTableMeta(dataFormat, options);
@@ -947,11 +962,15 @@
       Appender appender = sm.getAppender(meta, schema, tablePath);
       appender.init();
 
-      VTuple tuple = new VTuple(new Datum[]{
-          DatumFactory.createDate("1980-04-01"),
-          DatumFactory.createTime("12:34:56"),
-          DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000))
-      });
+      VTuple tuple = new VTuple(index - 1);
+      index = 0;
+      tuple.put(index++, DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000)));
+      if (dateTypeSupport()) {
+        tuple.put(index++, DatumFactory.createDate("1980-04-01"));
+      }
+      if (timeTypeSupport()) {
+        tuple.put(index, DatumFactory.createTime("12:34:56"));
+      }
       appender.addTuple(tuple);
       appender.flush();
       appender.close();
@@ -964,7 +983,7 @@
       Tuple retrieved;
      while ((retrieved = scanner.next()) != null) {
        for (int i = 0; i < tuple.size(); i++) {
-          assertEquals(tuple.get(i), retrieved.asDatum(i));
+          assertEquals("failed at " + i + " th column", tuple.get(i), retrieved.asDatum(i));
        }
      }
      scanner.close();
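The reworked testTime sizes its tuple as new VTuple(index - 1): index starts at 2 for the unconditional TIMESTAMP column and is bumped once per optional column, so index - 1 is exactly the number of columns built. A worked check of that arithmetic; the helper below is hypothetical, written only to mirror the test's counting, not a Tajo API:

    public class TimeColumnCountSketch {
      // Mirrors the counting in testTime above
      static int tupleWidth(boolean dateSupport, boolean timeSupport) {
        int index = 2;               // col1 (TIMESTAMP) is always present
        if (dateSupport) index++;    // col2: DATE
        if (timeSupport) index++;    // col3: TIME
        return index - 1;            // width passed to new VTuple(...)
      }

      public static void main(String[] args) {
        System.out.println(tupleWidth(true, false)); // ORC:  2 columns
        System.out.println(tupleWidth(true, true));  // TEXT: 3 columns
      }
    }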
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
index f71f052..f1d1368 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
@@ -12,8 +12,7 @@
     { "name": "col7", "type": "double" },
     { "name": "col8", "type": "string" },
     { "name": "col9", "type": "bytes" },
-    { "name": "col10", "type": "bytes" },
-    { "name": "col11", "type": "bytes" }
+    { "name": "col10", "type": "bytes" }
   ]
 }
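The Avro test schema drops col11 to stay aligned with protoTypeSupport(), which now exposes the PROTOBUF column only for internal types. A quick way to confirm the trimmed record no longer carries that field, assuming Avro on the classpath; the inline schema is abbreviated to the fields that changed:

    import org.apache.avro.Schema;

    public class AvscCheckSketch {
      public static void main(String[] args) {
        String avsc = "{\"type\":\"record\",\"name\":\"testVariousTypes\",\"fields\":["
            + "{\"name\":\"col9\",\"type\":\"bytes\"},"
            + "{\"name\":\"col10\",\"type\":\"bytes\"}]}";
        Schema schema = new Schema.Parser().parse(avsc);
        System.out.println(schema.getField("col10") != null); // true
        System.out.println(schema.getField("col11") != null); // false: removed
      }
    }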
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
index 6f7e53b..3283f9f 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
@@ -117,7 +117,7 @@
 
   <property>
     <name>tajo.storage.scanner-handler.orc.class</name>
-    <value>org.apache.tajo.storage.orc.ORCScanner</value>
+    <value>org.apache.tajo.storage.orc.OrcScanner</value>
   </property>
 
   <property>
