HIVE-4243. Fix column names in ORC metadata.
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7b1ed3d3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7b1ed3d3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7b1ed3d3

Branch: refs/heads/llap
Commit: 7b1ed3d3037860e2b7fc24b760a993f5e928b816
Parents: 99fa337
Author: Owen O'Malley <[email protected]>
Authored: Fri Sep 4 16:11:13 2015 -0700
Committer: Owen O'Malley <[email protected]>
Committed: Thu Oct 1 13:07:03 2015 +0200

----------------------------------------------------------------------
 .../hive/ql/io/orc/ColumnStatisticsImpl.java    |  55 +-
 .../apache/hadoop/hive/ql/io/orc/OrcFile.java   |  33 +-
 .../hadoop/hive/ql/io/orc/OrcOutputFormat.java  | 145 ++++-
 .../apache/hadoop/hive/ql/io/orc/OrcUtils.java  | 177 +----
 .../hadoop/hive/ql/io/orc/ReaderImpl.java       |   2 +-
 .../hadoop/hive/ql/io/orc/TypeDescription.java  | 466 ++++++++++++++++
 .../apache/hadoop/hive/ql/io/orc/Writer.java    |   9 +
 .../hadoop/hive/ql/io/orc/WriterImpl.java       | 550 +++++++++----------
 .../hadoop/hive/ql/io/orc/orc_proto.proto       |   1 +
 .../hive/ql/io/orc/TestColumnStatistics.java    |  43 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java   |  15 +-
 .../hadoop/hive/ql/io/orc/TestOrcFile.java      |  41 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |   2 +-
 .../hadoop/hive/ql/io/orc/TestOrcWideTable.java | 224 +-------
 .../hive/ql/io/orc/TestTypeDescription.java     |  67 +++
 .../resources/orc-file-dump-bloomfilter.out     |   2 +-
 .../resources/orc-file-dump-bloomfilter2.out    |   2 +-
 .../orc-file-dump-dictionary-threshold.out      |   2 +-
 ql/src/test/resources/orc-file-dump.json        |   2 +-
 ql/src/test/resources/orc-file-dump.out         |   2 +-
 ql/src/test/resources/orc-file-has-null.out     |   2 +-
 .../clientpositive/annotate_stats_part.q.out    |   6 +-
 .../clientpositive/annotate_stats_table.q.out   |   4 +-
 .../dynpart_sort_opt_vectorization.q.out        |  16 +-
 .../dynpart_sort_optimization2.q.out            |   8 +-
 .../extrapolate_part_stats_full.q.out           |  24 +-
 .../extrapolate_part_stats_partial.q.out        |  76 +--
 .../extrapolate_part_stats_partial_ndv.q.out    |  38 +-
 .../results/clientpositive/orc_analyze.q.out    |  46 +-
 .../results/clientpositive/orc_file_dump.q.out  |  18 +-
 .../clientpositive/orc_int_type_promotion.q.out |   6 +-
 .../clientpositive/spark/vectorized_ptf.q.out   | 108 ++--
 .../tez/dynpart_sort_opt_vectorization.q.out    |  16 +-
 .../tez/dynpart_sort_optimization2.q.out        |   8 +-
 .../clientpositive/tez/orc_analyze.q.out        |  46 +-
 .../clientpositive/tez/union_fast_stats.q.out   |  16 +-
 .../clientpositive/tez/vector_outer_join1.q.out |  48 +-
 .../clientpositive/tez/vector_outer_join4.q.out |  48 +-
 .../clientpositive/tez/vectorized_ptf.q.out     | 108 ++--
 .../clientpositive/union_fast_stats.q.out       |  16 +-
 .../results/clientpositive/vectorized_ptf.q.out | 104 ++--
 41 files changed, 1468 insertions(+), 1134 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
index 15a3e2c..f39d3e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
@@ -22,8 +22,6 @@ import java.sql.Timestamp;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import
org.apache.hadoop.hive.serde2.io.DateWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; @@ -964,35 +962,30 @@ class ColumnStatisticsImpl implements ColumnStatistics { return builder; } - static ColumnStatisticsImpl create(ObjectInspector inspector) { - switch (inspector.getCategory()) { - case PRIMITIVE: - switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) { - case BOOLEAN: - return new BooleanStatisticsImpl(); - case BYTE: - case SHORT: - case INT: - case LONG: - return new IntegerStatisticsImpl(); - case FLOAT: - case DOUBLE: - return new DoubleStatisticsImpl(); - case STRING: - case CHAR: - case VARCHAR: - return new StringStatisticsImpl(); - case DECIMAL: - return new DecimalStatisticsImpl(); - case DATE: - return new DateStatisticsImpl(); - case TIMESTAMP: - return new TimestampStatisticsImpl(); - case BINARY: - return new BinaryStatisticsImpl(); - default: - return new ColumnStatisticsImpl(); - } + static ColumnStatisticsImpl create(TypeDescription schema) { + switch (schema.getCategory()) { + case BOOLEAN: + return new BooleanStatisticsImpl(); + case BYTE: + case SHORT: + case INT: + case LONG: + return new IntegerStatisticsImpl(); + case FLOAT: + case DOUBLE: + return new DoubleStatisticsImpl(); + case STRING: + case CHAR: + case VARCHAR: + return new StringStatisticsImpl(); + case DECIMAL: + return new DecimalStatisticsImpl(); + case DATE: + return new DateStatisticsImpl(); + case TIMESTAMP: + return new TimestampStatisticsImpl(); + case BINARY: + return new BinaryStatisticsImpl(); default: return new ColumnStatisticsImpl(); } http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java index a60ebb4..23dec4a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; /** * Contains factory methods to read or write ORC files. @@ -102,7 +103,9 @@ public final class OrcFile { */ public enum WriterVersion { ORIGINAL(0), - HIVE_8732(1); // corrupted stripe/file maximum column statistics + HIVE_8732(1), // corrupted stripe/file maximum column statistics + HIVE_4243(2), // use real column names from Hive tables + FUTURE(Integer.MAX_VALUE); // a version from a future writer private final int id; @@ -205,7 +208,9 @@ public final class OrcFile { public static class WriterOptions { private final Configuration configuration; private FileSystem fileSystemValue = null; - private ObjectInspector inspectorValue = null; + private boolean explicitSchema = false; + private TypeDescription schema = null; + private ObjectInspector inspector = null; private long stripeSizeValue; private long blockSizeValue; private int rowIndexStrideValue; @@ -355,11 +360,26 @@ public final class OrcFile { } /** - * A required option that sets the object inspector for the rows. Used - * to determine the schema for the file. 
+ * A required option that sets the object inspector for the rows. If + * setSchema is not called, it also defines the schema. */ public WriterOptions inspector(ObjectInspector value) { - inspectorValue = value; + this.inspector = value; + if (!explicitSchema) { + schema = OrcOutputFormat.convertTypeInfo( + TypeInfoUtils.getTypeInfoFromObjectInspector(value)); + } + return this; + } + + /** + * Set the schema for the file. This is a required parameter. + * @param schema the schema for the file. + * @return this + */ + public WriterOptions setSchema(TypeDescription schema) { + this.explicitSchema = true; + this.schema = schema; return this; } @@ -426,7 +446,8 @@ public final class OrcFile { FileSystem fs = opts.fileSystemValue == null ? path.getFileSystem(opts.configuration) : opts.fileSystemValue; - return new WriterImpl(fs, path, opts.configuration, opts.inspectorValue, + return new WriterImpl(fs, path, opts.configuration, opts.inspector, + opts.schema, opts.stripeSizeValue, opts.compressValue, opts.bufferSizeValue, opts.rowIndexStrideValue, opts.memoryManagerValue, opts.blockPaddingValue, http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index ea4ebb4..ad24c58 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -20,12 +20,17 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Properties; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter; import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy; @@ -36,6 +41,15 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; @@ -52,6 +66,90 @@ import org.apache.hadoop.util.Progressable; public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow> implements AcidOutputFormat<NullWritable, OrcSerdeRow> { + private 
static final Log LOG = LogFactory.getLog(OrcOutputFormat.class); + + static TypeDescription convertTypeInfo(TypeInfo info) { + switch (info.getCategory()) { + case PRIMITIVE: { + PrimitiveTypeInfo pinfo = (PrimitiveTypeInfo) info; + switch (pinfo.getPrimitiveCategory()) { + case BOOLEAN: + return TypeDescription.createBoolean(); + case BYTE: + return TypeDescription.createByte(); + case SHORT: + return TypeDescription.createShort(); + case INT: + return TypeDescription.createInt(); + case LONG: + return TypeDescription.createLong(); + case FLOAT: + return TypeDescription.createFloat(); + case DOUBLE: + return TypeDescription.createDouble(); + case STRING: + return TypeDescription.createString(); + case DATE: + return TypeDescription.createDate(); + case TIMESTAMP: + return TypeDescription.createTimestamp(); + case BINARY: + return TypeDescription.createBinary(); + case DECIMAL: { + DecimalTypeInfo dinfo = (DecimalTypeInfo) pinfo; + return TypeDescription.createDecimal() + .withScale(dinfo.getScale()) + .withPrecision(dinfo.getPrecision()); + } + case VARCHAR: { + BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo; + return TypeDescription.createVarchar() + .withMaxLength(cinfo.getLength()); + } + case CHAR: { + BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo; + return TypeDescription.createChar() + .withMaxLength(cinfo.getLength()); + } + default: + throw new IllegalArgumentException("ORC doesn't handle primitive" + + " category " + pinfo.getPrimitiveCategory()); + } + } + case LIST: { + ListTypeInfo linfo = (ListTypeInfo) info; + return TypeDescription.createList + (convertTypeInfo(linfo.getListElementTypeInfo())); + } + case MAP: { + MapTypeInfo minfo = (MapTypeInfo) info; + return TypeDescription.createMap + (convertTypeInfo(minfo.getMapKeyTypeInfo()), + convertTypeInfo(minfo.getMapValueTypeInfo())); + } + case UNION: { + UnionTypeInfo minfo = (UnionTypeInfo) info; + TypeDescription result = TypeDescription.createUnion(); + for (TypeInfo child: minfo.getAllUnionObjectTypeInfos()) { + result.addUnionChild(convertTypeInfo(child)); + } + return result; + } + case STRUCT: { + StructTypeInfo sinfo = (StructTypeInfo) info; + TypeDescription result = TypeDescription.createStruct(); + for(String fieldName: sinfo.getAllStructFieldNames()) { + result.addField(fieldName, + convertTypeInfo(sinfo.getStructFieldTypeInfo(fieldName))); + } + return result; + } + default: + throw new IllegalArgumentException("ORC doesn't handle " + + info.getCategory()); + } + } + private static class OrcRecordWriter implements RecordWriter<NullWritable, OrcSerdeRow>, StatsProvidingRecordWriter { @@ -115,7 +213,44 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow> } private OrcFile.WriterOptions getOptions(JobConf conf, Properties props) { - return OrcFile.writerOptions(props, conf); + OrcFile.WriterOptions result = OrcFile.writerOptions(props, conf); + if (props != null) { + final String columnNameProperty = + props.getProperty(IOConstants.COLUMNS); + final String columnTypeProperty = + props.getProperty(IOConstants.COLUMNS_TYPES); + if (columnNameProperty != null && + !columnNameProperty.isEmpty() && + columnTypeProperty != null && + !columnTypeProperty.isEmpty()) { + List<String> columnNames; + List<TypeInfo> columnTypes; + + if (columnNameProperty.length() == 0) { + columnNames = new ArrayList<String>(); + } else { + columnNames = Arrays.asList(columnNameProperty.split(",")); + } + + if (columnTypeProperty.length() == 0) { + columnTypes = new ArrayList<TypeInfo>(); + } else { + 
columnTypes = + TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); + } + + TypeDescription schema = TypeDescription.createStruct(); + for (int i = 0; i < columnNames.size(); ++i) { + schema.addField(columnNames.get(i), + convertTypeInfo(columnTypes.get(i))); + } + if (LOG.isDebugEnabled()) { + LOG.debug("ORC schema = " + schema); + } + result.setSchema(schema); + } + } + return result; } @Override @@ -123,7 +258,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow> getRecordWriter(FileSystem fileSystem, JobConf conf, String name, Progressable reporter) throws IOException { return new - OrcRecordWriter(new Path(name), getOptions(conf,null)); + OrcRecordWriter(new Path(name), getOptions(conf, null)); } @@ -135,7 +270,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow> boolean isCompressed, Properties tableProperties, Progressable reporter) throws IOException { - return new OrcRecordWriter(path, getOptions(conf,tableProperties)); + return new OrcRecordWriter(path, getOptions(conf, tableProperties)); } private class DummyOrcRecordUpdater implements RecordUpdater { @@ -229,8 +364,8 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow> } @Override - public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getRawRecordWriter(Path path, - Options options) throws IOException { + public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter + getRawRecordWriter(Path path, Options options) throws IOException { final Path filename = AcidUtils.createFilename(path, options); final OrcFile.WriterOptions opts = OrcFile.writerOptions(options.getConfiguration()); http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java index db2ca15..3e2af23 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java @@ -18,20 +18,10 @@ package org.apache.hadoop.hive.ql.io.orc; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; - -import com.google.common.collect.Lists; public class OrcUtils { private static final Log LOG = LogFactory.getLog(OrcUtils.class); @@ -49,159 +39,44 @@ public class OrcUtils { * index 5 correspond to column d. After flattening list<string> gets 2 columns. 
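For illustration only (this sketch is not part of the patch): with the schema-based includeColumns above, the flattened column ids come straight from TypeDescription's pre-order numbering, so selecting a compound column marks its entire id range. Assuming a hypothetical three-column table and a caller in the org.apache.hadoop.hive.ql.io.orc package:

    // Hypothetical schema: struct<a:int,b:map<string,int>,c:string>
    // Pre-order ids: root struct = 0, a = 1, b = 2 (key = 3, value = 4), c = 5
    TypeDescription schema = TypeDescription.createStruct()
        .addField("a", TypeDescription.createInt())
        .addField("b", TypeDescription.createMap(
            TypeDescription.createString(), TypeDescription.createInt()))
        .addField("c", TypeDescription.createString());

    // Selecting "b" marks ids 2..4; "*" would mark every id.
    boolean[] include = OrcUtils.includeColumns("b", schema);
    // include == {false, false, true, true, true, false}

This replaces the old column-span map that had to be rebuilt from the ObjectInspector and the comma-separated column-name property.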
* * @param selectedColumns - comma separated list of selected column names - * @param allColumns - comma separated list of all column names - * @param inspector - object inspector + * @param schema - object schema * @return - boolean array with true value set for the specified column names */ - public static boolean[] includeColumns(String selectedColumns, String allColumns, - ObjectInspector inspector) { - int numFlattenedCols = getFlattenedColumnsCount(inspector); - boolean[] results = new boolean[numFlattenedCols]; + public static boolean[] includeColumns(String selectedColumns, + TypeDescription schema) { + int numFlattenedCols = schema.getMaximumId(); + boolean[] results = new boolean[numFlattenedCols + 1]; if ("*".equals(selectedColumns)) { Arrays.fill(results, true); return results; } - if (selectedColumns != null && !selectedColumns.isEmpty()) { - includeColumnsImpl(results, selectedColumns.toLowerCase(), allColumns, inspector); - } - return results; - } - - private static void includeColumnsImpl(boolean[] includeColumns, String selectedColumns, - String allColumns, - ObjectInspector inspector) { - Map<String, List<Integer>> columnSpanMap = getColumnSpan(allColumns, inspector); - LOG.info("columnSpanMap: " + columnSpanMap); - - String[] selCols = selectedColumns.split(","); - for (String sc : selCols) { - if (columnSpanMap.containsKey(sc)) { - List<Integer> colSpan = columnSpanMap.get(sc); - int start = colSpan.get(0); - int end = colSpan.get(1); - for (int i = start; i <= end; i++) { - includeColumns[i] = true; + if (selectedColumns != null && + schema.getCategory() == TypeDescription.Category.STRUCT) { + List<String> fieldNames = schema.getFieldNames(); + List<TypeDescription> fields = schema.getChildren(); + for (String column: selectedColumns.split((","))) { + TypeDescription col = findColumn(column, fieldNames, fields); + if (col != null) { + for(int i=col.getId(); i <= col.getMaximumId(); ++i) { + results[i] = true; } } } - - LOG.info("includeColumns: " + Arrays.toString(includeColumns)); } - - private static Map<String, List<Integer>> getColumnSpan(String allColumns, - ObjectInspector inspector) { - // map that contains the column span for each column. Column span is the number of columns - // required after flattening. For a given object inspector this map contains the start column - // id and end column id (both inclusive) after flattening. - // EXAMPLE: - // schema: struct<a:int, b:float, c:map<string,int>> - // column span map for the above struct will be - // a => [1,1], b => [2,2], c => [3,5] - Map<String, List<Integer>> columnSpanMap = new HashMap<String, List<Integer>>(); - if (allColumns != null) { - String[] columns = allColumns.split(","); - int startIdx = 0; - int endIdx = 0; - if (inspector instanceof StructObjectInspector) { - StructObjectInspector soi = (StructObjectInspector) inspector; - List<? extends StructField> fields = soi.getAllStructFieldRefs(); - for (int i = 0; i < fields.size(); i++) { - StructField sf = fields.get(i); - - // we get the type (category) from object inspector but column name from the argument. - // The reason for this is hive (FileSinkOperator) does not pass the actual column names, - // instead it passes the internal column names (_col1,_col2). - ObjectInspector sfOI = sf.getFieldObjectInspector(); - String colName = columns[i]; - - startIdx = endIdx + 1; - switch (sfOI.getCategory()) { - case PRIMITIVE: - endIdx += 1; - break; - case STRUCT: - endIdx += 1; - StructObjectInspector structInsp = (StructObjectInspector) sfOI; - List<? 
extends StructField> structFields = structInsp.getAllStructFieldRefs(); - for (int j = 0; j < structFields.size(); ++j) { - endIdx += getFlattenedColumnsCount(structFields.get(j).getFieldObjectInspector()); - } - break; - case MAP: - endIdx += 1; - MapObjectInspector mapInsp = (MapObjectInspector) sfOI; - endIdx += getFlattenedColumnsCount(mapInsp.getMapKeyObjectInspector()); - endIdx += getFlattenedColumnsCount(mapInsp.getMapValueObjectInspector()); - break; - case LIST: - endIdx += 1; - ListObjectInspector listInsp = (ListObjectInspector) sfOI; - endIdx += getFlattenedColumnsCount(listInsp.getListElementObjectInspector()); - break; - case UNION: - endIdx += 1; - UnionObjectInspector unionInsp = (UnionObjectInspector) sfOI; - List<ObjectInspector> choices = unionInsp.getObjectInspectors(); - for (int j = 0; j < choices.size(); ++j) { - endIdx += getFlattenedColumnsCount(choices.get(j)); - } - break; - default: - throw new IllegalArgumentException("Bad category: " + - inspector.getCategory()); - } - - columnSpanMap.put(colName, Lists.newArrayList(startIdx, endIdx)); - } - } - } - return columnSpanMap; + return results; } - /** - * Returns the number of columns after flatting complex types. - * - * @param inspector - object inspector - * @return - */ - public static int getFlattenedColumnsCount(ObjectInspector inspector) { - int numWriters = 0; - switch (inspector.getCategory()) { - case PRIMITIVE: - numWriters += 1; - break; - case STRUCT: - numWriters += 1; - StructObjectInspector structInsp = (StructObjectInspector) inspector; - List<? extends StructField> fields = structInsp.getAllStructFieldRefs(); - for (int i = 0; i < fields.size(); ++i) { - numWriters += getFlattenedColumnsCount(fields.get(i).getFieldObjectInspector()); - } - break; - case MAP: - numWriters += 1; - MapObjectInspector mapInsp = (MapObjectInspector) inspector; - numWriters += getFlattenedColumnsCount(mapInsp.getMapKeyObjectInspector()); - numWriters += getFlattenedColumnsCount(mapInsp.getMapValueObjectInspector()); - break; - case LIST: - numWriters += 1; - ListObjectInspector listInsp = (ListObjectInspector) inspector; - numWriters += getFlattenedColumnsCount(listInsp.getListElementObjectInspector()); - break; - case UNION: - numWriters += 1; - UnionObjectInspector unionInsp = (UnionObjectInspector) inspector; - List<ObjectInspector> choices = unionInsp.getObjectInspectors(); - for (int i = 0; i < choices.size(); ++i) { - numWriters += getFlattenedColumnsCount(choices.get(i)); - } - break; - default: - throw new IllegalArgumentException("Bad category: " + - inspector.getCategory()); + private static TypeDescription findColumn(String columnName, + List<String> fieldNames, + List<TypeDescription> fields) { + int i = 0; + for(String fieldName: fieldNames) { + if (fieldName.equalsIgnoreCase(columnName)) { + return fields.get(i); + } else { + i += 1; + } } - return numWriters; + return null; } - } http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index 23b3b55..36fb858 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -347,7 +347,7 @@ public class ReaderImpl implements Reader { return version; } } - return OrcFile.WriterVersion.ORIGINAL; + return 
OrcFile.WriterVersion.FUTURE; } /** Extracts the necessary metadata from an externally store buffer (fullFooterBuffer). */ http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java new file mode 100644 index 0000000..3481bb3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java @@ -0,0 +1,466 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * This is the description of the types in an ORC file. + */ +public class TypeDescription { + private static final int MAX_PRECISION = 38; + private static final int MAX_SCALE = 38; + private static final int DEFAULT_PRECISION = 38; + private static final int DEFAULT_SCALE = 10; + private static final int DEFAULT_LENGTH = 256; + public enum Category { + BOOLEAN("boolean", true), + BYTE("tinyint", true), + SHORT("smallint", true), + INT("int", true), + LONG("bigint", true), + FLOAT("float", true), + DOUBLE("double", true), + STRING("string", true), + DATE("date", true), + TIMESTAMP("timestamp", true), + BINARY("binary", true), + DECIMAL("decimal", true), + VARCHAR("varchar", true), + CHAR("char", true), + LIST("array", false), + MAP("map", false), + STRUCT("struct", false), + UNION("union", false); + + Category(String name, boolean isPrimitive) { + this.name = name; + this.isPrimitive = isPrimitive; + } + + final boolean isPrimitive; + final String name; + + public boolean isPrimitive() { + return isPrimitive; + } + + public String getName() { + return name; + } + } + + public static TypeDescription createBoolean() { + return new TypeDescription(Category.BOOLEAN); + } + + public static TypeDescription createByte() { + return new TypeDescription(Category.BYTE); + } + + public static TypeDescription createShort() { + return new TypeDescription(Category.SHORT); + } + + public static TypeDescription createInt() { + return new TypeDescription(Category.INT); + } + + public static TypeDescription createLong() { + return new TypeDescription(Category.LONG); + } + + public static TypeDescription createFloat() { + return new TypeDescription(Category.FLOAT); + } + + public static TypeDescription createDouble() { + return new TypeDescription(Category.DOUBLE); + } + + public static TypeDescription createString() { + return new TypeDescription(Category.STRING); + } + + public static TypeDescription createDate() { + return new TypeDescription(Category.DATE); + } + + 
public static TypeDescription createTimestamp() { + return new TypeDescription(Category.TIMESTAMP); + } + + public static TypeDescription createBinary() { + return new TypeDescription(Category.BINARY); + } + + public static TypeDescription createDecimal() { + return new TypeDescription(Category.DECIMAL); + } + + /** + * For decimal types, set the precision. + * @param precision the new precision + * @return this + */ + public TypeDescription withPrecision(int precision) { + if (category != Category.DECIMAL) { + throw new IllegalArgumentException("precision is only allowed on decimal"+ + " and not " + category.name); + } else if (precision < 1 || precision > MAX_PRECISION || scale > precision){ + throw new IllegalArgumentException("precision " + precision + + " is out of range 1 .. " + scale); + } + this.precision = precision; + return this; + } + + /** + * For decimal types, set the scale. + * @param scale the new scale + * @return this + */ + public TypeDescription withScale(int scale) { + if (category != Category.DECIMAL) { + throw new IllegalArgumentException("scale is only allowed on decimal"+ + " and not " + category.name); + } else if (scale < 0 || scale > MAX_SCALE || scale > precision) { + throw new IllegalArgumentException("scale is out of range at " + scale); + } + this.scale = scale; + return this; + } + + public static TypeDescription createVarchar() { + return new TypeDescription(Category.VARCHAR); + } + + public static TypeDescription createChar() { + return new TypeDescription(Category.CHAR); + } + + /** + * Set the maximum length for char and varchar types. + * @param maxLength the maximum value + * @return this + */ + public TypeDescription withMaxLength(int maxLength) { + if (category != Category.VARCHAR && category != Category.CHAR) { + throw new IllegalArgumentException("maxLength is only allowed on char" + + " and varchar and not " + category.name); + } + this.maxLength = maxLength; + return this; + } + + public static TypeDescription createList(TypeDescription childType) { + TypeDescription result = new TypeDescription(Category.LIST); + result.children.add(childType); + childType.parent = result; + return result; + } + + public static TypeDescription createMap(TypeDescription keyType, + TypeDescription valueType) { + TypeDescription result = new TypeDescription(Category.MAP); + result.children.add(keyType); + result.children.add(valueType); + keyType.parent = result; + valueType.parent = result; + return result; + } + + public static TypeDescription createUnion() { + return new TypeDescription(Category.UNION); + } + + public static TypeDescription createStruct() { + return new TypeDescription(Category.STRUCT); + } + + /** + * Add a child to a union type. + * @param child a new child type to add + * @return the union type. + */ + public TypeDescription addUnionChild(TypeDescription child) { + if (category != Category.UNION) { + throw new IllegalArgumentException("Can only add types to union type" + + " and not " + category); + } + children.add(child); + child.parent = this; + return this; + } + + /** + * Add a field to a struct type as it is built. 
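As a usage sketch (not code from this change), the fluent factories above can describe a Hive row type directly; withPrecision, withScale and withMaxLength carry the same parameters that were previously recovered from the ObjectInspector's type info:

    // Hypothetical schema for a table (name string, age int, salary decimal(10,2))
    TypeDescription schema = TypeDescription.createStruct()
        .addField("name", TypeDescription.createString())
        .addField("age", TypeDescription.createInt())
        .addField("salary", TypeDescription.createDecimal()
            .withScale(2)
            .withPrecision(10));

    schema.toString();     // "struct<name:string,age:int,salary:decimal(10,2)>"
    schema.getMaximumId(); // 3 -- the root struct is id 0, the fields are ids 1..3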
+ * @param field the field name + * @param fieldType the type of the field + * @return the struct type + */ + public TypeDescription addField(String field, TypeDescription fieldType) { + if (category != Category.STRUCT) { + throw new IllegalArgumentException("Can only add fields to struct type" + + " and not " + category); + } + fieldNames.add(field); + children.add(fieldType); + fieldType.parent = this; + return this; + } + + /** + * Get the id for this type. + * The first call will cause all of the the ids in tree to be assigned, so + * it should not be called before the type is completely built. + * @return the sequential id + */ + public int getId() { + // if the id hasn't been assigned, assign all of the ids from the root + if (id == -1) { + TypeDescription root = this; + while (root.parent != null) { + root = root.parent; + } + root.assignIds(0); + } + return id; + } + + /** + * Get the maximum id assigned to this type or its children. + * The first call will cause all of the the ids in tree to be assigned, so + * it should not be called before the type is completely built. + * @return the maximum id assigned under this type + */ + public int getMaximumId() { + // if the id hasn't been assigned, assign all of the ids from the root + if (maxId == -1) { + TypeDescription root = this; + while (root.parent != null) { + root = root.parent; + } + root.assignIds(0); + } + return maxId; + } + + /** + * Get the kind of this type. + * @return get the category for this type. + */ + public Category getCategory() { + return category; + } + + /** + * Get the maximum length of the type. Only used for char and varchar types. + * @return the maximum length of the string type + */ + public int getMaxLength() { + return maxLength; + } + + /** + * Get the precision of the decimal type. + * @return the number of digits for the precision. + */ + public int getPrecision() { + return precision; + } + + /** + * Get the scale of the decimal type. + * @return the number of digits for the scale. + */ + public int getScale() { + return scale; + } + + /** + * For struct types, get the list of field names. + * @return the list of field names. + */ + public List<String> getFieldNames() { + return Collections.unmodifiableList(fieldNames); + } + + /** + * Get the subtypes of this type. + * @return the list of children types + */ + public List<TypeDescription> getChildren() { + return children == null ? null : Collections.unmodifiableList(children); + } + + /** + * Assign ids to all of the nodes under this one. 
+ * @param startId the lowest id to assign + * @return the next available id + */ + private int assignIds(int startId) { + id = startId++; + if (children != null) { + for (TypeDescription child : children) { + startId = child.assignIds(startId); + } + } + maxId = startId - 1; + return startId; + } + + private TypeDescription(Category category) { + this.category = category; + if (category.isPrimitive) { + children = null; + } else { + children = new ArrayList<>(); + } + if (category == Category.STRUCT) { + fieldNames = new ArrayList<>(); + } else { + fieldNames = null; + } + } + + private int id = -1; + private int maxId = -1; + private TypeDescription parent; + private final Category category; + private final List<TypeDescription> children; + private final List<String> fieldNames; + private int maxLength = DEFAULT_LENGTH; + private int precision = DEFAULT_PRECISION; + private int scale = DEFAULT_SCALE; + + public void printToBuffer(StringBuilder buffer) { + buffer.append(category.name); + switch (category) { + case DECIMAL: + buffer.append('('); + buffer.append(precision); + buffer.append(','); + buffer.append(scale); + buffer.append(')'); + break; + case CHAR: + case VARCHAR: + buffer.append('('); + buffer.append(maxLength); + buffer.append(')'); + break; + case LIST: + case MAP: + case UNION: + buffer.append('<'); + for(int i=0; i < children.size(); ++i) { + if (i != 0) { + buffer.append(','); + } + children.get(i).printToBuffer(buffer); + } + buffer.append('>'); + break; + case STRUCT: + buffer.append('<'); + for(int i=0; i < children.size(); ++i) { + if (i != 0) { + buffer.append(','); + } + buffer.append(fieldNames.get(i)); + buffer.append(':'); + children.get(i).printToBuffer(buffer); + } + buffer.append('>'); + break; + default: + break; + } + } + + public String toString() { + StringBuilder buffer = new StringBuilder(); + printToBuffer(buffer); + return buffer.toString(); + } + + private void printJsonToBuffer(String prefix, StringBuilder buffer, + int indent) { + for(int i=0; i < indent; ++i) { + buffer.append(' '); + } + buffer.append(prefix); + buffer.append("{\"category\": \""); + buffer.append(category.name); + buffer.append("\", \"id\": "); + buffer.append(getId()); + buffer.append(", \"max\": "); + buffer.append(maxId); + switch (category) { + case DECIMAL: + buffer.append(", \"precision\": "); + buffer.append(precision); + buffer.append(", \"scale\": "); + buffer.append(scale); + break; + case CHAR: + case VARCHAR: + buffer.append(", \"length\": "); + buffer.append(maxLength); + break; + case LIST: + case MAP: + case UNION: + buffer.append(", \"children\": ["); + for(int i=0; i < children.size(); ++i) { + buffer.append('\n'); + children.get(i).printJsonToBuffer("", buffer, indent + 2); + if (i != children.size() - 1) { + buffer.append(','); + } + } + buffer.append("]"); + break; + case STRUCT: + buffer.append(", \"fields\": ["); + for(int i=0; i < children.size(); ++i) { + buffer.append('\n'); + children.get(i).printJsonToBuffer("\"" + fieldNames.get(i) + "\": ", + buffer, indent + 2); + if (i != children.size() - 1) { + buffer.append(','); + } + } + buffer.append(']'); + break; + default: + break; + } + buffer.append('}'); + } + + public String toJson() { + StringBuilder buffer = new StringBuilder(); + printJsonToBuffer("", buffer, 0); + return buffer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java index 6411e3f..8991f2d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.io.orc; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -26,6 +28,13 @@ import java.util.List; * The interface for writing ORC files. */ public interface Writer { + + /** + * Get the schema for this writer + * @return the file schema + */ + TypeDescription getSchema(); + /** * Add arbitrary meta-data to the ORC file. This may be called at any point * until the Writer is closed. If the same key is passed a second time, the http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index 7aa8d65..767d3f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -40,7 +40,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec.Modifier; import org.apache.hadoop.hive.ql.io.orc.OrcFile.CompressionStrategy; @@ -54,7 +53,6 @@ import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; @@ -72,9 +70,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspect import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.BytesWritable; @@ -127,6 +122,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private final int bufferSize; private final long blockSize; private final double paddingTolerance; + private final TypeDescription schema; + // the streams that make up the current stripe private final Map<StreamName, BufferedStream> streams = new TreeMap<StreamName, BufferedStream>(); @@ -165,6 +162,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { Path path, Configuration 
conf, ObjectInspector inspector, + TypeDescription schema, long stripeSize, CompressionKind compress, int bufferSize, @@ -183,6 +181,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { this.path = path; this.conf = conf; this.callback = callback; + this.schema = schema; if (callback != null) { callbackContext = new OrcFile.WriterContext(){ @@ -207,21 +206,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback { this.memoryManager = memoryManager; buildIndex = rowIndexStride > 0; codec = createCodec(compress); - String allColumns = conf.get(IOConstants.COLUMNS); - if (allColumns == null) { - allColumns = getColumnNamesFromInspector(inspector); - } - this.bufferSize = getEstimatedBufferSize(allColumns, bufferSize); + int numColumns = schema.getMaximumId() + 1; + this.bufferSize = getEstimatedBufferSize(getMemoryAvailableForORC(), + codec != null, numColumns, bufferSize); if (version == OrcFile.Version.V_0_11) { /* do not write bloom filters for ORC v11 */ - this.bloomFilterColumns = - OrcUtils.includeColumns(null, allColumns, inspector); + this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1]; } else { this.bloomFilterColumns = - OrcUtils.includeColumns(bloomFilterColumnNames, allColumns, inspector); + OrcUtils.includeColumns(bloomFilterColumnNames, schema); } this.bloomFilterFpp = bloomFilterFpp; - treeWriter = createTreeWriter(inspector, streamFactory, false); + treeWriter = createTreeWriter(inspector, schema, streamFactory, false); if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) { throw new IllegalArgumentException("Row stride must be at least " + MIN_ROW_INDEX_STRIDE); @@ -231,62 +227,42 @@ public class WriterImpl implements Writer, MemoryManager.Callback { memoryManager.addWriter(path, stripeSize, this); } - private String getColumnNamesFromInspector(ObjectInspector inspector) { - List<String> fieldNames = Lists.newArrayList(); - Joiner joiner = Joiner.on(","); - if (inspector instanceof StructObjectInspector) { - StructObjectInspector soi = (StructObjectInspector) inspector; - List<? extends StructField> fields = soi.getAllStructFieldRefs(); - for(StructField sf : fields) { - fieldNames.add(sf.getFieldName()); - } - } - return joiner.join(fieldNames); - } + static int getEstimatedBufferSize(long availableMem, + boolean isCompressed, + int columnCount, int bs) { + if (columnCount > COLUMN_COUNT_THRESHOLD) { + // In BufferedStream, there are 3 outstream buffers (compressed, + // uncompressed and overflow) and list of previously compressed buffers. + // Since overflow buffer is rarely used, lets consider only 2 allocation. + // Also, initially, the list of compression buffers will be empty. + final int outStreamBuffers = isCompressed ? 2 : 1; - @VisibleForTesting - int getEstimatedBufferSize(int bs) { - return getEstimatedBufferSize(conf.get(IOConstants.COLUMNS), bs); - } + // max possible streams per column is 5. For string columns, there is + // ROW_INDEX, PRESENT, DATA, LENGTH, DICTIONARY_DATA streams. + final int maxStreams = 5; - int getEstimatedBufferSize(String colNames, int bs) { - long availableMem = getMemoryAvailableForORC(); - if (colNames != null) { - final int numCols = colNames.split(",").length; - if (numCols > COLUMN_COUNT_THRESHOLD) { - // In BufferedStream, there are 3 outstream buffers (compressed, - // uncompressed and overflow) and list of previously compressed buffers. - // Since overflow buffer is rarely used, lets consider only 2 allocation. 
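To make the sizing logic concrete, a rough worked example (numbers are illustrative; COLUMN_COUNT_THRESHOLD and getClosestBufferSize keep their existing behaviour and are not shown in full here):

    // Assume ~1 GB available to ORC, compression enabled, 3,000 columns,
    // and a column count above COLUMN_COUNT_THRESHOLD.
    long availableMem = 1L << 30;
    long remainingMem = availableMem - (long) (0.1f * availableMem); // ~0.9 GB after misc
    int estBufferSize = (int) (remainingMem / (5 /* streams */ * 2 /* buffers */ * 3000));
    // estBufferSize is ~32 KB, which is then snapped to the closest supported
    // buffer size and capped at the configured buffer size (bs).

So for very wide tables the per-column compression buffers shrink automatically instead of exhausting the writer's memory budget.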
- // Also, initially, the list of compression buffers will be empty. - final int outStreamBuffers = codec == null ? 1 : 2; - - // max possible streams per column is 5. For string columns, there is - // ROW_INDEX, PRESENT, DATA, LENGTH, DICTIONARY_DATA streams. - final int maxStreams = 5; - - // Lets assume 10% memory for holding dictionary in memory and other - // object allocations - final long miscAllocation = (long) (0.1f * availableMem); - - // compute the available memory - final long remainingMem = availableMem - miscAllocation; - - int estBufferSize = (int) (remainingMem / - (maxStreams * outStreamBuffers * numCols)); - estBufferSize = getClosestBufferSize(estBufferSize, bs); - if (estBufferSize > bs) { - estBufferSize = bs; - } + // Lets assume 10% memory for holding dictionary in memory and other + // object allocations + final long miscAllocation = (long) (0.1f * availableMem); - LOG.info("WIDE TABLE - Number of columns: " + numCols + - " Chosen compression buffer size: " + estBufferSize); - return estBufferSize; + // compute the available memory + final long remainingMem = availableMem - miscAllocation; + + int estBufferSize = (int) (remainingMem / + (maxStreams * outStreamBuffers * columnCount)); + estBufferSize = getClosestBufferSize(estBufferSize); + if (estBufferSize > bs) { + estBufferSize = bs; } + + LOG.info("WIDE TABLE - Number of columns: " + columnCount + + " Chosen compression buffer size: " + estBufferSize); + return estBufferSize; } return bs; } - private int getClosestBufferSize(int estBufferSize, int bs) { + private static int getClosestBufferSize(int estBufferSize) { final int kb4 = 4 * 1024; final int kb8 = 8 * 1024; final int kb16 = 16 * 1024; @@ -546,15 +522,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } /** - * Get the current column id. After creating all tree writers this count should tell how many - * columns (including columns within nested complex objects) are created in total. - * @return current column id - */ - public int getCurrentColumnId() { - return columnCount; - } - - /** * Get the stride rate of the row index. */ public int getRowIndexStride() { @@ -666,11 +633,13 @@ public class WriterImpl implements Writer, MemoryManager.Callback { * Create a tree writer. * @param columnId the column id of the column to write * @param inspector the object inspector to use + * @param schema the row schema * @param streamFactory limited access to the Writer's data. * @param nullable can the value be null? * @throws IOException */ TreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory streamFactory, boolean nullable) throws IOException { this.streamFactory = streamFactory; @@ -686,9 +655,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } this.foundNulls = false; createBloomFilter = streamFactory.getBloomFilterColumns()[columnId]; - indexStatistics = ColumnStatisticsImpl.create(inspector); - stripeColStatistics = ColumnStatisticsImpl.create(inspector); - fileStatistics = ColumnStatisticsImpl.create(inspector); + indexStatistics = ColumnStatisticsImpl.create(schema); + stripeColStatistics = ColumnStatisticsImpl.create(schema); + fileStatistics = ColumnStatisticsImpl.create(schema); childrenWriters = new TreeWriter[0]; rowIndex = OrcProto.RowIndex.newBuilder(); rowIndexEntry = OrcProto.RowIndexEntry.newBuilder(); @@ -749,7 +718,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { /** * Add a new value to the column. 
- * @param obj + * @param obj the object to write * @throws IOException */ void write(Object obj) throws IOException { @@ -919,9 +888,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { BooleanTreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, inspector, schema, writer, nullable); PositionedOutputStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.writer = new BitFieldWriter(out, 1); @@ -958,9 +928,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { ByteTreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, inspector, schema, writer, nullable); this.writer = new RunLengthByteWriter(writer.createStream(id, OrcProto.Stream.Kind.DATA)); recordPosition(rowIndexPosition); @@ -1003,9 +974,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { IntegerTreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, inspector, schema, writer, nullable); OutStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); @@ -1079,9 +1051,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { FloatTreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, inspector, schema, writer, nullable); this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.utils = new SerializationUtils(); @@ -1123,9 +1096,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { DoubleTreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, inspector, schema, writer, nullable); this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.utils = new SerializationUtils(); @@ -1184,9 +1158,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { StringTreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, inspector, schema, writer, nullable); this.isDirectV2 = isNewWriteFormat(writer); stringOutput = writer.createStream(id, OrcProto.Stream.Kind.DICTIONARY_DATA); @@ -1423,9 +1398,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { CharTreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, inspector, schema, writer, nullable); } /** @@ -1445,9 +1421,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { VarcharTreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, inspector, schema, writer, nullable); } /** @@ -1467,9 
+1444,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { BinaryTreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, inspector, schema, writer, nullable); this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); @@ -1531,9 +1509,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { TimestampTreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, inspector, schema, writer, nullable); this.isDirectV2 = isNewWriteFormat(writer); this.seconds = createIntegerWriter(writer.createStream(id, OrcProto.Stream.Kind.DATA), true, isDirectV2, writer); @@ -1610,9 +1589,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { DateTreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, inspector, schema, writer, nullable); OutStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); @@ -1666,9 +1646,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { DecimalTreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, inspector, schema, writer, nullable); this.isDirectV2 = isNewWriteFormat(writer); valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.scaleStream = createIntegerWriter(writer.createStream(id, @@ -1726,16 +1707,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private final List<? extends StructField> fields; StructTreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, inspector, schema, writer, nullable); + List<TypeDescription> children = schema.getChildren(); StructObjectInspector structObjectInspector = (StructObjectInspector) inspector; fields = structObjectInspector.getAllStructFieldRefs(); - childrenWriters = new TreeWriter[fields.size()]; + childrenWriters = new TreeWriter[children.size()]; for(int i=0; i < childrenWriters.length; ++i) { + ObjectInspector childOI = i < fields.size() ? 
+ fields.get(i).getFieldObjectInspector() : null; childrenWriters[i] = createTreeWriter( - fields.get(i).getFieldObjectInspector(), writer, true); + childOI, children.get(i), writer, + true); } recordPosition(rowIndexPosition); } @@ -1770,15 +1756,16 @@ public class WriterImpl implements Writer, MemoryManager.Callback { ListTreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, inspector, schema, writer, nullable); this.isDirectV2 = isNewWriteFormat(writer); - ListObjectInspector listObjectInspector = (ListObjectInspector) inspector; + ObjectInspector childOI = + ((ListObjectInspector) inspector).getListElementObjectInspector(); childrenWriters = new TreeWriter[1]; childrenWriters[0] = - createTreeWriter(listObjectInspector.getListElementObjectInspector(), - writer, true); + createTreeWriter(childOI, schema.getChildren().get(0), writer, true); lengths = createIntegerWriter(writer.createStream(columnId, OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); recordPosition(rowIndexPosition); @@ -1834,16 +1821,20 @@ public class WriterImpl implements Writer, MemoryManager.Callback { MapTreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, inspector, schema, writer, nullable); this.isDirectV2 = isNewWriteFormat(writer); MapObjectInspector insp = (MapObjectInspector) inspector; childrenWriters = new TreeWriter[2]; + List<TypeDescription> children = schema.getChildren(); childrenWriters[0] = - createTreeWriter(insp.getMapKeyObjectInspector(), writer, true); + createTreeWriter(insp.getMapKeyObjectInspector(), children.get(0), + writer, true); childrenWriters[1] = - createTreeWriter(insp.getMapValueObjectInspector(), writer, true); + createTreeWriter(insp.getMapValueObjectInspector(), children.get(1), + writer, true); lengths = createIntegerWriter(writer.createStream(columnId, OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); recordPosition(rowIndexPosition); @@ -1901,14 +1892,17 @@ public class WriterImpl implements Writer, MemoryManager.Callback { UnionTreeWriter(int columnId, ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, inspector, schema, writer, nullable); UnionObjectInspector insp = (UnionObjectInspector) inspector; List<ObjectInspector> choices = insp.getObjectInspectors(); - childrenWriters = new TreeWriter[choices.size()]; + List<TypeDescription> children = schema.getChildren(); + childrenWriters = new TreeWriter[children.size()]; for(int i=0; i < childrenWriters.length; ++i) { - childrenWriters[i] = createTreeWriter(choices.get(i), writer, true); + childrenWriters[i] = createTreeWriter(choices.get(i), + children.get(i), writer, true); } tags = new RunLengthByteWriter(writer.createStream(columnId, @@ -1949,168 +1943,151 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } private static TreeWriter createTreeWriter(ObjectInspector inspector, + TypeDescription schema, StreamFactory streamFactory, boolean nullable) throws IOException { - switch (inspector.getCategory()) { - case PRIMITIVE: - switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) { - case BOOLEAN: - return new 
@@ -1949,168 +1943,151 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   private static TreeWriter createTreeWriter(ObjectInspector inspector,
+                                             TypeDescription schema,
                                              StreamFactory streamFactory,
                                              boolean nullable) throws IOException {
-    switch (inspector.getCategory()) {
-      case PRIMITIVE:
-        switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) {
-          case BOOLEAN:
-            return new BooleanTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case BYTE:
-            return new ByteTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case SHORT:
-          case INT:
-          case LONG:
-            return new IntegerTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case FLOAT:
-            return new FloatTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case DOUBLE:
-            return new DoubleTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case STRING:
-            return new StringTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case CHAR:
-            return new CharTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case VARCHAR:
-            return new VarcharTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case BINARY:
-            return new BinaryTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case TIMESTAMP:
-            return new TimestampTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case DATE:
-            return new DateTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case DECIMAL:
-            return new DecimalTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          default:
-            throw new IllegalArgumentException("Bad primitive category " +
-                ((PrimitiveObjectInspector) inspector).getPrimitiveCategory());
-        }
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        return new BooleanTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case BYTE:
+        return new ByteTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case SHORT:
+      case INT:
+      case LONG:
+        return new IntegerTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case FLOAT:
+        return new FloatTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case DOUBLE:
+        return new DoubleTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case STRING:
+        return new StringTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case CHAR:
+        return new CharTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case VARCHAR:
+        return new VarcharTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case BINARY:
+        return new BinaryTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case TIMESTAMP:
+        return new TimestampTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case DATE:
+        return new DateTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case DECIMAL:
+        return new DecimalTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
       case STRUCT:
-        return new StructTreeWriter(streamFactory.getNextColumnId(), inspector,
-            streamFactory, nullable);
+        return new StructTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
       case MAP:
         return new MapTreeWriter(streamFactory.getNextColumnId(), inspector,
-            streamFactory, nullable);
+            schema, streamFactory, nullable);
       case LIST:
         return new ListTreeWriter(streamFactory.getNextColumnId(), inspector,
-            streamFactory, nullable);
+            schema, streamFactory, nullable);
       case UNION:
         return new UnionTreeWriter(streamFactory.getNextColumnId(), inspector,
-            streamFactory, nullable);
+            schema, streamFactory, nullable);
       default:
         throw new IllegalArgumentException("Bad category: " +
-          inspector.getCategory());
+          schema.getCategory());
     }
   }
 
   private static void writeTypes(OrcProto.Footer.Builder builder,
-                                 TreeWriter treeWriter) {
+                                 TypeDescription schema) {
     OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
-    switch (treeWriter.inspector.getCategory()) {
-      case PRIMITIVE:
-        switch (((PrimitiveObjectInspector) treeWriter.inspector).
-                 getPrimitiveCategory()) {
-          case BOOLEAN:
-            type.setKind(OrcProto.Type.Kind.BOOLEAN);
-            break;
-          case BYTE:
-            type.setKind(OrcProto.Type.Kind.BYTE);
-            break;
-          case SHORT:
-            type.setKind(OrcProto.Type.Kind.SHORT);
-            break;
-          case INT:
-            type.setKind(OrcProto.Type.Kind.INT);
-            break;
-          case LONG:
-            type.setKind(OrcProto.Type.Kind.LONG);
-            break;
-          case FLOAT:
-            type.setKind(OrcProto.Type.Kind.FLOAT);
-            break;
-          case DOUBLE:
-            type.setKind(OrcProto.Type.Kind.DOUBLE);
-            break;
-          case STRING:
-            type.setKind(OrcProto.Type.Kind.STRING);
-            break;
-          case CHAR:
-            // The char length needs to be written to file and should be available
-            // from the object inspector
-            CharTypeInfo charTypeInfo = (CharTypeInfo) ((PrimitiveObjectInspector) treeWriter.inspector).getTypeInfo();
-            type.setKind(Type.Kind.CHAR);
-            type.setMaximumLength(charTypeInfo.getLength());
-            break;
-          case VARCHAR:
-            // The varchar length needs to be written to file and should be available
-            // from the object inspector
-            VarcharTypeInfo typeInfo = (VarcharTypeInfo) ((PrimitiveObjectInspector) treeWriter.inspector).getTypeInfo();
-            type.setKind(Type.Kind.VARCHAR);
-            type.setMaximumLength(typeInfo.getLength());
-            break;
-          case BINARY:
-            type.setKind(OrcProto.Type.Kind.BINARY);
-            break;
-          case TIMESTAMP:
-            type.setKind(OrcProto.Type.Kind.TIMESTAMP);
-            break;
-          case DATE:
-            type.setKind(OrcProto.Type.Kind.DATE);
-            break;
-          case DECIMAL:
-            DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)((PrimitiveObjectInspector)treeWriter.inspector).getTypeInfo();
-            type.setKind(OrcProto.Type.Kind.DECIMAL);
-            type.setPrecision(decTypeInfo.precision());
-            type.setScale(decTypeInfo.scale());
-            break;
-          default:
-            throw new IllegalArgumentException("Unknown primitive category: " +
-                ((PrimitiveObjectInspector) treeWriter.inspector).
-                    getPrimitiveCategory());
-        }
+    List<TypeDescription> children = schema.getChildren();
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        type.setKind(OrcProto.Type.Kind.BOOLEAN);
+        break;
+      case BYTE:
+        type.setKind(OrcProto.Type.Kind.BYTE);
+        break;
+      case SHORT:
+        type.setKind(OrcProto.Type.Kind.SHORT);
+        break;
+      case INT:
+        type.setKind(OrcProto.Type.Kind.INT);
+        break;
+      case LONG:
+        type.setKind(OrcProto.Type.Kind.LONG);
+        break;
+      case FLOAT:
+        type.setKind(OrcProto.Type.Kind.FLOAT);
+        break;
+      case DOUBLE:
+        type.setKind(OrcProto.Type.Kind.DOUBLE);
+        break;
+      case STRING:
+        type.setKind(OrcProto.Type.Kind.STRING);
+        break;
+      case CHAR:
+        type.setKind(OrcProto.Type.Kind.CHAR);
+        type.setMaximumLength(schema.getMaxLength());
+        break;
+      case VARCHAR:
+        type.setKind(Type.Kind.VARCHAR);
+        type.setMaximumLength(schema.getMaxLength());
+        break;
+      case BINARY:
+        type.setKind(OrcProto.Type.Kind.BINARY);
+        break;
+      case TIMESTAMP:
+        type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+        break;
+      case DATE:
+        type.setKind(OrcProto.Type.Kind.DATE);
+        break;
+      case DECIMAL:
+        type.setKind(OrcProto.Type.Kind.DECIMAL);
+        type.setPrecision(schema.getPrecision());
+        type.setScale(schema.getScale());
         break;
       case LIST:
         type.setKind(OrcProto.Type.Kind.LIST);
-        type.addSubtypes(treeWriter.childrenWriters[0].id);
+        type.addSubtypes(children.get(0).getId());
         break;
       case MAP:
         type.setKind(OrcProto.Type.Kind.MAP);
-        type.addSubtypes(treeWriter.childrenWriters[0].id);
-        type.addSubtypes(treeWriter.childrenWriters[1].id);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
+        }
         break;
       case STRUCT:
         type.setKind(OrcProto.Type.Kind.STRUCT);
-        for(TreeWriter child: treeWriter.childrenWriters) {
-          type.addSubtypes(child.id);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
         }
-        for(StructField field: ((StructTreeWriter) treeWriter).fields) {
-          type.addFieldNames(field.getFieldName());
+        for(String field: schema.getFieldNames()) {
+          type.addFieldNames(field);
         }
         break;
       case UNION:
         type.setKind(OrcProto.Type.Kind.UNION);
-        for(TreeWriter child: treeWriter.childrenWriters) {
-          type.addSubtypes(child.id);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
         }
         break;
       default:
         throw new IllegalArgumentException("Unknown category: " +
-          treeWriter.inspector.getCategory());
+          schema.getCategory());
     }
     builder.addTypes(type);
-    for(TreeWriter child: treeWriter.childrenWriters) {
-      writeTypes(builder, child);
+    if (children != null) {
+      for(TypeDescription child: children) {
+        writeTypes(builder, child);
+      }
     }
   }
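writeTypes now serializes the footer's type list directly from the TypeDescription tree: one OrcProto.Type entry per schema node, emitted in pre-order, with compound types recording their children's column ids and structs additionally recording field names. A rough sketch of the resulting layout, using only the getId()/getChildren()/getCategory() accessors used above (illustrative, not the committed code):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.io.orc.TypeDescription;

public class FooterTypeLayout {
  // Mirrors the order in which writeTypes emits OrcProto.Type entries:
  // the node itself first, then each child subtree, so the position in
  // the list equals the column id.
  public static List<String> flatten(TypeDescription schema) {
    List<String> out = new ArrayList<String>();
    append(schema, out);
    return out;
  }

  private static void append(TypeDescription schema, List<String> out) {
    StringBuilder row = new StringBuilder();
    row.append(schema.getId()).append(": ").append(schema.getCategory());
    List<TypeDescription> children = schema.getChildren();
    if (children != null) {
      row.append(" subtypes=");
      for (TypeDescription child : children) {
        row.append(child.getId()).append(' ');
      }
    }
    out.add(row.toString());
    if (children != null) {
      for (TypeDescription child : children) {
        append(child, out);
      }
    }
  }
}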
@@ -2243,73 +2220,58 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   private long computeRawDataSize() {
-    long result = 0;
-    for (TreeWriter child : treeWriter.getChildrenWriters()) {
-      result += getRawDataSizeFromInspectors(child, child.inspector);
-    }
-    return result;
+    return getRawDataSize(treeWriter, schema);
   }
 
-  private long getRawDataSizeFromInspectors(TreeWriter child, ObjectInspector oi) {
+  private long getRawDataSize(TreeWriter child,
+                              TypeDescription schema) {
     long total = 0;
-    switch (oi.getCategory()) {
-    case PRIMITIVE:
-      total += getRawDataSizeFromPrimitives(child, oi);
-      break;
-    case LIST:
-    case MAP:
-    case UNION:
-    case STRUCT:
-      for (TreeWriter tw : child.childrenWriters) {
-        total += getRawDataSizeFromInspectors(tw, tw.inspector);
-      }
-      break;
-    default:
-      LOG.debug("Unknown object inspector category.");
-      break;
-    }
-    return total;
-  }
-
-  private long getRawDataSizeFromPrimitives(TreeWriter child, ObjectInspector oi) {
-    long result = 0;
     long numVals = child.fileStatistics.getNumberOfValues();
-    switch (((PrimitiveObjectInspector) oi).getPrimitiveCategory()) {
-    case BOOLEAN:
-    case BYTE:
-    case SHORT:
-    case INT:
-    case FLOAT:
-      return numVals * JavaDataModel.get().primitive1();
-    case LONG:
-    case DOUBLE:
-      return numVals * JavaDataModel.get().primitive2();
-    case STRING:
-    case VARCHAR:
-    case CHAR:
-      // ORC strings are converted to java Strings. so use JavaDataModel to
-      // compute the overall size of strings
-      child = (StringTreeWriter) child;
-      StringColumnStatistics scs = (StringColumnStatistics) child.fileStatistics;
-      numVals = numVals == 0 ? 1 : numVals;
-      int avgStringLen = (int) (scs.getSum() / numVals);
-      return numVals * JavaDataModel.get().lengthForStringOfLength(avgStringLen);
-    case DECIMAL:
-      return numVals * JavaDataModel.get().lengthOfDecimal();
-    case DATE:
-      return numVals * JavaDataModel.get().lengthOfDate();
-    case BINARY:
-      // get total length of binary blob
-      BinaryColumnStatistics bcs = (BinaryColumnStatistics) child.fileStatistics;
-      return bcs.getSum();
-    case TIMESTAMP:
-      return numVals * JavaDataModel.get().lengthOfTimestamp();
-    default:
-      LOG.debug("Unknown primitive category.");
-      break;
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case FLOAT:
+        return numVals * JavaDataModel.get().primitive1();
+      case LONG:
+      case DOUBLE:
+        return numVals * JavaDataModel.get().primitive2();
+      case STRING:
+      case VARCHAR:
+      case CHAR:
+        // ORC strings are converted to java Strings. so use JavaDataModel to
+        // compute the overall size of strings
+        StringColumnStatistics scs = (StringColumnStatistics) child.fileStatistics;
+        numVals = numVals == 0 ? 1 : numVals;
+        int avgStringLen = (int) (scs.getSum() / numVals);
+        return numVals * JavaDataModel.get().lengthForStringOfLength(avgStringLen);
+      case DECIMAL:
+        return numVals * JavaDataModel.get().lengthOfDecimal();
+      case DATE:
+        return numVals * JavaDataModel.get().lengthOfDate();
+      case BINARY:
+        // get total length of binary blob
+        BinaryColumnStatistics bcs = (BinaryColumnStatistics) child.fileStatistics;
+        return bcs.getSum();
+      case TIMESTAMP:
+        return numVals * JavaDataModel.get().lengthOfTimestamp();
+      case LIST:
+      case MAP:
+      case UNION:
+      case STRUCT: {
+        TreeWriter[] childWriters = child.getChildrenWriters();
+        List<TypeDescription> childTypes = schema.getChildren();
+        for (int i=0; i < childWriters.length; ++i) {
+          total += getRawDataSize(childWriters[i], childTypes.get(i));
+        }
+        break;
+      }
+      default:
+        LOG.debug("Unknown object inspector category.");
+        break;
     }
-
-    return result;
+    return total;
   }
 
   private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
@@ -2356,7 +2318,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     // populate raw data size
     rawDataSize = computeRawDataSize();
     // serialize the types
-    writeTypes(builder, treeWriter);
+    writeTypes(builder, schema);
     // add the stripe information
     for(OrcProto.StripeInformation stripe: stripes) {
       builder.addStripes(stripe);
     }
@@ -2385,7 +2347,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         .setMagic(OrcFile.MAGIC)
         .addVersion(version.getMajor())
         .addVersion(version.getMinor())
-        .setWriterVersion(OrcFile.WriterVersion.HIVE_8732.getId());
+        .setWriterVersion(OrcFile.WriterVersion.HIVE_4243.getId());
     if (compress != CompressionKind.NONE) {
       builder.setCompressionBlockSize(bufferSize);
     }
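getRawDataSize now walks the TreeWriter and TypeDescription trees in parallel and estimates each column's in-memory size from its file statistics; for string columns the estimate is the number of values times the JavaDataModel cost of a string of the average length. A worked example with made-up numbers (the org.apache.hadoop.hive.ql.util import path for JavaDataModel is assumed; the inputs would normally come from StringColumnStatistics.getSum() as above):

import org.apache.hadoop.hive.ql.util.JavaDataModel;

public class StringRawSizeExample {
  public static void main(String[] args) {
    // Hypothetical statistics: 1000 string values totalling 8000 characters.
    long numVals = 1000;
    long sumOfLengths = 8000;
    int avgStringLen = (int) (sumOfLengths / numVals);          // 8
    long rawDataSize =
        numVals * JavaDataModel.get().lengthForStringOfLength(avgStringLen);
    System.out.println("estimated raw data size = " + rawDataSize);
  }
}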
@@ -2410,6 +2372,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   @Override
+  public TypeDescription getSchema() {
+    return schema;
+  }
+
+  @Override
   public void addUserMetadata(String name, ByteBuffer value) {
     userMetadata.put(name, ByteString.copyFrom(value));
   }
@@ -2493,12 +2460,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     getStream();
     long start = rawWriter.getPos();
-    long stripeLen = length;
     long availBlockSpace = blockSize - (start % blockSize);
 
     // see if stripe can fit in the current hdfs block, else pad the remaining
     // space in the block
-    if (stripeLen < blockSize && stripeLen > availBlockSpace &&
+    if (length < blockSize && length > availBlockSpace &&
         addBlockPadding) {
       byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
       LOG.info(String.format("Padding ORC by %d bytes while merging..",

http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
----------------------------------------------------------------------
diff --git a/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto b/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
index 3b7a9b3..acadef9 100644
--- a/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
+++ b/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
@@ -213,6 +213,7 @@ message PostScript {
   // Version of the writer:
   //   0 (or missing) = original
   //   1 = HIVE-8732 fixed
+  //   2 = HIVE-4243 fixed
   optional uint32 writerVersion = 6;
   // Leave this last in the record
   optional string magic = 8000;
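The new writerVersion value gives readers a way to tell whether a file predates this fix: per the comment above, 0 (or missing) is the original writer, 1 means HIVE-8732 is fixed, and 2 means HIVE-4243 is fixed. A small hedged helper that a reader-side check might use (WriterVersion.getId() is the accessor the writer calls above; the helper itself is illustrative, not part of the patch):

import org.apache.hadoop.hive.ql.io.orc.OrcFile;

public class WriterVersionCheck {
  // True when the file was written by a writer that already includes the
  // HIVE-4243 column-name fix, i.e. its PostScript writerVersion is >= 2.
  public static boolean hasFixedColumnNames(int fileWriterVersion) {
    return fileWriterVersion >= OrcFile.WriterVersion.HIVE_4243.getId();
  }
}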
http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
index 4d30377..4e3bc90 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
@@ -48,11 +48,10 @@ public class TestColumnStatistics {
 
   @Test
   public void testLongMerge() throws Exception {
-    ObjectInspector inspector =
-        PrimitiveObjectInspectorFactory.javaIntObjectInspector;
+    TypeDescription schema = TypeDescription.createInt();
 
-    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
-    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
     stats1.updateInteger(10);
     stats1.updateInteger(10);
     stats2.updateInteger(1);
@@ -71,11 +70,10 @@ public class TestColumnStatistics {
 
   @Test
   public void testDoubleMerge() throws Exception {
-    ObjectInspector inspector =
-        PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
+    TypeDescription schema = TypeDescription.createDouble();
 
-    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
-    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
     stats1.updateDouble(10.0);
     stats1.updateDouble(100.0);
     stats2.updateDouble(1.0);
@@ -95,11 +93,10 @@ public class TestColumnStatistics {
 
   @Test
   public void testStringMerge() throws Exception {
-    ObjectInspector inspector =
-        PrimitiveObjectInspectorFactory.javaStringObjectInspector;
+    TypeDescription schema = TypeDescription.createString();
 
-    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
-    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
     stats1.updateString(new Text("bob"));
     stats1.updateString(new Text("david"));
     stats1.updateString(new Text("charles"));
@@ -119,11 +116,10 @@ public class TestColumnStatistics {
 
   @Test
   public void testDateMerge() throws Exception {
-    ObjectInspector inspector =
-        PrimitiveObjectInspectorFactory.javaDateObjectInspector;
+    TypeDescription schema = TypeDescription.createDate();
 
-    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
-    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
     stats1.updateDate(new DateWritable(1000));
     stats1.updateDate(new DateWritable(100));
     stats2.updateDate(new DateWritable(10));
@@ -142,11 +138,10 @@ public class TestColumnStatistics {
 
   @Test
   public void testTimestampMerge() throws Exception {
-    ObjectInspector inspector =
-        PrimitiveObjectInspectorFactory.javaTimestampObjectInspector;
+    TypeDescription schema = TypeDescription.createTimestamp();
 
-    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
-    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
     stats1.updateTimestamp(new Timestamp(10));
     stats1.updateTimestamp(new Timestamp(100));
     stats2.updateTimestamp(new Timestamp(1));
@@ -165,11 +160,11 @@ public class TestColumnStatistics {
 
   @Test
   public void testDecimalMerge() throws Exception {
-    ObjectInspector inspector =
-        PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector;
+    TypeDescription schema = TypeDescription.createDecimal()
+        .withPrecision(38).withScale(16);
 
-    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
-    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
     stats1.updateDecimal(HiveDecimal.create(10));
     stats1.updateDecimal(HiveDecimal.create(100));
     stats2.updateDecimal(HiveDecimal.create(1));
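The test changes show the new entry point for statistics: ColumnStatisticsImpl.create now takes a TypeDescription, so decimal precision and scale come from the schema rather than from an ObjectInspector. A sketch in the same style; like the tests it sits in the org.apache.hadoop.hive.ql.io.orc package, and the merge() call mirrors what the surrounding merge tests exercise and is assumed here rather than shown in this excerpt:

package org.apache.hadoop.hive.ql.io.orc;

import org.apache.hadoop.hive.common.type.HiveDecimal;

public class DecimalStatsSketch {
  public static void main(String[] args) {
    // Statistics are keyed off the schema, including decimal precision/scale.
    TypeDescription schema = TypeDescription.createDecimal()
        .withPrecision(38).withScale(16);
    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
    stats1.updateDecimal(HiveDecimal.create(10));
    stats2.updateDecimal(HiveDecimal.create(100));
    stats1.merge(stats2);            // combine the two partial statistics
    System.out.println(stats1);
  }
}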
