Repository: hive
Updated Branches:
  refs/heads/branch-1 d36207ec9 -> 743585890
HIVE-12887 Handle ORC schema on read with fewer columns than file schema (after Schema Evolution changes) (Matt McCline, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/74358589
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/74358589
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/74358589

Branch: refs/heads/branch-1
Commit: 743585890950d8360b84ae044d50cc1fb8151a61
Parents: d36207e
Author: Matt McCline <mmccl...@hortonworks.com>
Authored: Wed Jan 20 14:25:21 2016 -0800
Committer: Matt McCline <mmccl...@hortonworks.com>
Committed: Wed Jan 20 14:25:21 2016 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/io/orc/OrcUtils.java  | 49 ++++++----
 .../hadoop/hive/ql/io/orc/RecordReaderImpl.java |  8 +-
 .../hive/ql/io/orc/TreeReaderFactory.java       | 29 +++---
 .../queries/clientpositive/orc_remove_cols.q    | 17 ++++
 .../clientpositive/orc_remove_cols.q.out        | 94 ++++++++++++++++++++
 5 files changed, 164 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/74358589/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
index ad4a9e8..84fd3c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -721,15 +722,13 @@ public class OrcUtils {
     }
 
     if (haveSchemaEvolutionProperties) {
-      LOG.info("Using schema evolution configuration variables " +
-          "schema.evolution.columns " +
-          schemaEvolutionColumnNames.toString() +
-          " / schema.evolution.columns.types " +
-          schemaEvolutionTypeDescrs.toString() +
-          " (isAcid " +
-          isAcid +
-          ")");
-
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Using schema evolution configuration variables schema.evolution.columns " +
+            schemaEvolutionColumnNames.toString() +
+            " / schema.evolution.columns.types " +
+            schemaEvolutionTypeDescrs.toString() +
+            " (isAcid " + isAcid + ")");
+      }
     } else {
 
       // Try regular properties;
@@ -748,14 +747,30 @@ public class OrcUtils {
      if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
        return null;
      }
-      LOG.info("Using column configuration variables " +
-          "columns " +
-          schemaEvolutionColumnNames.toString() +
-          " / columns.types " +
-          schemaEvolutionTypeDescrs.toString() +
-          " (isAcid " +
-          isAcid +
-          ")");
+
+      // Find first virtual column and clip them off.
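The clipping step that follows relies on Hive placing virtual columns (ROW__ID, INPUT__FILE__NAME, and so on) after all of a table's physical columns, so truncating both lists at the first virtual-column name drops every virtual column at once. A minimal standalone sketch of that idea, using made-up name/type lists and a stand-in for VirtualColumn.VIRTUAL_COLUMN_NAMES rather than the actual OrcUtils state:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class ClipVirtualColumnsSketch {
      // Stand-in for org.apache.hadoop.hive.ql.metadata.VirtualColumn.VIRTUAL_COLUMN_NAMES.
      static final Set<String> VIRTUAL_COLUMN_NAMES =
          new HashSet<String>(Arrays.asList("ROW__ID", "INPUT__FILE__NAME"));

      public static void main(String[] args) {
        List<String> names = new ArrayList<String>(Arrays.asList("a", "b", "ROW__ID"));
        List<String> types = new ArrayList<String>(Arrays.asList("int", "string", "struct"));

        // Find the first virtual column; everything from it onward gets clipped.
        int clip = -1;
        for (int i = 0; i < names.size(); i++) {
          if (VIRTUAL_COLUMN_NAMES.contains(names.get(i))) {
            clip = i;
            break;
          }
        }
        if (clip != -1) {
          // Copy the sublists: subList is only a view, which is why the patch
          // wraps the result in Lists.newArrayList before reassigning.
          names = new ArrayList<String>(names.subList(0, clip));
          types = new ArrayList<String>(types.subList(0, clip));
        }
        System.out.println(names + " / " + types);  // [a, b] / [int, string]
      }
    }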
+      int virtualColumnClipNum = -1;
+      int columnNum = 0;
+      for (String columnName : schemaEvolutionColumnNames) {
+        if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(columnName)) {
+          virtualColumnClipNum = columnNum;
+          break;
+        }
+        columnNum++;
+      }
+      if (virtualColumnClipNum != -1) {
+        schemaEvolutionColumnNames =
+            Lists.newArrayList(schemaEvolutionColumnNames.subList(0, virtualColumnClipNum));
+        schemaEvolutionTypeDescrs = Lists.newArrayList(schemaEvolutionTypeDescrs.subList(0, virtualColumnClipNum));
+      }
+
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Using column configuration variables columns " +
+            schemaEvolutionColumnNames.toString() +
+            " / columns.types " +
+            schemaEvolutionTypeDescrs.toString() +
+            " (isAcid " + isAcid + ")");
+      }
     }
 
     // Desired schema does not include virtual columns or partition columns.


http://git-wip-us.apache.org/repos/asf/hive/blob/74358589/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 24834a5..44cac68 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -168,6 +168,9 @@ class RecordReaderImpl implements RecordReader {
 
     TreeReaderSchema treeReaderSchema;
     if (options.getSchema() == null) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Schema on read not provided -- using file schema " + types.toString());
+      }
       treeReaderSchema = new TreeReaderSchema().fileTypes(types).schemaTypes(types);
     } else {
 
@@ -946,7 +949,7 @@ class RecordReaderImpl implements RecordReader {
       // since stream kind is optional, first check if it exists
       if (stream.hasKind() &&
           (StreamName.getArea(streamKind) == StreamName.Area.DATA) &&
-          includedColumns[column]) {
+          (column < includedColumns.length && includedColumns[column])) {
         // if we aren't filtering or it is a dictionary, load it.
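The bounds check just added, and the similar one in the next hunk, guard against the same failure: when the schema on read has fewer columns than the file schema, the stripe footer still lists streams for the removed columns, so indexing the included-columns array with a raw file column id can run past the end of the array. A small illustrative sketch with made-up values, not the reader's actual state:

    public class IncludedColumnGuardSketch {
      public static void main(String[] args) {
        // The read schema covers column ids 0..2, but the file also wrote column 3.
        boolean[] includedColumns = {true, true, false};
        int[] streamColumnIds = {0, 1, 2, 3};

        for (int column : streamColumnIds) {
          // An out-of-range id now simply means "not included" rather than
          // throwing ArrayIndexOutOfBoundsException.
          boolean read = column < includedColumns.length && includedColumns[column];
          System.out.println("column " + column + " -> " + (read ? "read" : "skip"));
        }
      }
    }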
         if (includedRowGroups == null ||
             RecordReaderUtils.isDictionary(streamKind, encodings.get(column))) {
@@ -971,7 +974,8 @@
     long streamOffset = 0;
     for (OrcProto.Stream streamDesc: streamDescriptions) {
       int column = streamDesc.getColumn();
-      if ((includeColumn != null && !includeColumn[column]) ||
+      if ((includeColumn != null &&
+          (column < included.length && !includeColumn[column])) ||
           streamDesc.hasKind() &&
           (StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA)) {
         streamOffset += streamDesc.getLength();


http://git-wip-us.apache.org/repos/asf/hive/blob/74358589/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
index 22f61ee..c8f9595 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
@@ -2010,7 +2010,7 @@
   }
 
   protected static class StructTreeReader extends TreeReader {
-    private final int fileColumnCount;
+    private final int readColumnCount;
     private final int resultColumnCount;
     protected final TreeReader[] fields;
     private final String[] fieldNames;
@@ -2023,30 +2023,31 @@
       super(columnId);
 
       OrcProto.Type fileStructType = treeReaderSchema.getFileTypes().get(columnId);
-      fileColumnCount = fileStructType.getFieldNamesCount();
 
       OrcProto.Type schemaStructType = treeReaderSchema.getSchemaTypes().get(columnId);
 
+      readColumnCount = Math.min(fileStructType.getFieldNamesCount(), schemaStructType.getFieldNamesCount());
+
       if (columnId == treeReaderSchema.getInnerStructSubtype()) {
         // If there are more result columns than reader columns, we will default those additional
         // columns to NULL.
         resultColumnCount = schemaStructType.getFieldNamesCount();
       } else {
-        resultColumnCount = fileColumnCount;
+        resultColumnCount = readColumnCount;
       }
 
-      this.fields = new TreeReader[fileColumnCount];
-      this.fieldNames = new String[fileColumnCount];
+      this.fields = new TreeReader[readColumnCount];
+      this.fieldNames = new String[readColumnCount];
 
       if (included == null) {
-        for (int i = 0; i < fileColumnCount; ++i) {
+        for (int i = 0; i < readColumnCount; ++i) {
           int subtype = schemaStructType.getSubtypes(i);
           this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
           // Use the treeReaderSchema evolution name since file/reader types may not have the real column name.
           this.fieldNames[i] = schemaStructType.getFieldNames(i);
         }
       } else {
-        for (int i = 0; i < fileColumnCount; ++i) {
+        for (int i = 0; i < readColumnCount; ++i) {
           int subtype = schemaStructType.getSubtypes(i);
           if (subtype >= included.length) {
             throw new IOException("subtype " + subtype + " exceeds the included array size " +
@@ -2090,13 +2091,13 @@
           result.setNumFields(resultColumnCount);
         }
       }
-      for (int i = 0; i < fileColumnCount; ++i) {
+      for (int i = 0; i < readColumnCount; ++i) {
         if (fields[i] != null) {
           result.setFieldValue(i, fields[i].next(result.getFieldValue(i)));
         }
       }
-      if (resultColumnCount > fileColumnCount) {
-        for (int i = fileColumnCount; i < resultColumnCount; ++i) {
+      if (resultColumnCount > readColumnCount) {
+        for (int i = readColumnCount; i < resultColumnCount; ++i) {
          // Default new treeReaderSchema evolution fields to NULL.
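The NULL-defaulting below works because readColumnCount is now the minimum of the file's field count and the read schema's field count: the struct reader decodes only the overlapping fields and fills any remaining result slots with nulls. That covers both directions of drift: a schema wider than the file (columns added) gets NULL padding, while a file wider than the schema (columns removed, the case in this JIRA) simply never instantiates readers for the extra fields. A toy illustration of the padding rule, with made-up counts in place of the reader's state:

    import java.util.Arrays;

    public class StructPaddingSketch {
      public static void main(String[] args) {
        int fileFieldCount = 2;     // struct fields physically present in the ORC file
        int schemaFieldCount = 3;   // fields in the schema on read (a column was added)

        int readColumnCount = Math.min(fileFieldCount, schemaFieldCount);  // decode only the overlap
        int resultColumnCount = schemaFieldCount;

        Object[] row = new Object[resultColumnCount];
        for (int i = 0; i < readColumnCount; i++) {
          row[i] = "decoded-field-" + i;  // stands in for fields[i].next(...)
        }
        for (int i = readColumnCount; i < resultColumnCount; i++) {
          row[i] = null;                  // schema fields the file never wrote
        }
        System.out.println(Arrays.toString(row));  // [decoded-field-0, decoded-field-1, null]
      }
    }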
          result.setFieldValue(i, null);
        }
@@ -2109,13 +2110,13 @@
     public Object nextVector(Object previousVector, long batchSize) throws IOException {
       final ColumnVector[] result;
       if (previousVector == null) {
-        result = new ColumnVector[fileColumnCount];
+        result = new ColumnVector[readColumnCount];
       } else {
         result = (ColumnVector[]) previousVector;
       }
 
       // Read all the members of struct as column vectors
-      for (int i = 0; i < fileColumnCount; i++) {
+      for (int i = 0; i < readColumnCount; i++) {
         if (fields[i] != null) {
           if (result[i] == null) {
             result[i] = (ColumnVector) fields[i].nextVector(null, batchSize);
@@ -2126,8 +2127,8 @@
       }
 
       // Default additional treeReaderSchema evolution fields to NULL.
-      if (vectorColumnCount != -1 && vectorColumnCount > fileColumnCount) {
-        for (int i = fileColumnCount; i < vectorColumnCount; ++i) {
+      if (vectorColumnCount != -1 && vectorColumnCount > readColumnCount) {
+        for (int i = readColumnCount; i < vectorColumnCount; ++i) {
          ColumnVector colVector = result[i];
          if (colVector != null) {
            colVector.isRepeating = true;


http://git-wip-us.apache.org/repos/asf/hive/blob/74358589/ql/src/test/queries/clientpositive/orc_remove_cols.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_remove_cols.q b/ql/src/test/queries/clientpositive/orc_remove_cols.q
new file mode 100644
index 0000000..fdae064
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/orc_remove_cols.q
@@ -0,0 +1,17 @@
+SET hive.exec.schema.evolution=false;
+set hive.fetch.task.conversion=more;
+set hive.mapred.mode=nonstrict;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+CREATE TABLE orc_partitioned(a INT, b STRING) partitioned by (ds string) STORED AS ORC;
+insert into table orc_partitioned partition (ds = 'today') select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10;
+insert into table orc_partitioned partition (ds = 'tomorrow') select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10;
+
+-- Use the old change the SERDE trick to avoid ORC DDL checks... and remove a column on the end.
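A note on the trick the comment above refersls to Hive DDL behavior: REPLACE COLUMNS is only allowed for a short list of native SerDes, ColumnarSerDe among them, and is rejected for OrcSerde, so an ORC table cannot normally drop a trailing column. Switching the SerDe, replacing the columns, and switching back leaves the metastore schema with a single column (cint) while the ORC files already written still contain two, which is exactly the fewer-columns-on-read case the code changes above handle.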
+ALTER TABLE orc_partitioned SET SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe';
+ALTER TABLE orc_partitioned REPLACE COLUMNS (cint int);
+ALTER TABLE orc_partitioned SET SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde';
+
+SELECT * FROM orc_partitioned WHERE ds = 'today';
+SELECT * FROM orc_partitioned WHERE ds = 'tomorrow';
+


http://git-wip-us.apache.org/repos/asf/hive/blob/74358589/ql/src/test/results/clientpositive/orc_remove_cols.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/orc_remove_cols.q.out b/ql/src/test/results/clientpositive/orc_remove_cols.q.out
new file mode 100644
index 0000000..b449b87
--- /dev/null
+++ b/ql/src/test/results/clientpositive/orc_remove_cols.q.out
@@ -0,0 +1,94 @@
+PREHOOK: query: CREATE TABLE orc_partitioned(a INT, b STRING) partitioned by (ds string) STORED AS ORC
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@orc_partitioned
+POSTHOOK: query: CREATE TABLE orc_partitioned(a INT, b STRING) partitioned by (ds string) STORED AS ORC
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@orc_partitioned
+PREHOOK: query: insert into table orc_partitioned partition (ds = 'today') select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@orc_partitioned@ds=today
+POSTHOOK: query: insert into table orc_partitioned partition (ds = 'today') select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@orc_partitioned@ds=today
+POSTHOOK: Lineage: orc_partitioned PARTITION(ds=today).a SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: orc_partitioned PARTITION(ds=today).b SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+PREHOOK: query: insert into table orc_partitioned partition (ds = 'tomorrow') select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@orc_partitioned@ds=tomorrow
+POSTHOOK: query: insert into table orc_partitioned partition (ds = 'tomorrow') select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@orc_partitioned@ds=tomorrow
+POSTHOOK: Lineage: orc_partitioned PARTITION(ds=tomorrow).a SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: orc_partitioned PARTITION(ds=tomorrow).b SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+PREHOOK: query: -- Use the old change the SERDE trick to avoid ORC DDL checks... and remove a column on the end.
+ALTER TABLE orc_partitioned SET SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
+PREHOOK: type: ALTERTABLE_SERIALIZER
+PREHOOK: Input: default@orc_partitioned
+PREHOOK: Output: default@orc_partitioned
+POSTHOOK: query: -- Use the old change the SERDE trick to avoid ORC DDL checks... and remove a column on the end.
+ALTER TABLE orc_partitioned SET SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
+POSTHOOK: type: ALTERTABLE_SERIALIZER
+POSTHOOK: Input: default@orc_partitioned
+POSTHOOK: Output: default@orc_partitioned
+PREHOOK: query: ALTER TABLE orc_partitioned REPLACE COLUMNS (cint int)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@orc_partitioned
+PREHOOK: Output: default@orc_partitioned
+POSTHOOK: query: ALTER TABLE orc_partitioned REPLACE COLUMNS (cint int)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@orc_partitioned
+POSTHOOK: Output: default@orc_partitioned
+PREHOOK: query: ALTER TABLE orc_partitioned SET SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
+PREHOOK: type: ALTERTABLE_SERIALIZER
+PREHOOK: Input: default@orc_partitioned
+PREHOOK: Output: default@orc_partitioned
+POSTHOOK: query: ALTER TABLE orc_partitioned SET SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
+POSTHOOK: type: ALTERTABLE_SERIALIZER
+POSTHOOK: Input: default@orc_partitioned
+POSTHOOK: Output: default@orc_partitioned
+PREHOOK: query: SELECT * FROM orc_partitioned WHERE ds = 'today'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_partitioned
+PREHOOK: Input: default@orc_partitioned@ds=today
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM orc_partitioned WHERE ds = 'today'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_partitioned
+POSTHOOK: Input: default@orc_partitioned@ds=today
+#### A masked pattern was here ####
+-1073279343	today
+-1073051226	today
+-1072910839	today
+-1072081801	today
+-1072076362	today
+-1071480828	today
+-1071363017	today
+-1070883071	today
+-1070551679	today
+-1069736047	today
+PREHOOK: query: SELECT * FROM orc_partitioned WHERE ds = 'tomorrow'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_partitioned
+PREHOOK: Input: default@orc_partitioned@ds=tomorrow
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM orc_partitioned WHERE ds = 'tomorrow'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_partitioned
+POSTHOOK: Input: default@orc_partitioned@ds=tomorrow
+#### A masked pattern was here ####
+-1073279343	tomorrow
+-1073051226	tomorrow
+-1072910839	tomorrow
+-1072081801	tomorrow
+-1072076362	tomorrow
+-1071480828	tomorrow
+-1071363017	tomorrow
+-1070883071	tomorrow
+-1070551679	tomorrow
+-1069736047	tomorrow