Repository: hive
Updated Branches:
  refs/heads/branch-2.0 8b3e7aa51 -> a3502d05c
HIVE-12887 Handle ORC schema on read with fewer columns than file schema (after Schema Evolution changes) (Matt McCline, reviewed by Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a3502d05 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a3502d05 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a3502d05 Branch: refs/heads/branch-2.0 Commit: a3502d05ce4349cfb8836c37311018d49872ca93 Parents: 8b3e7aa Author: Matt McCline <mmccl...@hortonworks.com> Authored: Wed Jan 20 14:08:41 2016 -0800 Committer: Matt McCline <mmccl...@hortonworks.com> Committed: Tue Apr 26 16:26:58 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 8 +- .../hive/ql/io/orc/TreeReaderFactory.java | 29 +++--- .../queries/clientpositive/orc_remove_cols.q | 17 ++++ .../clientpositive/orc_remove_cols.q.out | 94 ++++++++++++++++++++ 4 files changed, 132 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a3502d05/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index 567899a..acfe1a2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -242,6 +242,9 @@ public class RecordReaderImpl implements RecordReader { this.types = builder.types; TreeReaderFactory.TreeReaderSchema treeReaderSchema; if (options.getSchema() == null) { + if (LOG.isInfoEnabled()) { + LOG.info("Schema on read not provided -- using file schema " + types.toString()); + } treeReaderSchema = new 
TreeReaderFactory.TreeReaderSchema().fileTypes(types).schemaTypes(types); } else { @@ -999,7 +1002,7 @@ public class RecordReaderImpl implements RecordReader { // since stream kind is optional, first check if it exists if (stream.hasKind() && (StreamName.getArea(streamKind) == StreamName.Area.DATA) && - includedColumns[column]) { + (column < includedColumns.length && includedColumns[column])) { // if we aren't filtering or it is a dictionary, load it. if (includedRowGroups == null || RecordReaderUtils.isDictionary(streamKind, encodings.get(column))) { @@ -1024,7 +1027,8 @@ public class RecordReaderImpl implements RecordReader { long streamOffset = 0; for (OrcProto.Stream streamDesc : streamDescriptions) { int column = streamDesc.getColumn(); - if ((includeColumn != null && !includeColumn[column]) || + if ((includeColumn != null && + (column < included.length && !includeColumn[column])) || streamDesc.hasKind() && (StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA)) { streamOffset += streamDesc.getLength(); http://git-wip-us.apache.org/repos/asf/hive/blob/a3502d05/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java index d8a134b..8bb32ea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java @@ -2050,7 +2050,7 @@ public class TreeReaderFactory { } protected static class StructTreeReader extends TreeReader { - private final int fileColumnCount; + private final int readColumnCount; private final int resultColumnCount; protected final TreeReader[] fields; private final String[] fieldNames; @@ -2063,30 +2063,31 @@ public class TreeReaderFactory { super(columnId); OrcProto.Type fileStructType = 
treeReaderSchema.getFileTypes().get(columnId); - fileColumnCount = fileStructType.getFieldNamesCount(); OrcProto.Type schemaStructType = treeReaderSchema.getSchemaTypes().get(columnId); + readColumnCount = Math.min(fileStructType.getFieldNamesCount(), schemaStructType.getFieldNamesCount()); + if (columnId == treeReaderSchema.getInnerStructSubtype()) { // If there are more result columns than reader columns, we will default those additional // columns to NULL. resultColumnCount = schemaStructType.getFieldNamesCount(); } else { - resultColumnCount = fileColumnCount; + resultColumnCount = readColumnCount; } - this.fields = new TreeReader[fileColumnCount]; - this.fieldNames = new String[fileColumnCount]; + this.fields = new TreeReader[readColumnCount]; + this.fieldNames = new String[readColumnCount]; if (included == null) { - for (int i = 0; i < fileColumnCount; ++i) { + for (int i = 0; i < readColumnCount; ++i) { int subtype = schemaStructType.getSubtypes(i); this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); // Use the treeReaderSchema evolution name since file/reader types may not have the real column name. 
this.fieldNames[i] = schemaStructType.getFieldNames(i); } } else { - for (int i = 0; i < fileColumnCount; ++i) { + for (int i = 0; i < readColumnCount; ++i) { int subtype = schemaStructType.getSubtypes(i); if (subtype >= included.length) { throw new IOException("subtype " + subtype + " exceeds the included array size " + @@ -2130,13 +2131,13 @@ public class TreeReaderFactory { result.setNumFields(resultColumnCount); } } - for (int i = 0; i < fileColumnCount; ++i) { + for (int i = 0; i < readColumnCount; ++i) { if (fields[i] != null) { result.setFieldValue(i, fields[i].next(result.getFieldValue(i))); } } - if (resultColumnCount > fileColumnCount) { - for (int i = fileColumnCount; i < resultColumnCount; ++i) { + if (resultColumnCount > readColumnCount) { + for (int i = readColumnCount; i < resultColumnCount; ++i) { // Default new treeReaderSchema evolution fields to NULL. result.setFieldValue(i, null); } @@ -2149,13 +2150,13 @@ public class TreeReaderFactory { public Object nextVector(Object previousVector, final int batchSize) throws IOException { final ColumnVector[] result; if (previousVector == null) { - result = new ColumnVector[fileColumnCount]; + result = new ColumnVector[readColumnCount]; } else { result = (ColumnVector[]) previousVector; } // Read all the members of struct as column vectors - for (int i = 0; i < fileColumnCount; i++) { + for (int i = 0; i < readColumnCount; i++) { if (fields[i] != null) { if (result[i] == null) { result[i] = (ColumnVector) fields[i].nextVector(null, batchSize); @@ -2166,8 +2167,8 @@ public class TreeReaderFactory { } // Default additional treeReaderSchema evolution fields to NULL. 
- if (vectorColumnCount != -1 && vectorColumnCount > fileColumnCount) { - for (int i = fileColumnCount; i < vectorColumnCount; ++i) { + if (vectorColumnCount != -1 && vectorColumnCount > readColumnCount) { + for (int i = readColumnCount; i < vectorColumnCount; ++i) { ColumnVector colVector = result[i]; if (colVector != null) { colVector.isRepeating = true; http://git-wip-us.apache.org/repos/asf/hive/blob/a3502d05/ql/src/test/queries/clientpositive/orc_remove_cols.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/orc_remove_cols.q b/ql/src/test/queries/clientpositive/orc_remove_cols.q new file mode 100644 index 0000000..fdae064 --- /dev/null +++ b/ql/src/test/queries/clientpositive/orc_remove_cols.q @@ -0,0 +1,17 @@ +SET hive.exec.schema.evolution=false; +set hive.fetch.task.conversion=more; +set hive.mapred.mode=nonstrict; +set hive.exec.dynamic.partition.mode=nonstrict; + +CREATE TABLE orc_partitioned(a INT, b STRING) partitioned by (ds string) STORED AS ORC; +insert into table orc_partitioned partition (ds = 'today') select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10; +insert into table orc_partitioned partition (ds = 'tomorrow') select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10; + +-- Use the old change the SERDE trick to avoid ORC DDL checks... and remove a column on the end. 
+ALTER TABLE orc_partitioned SET SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'; +ALTER TABLE orc_partitioned REPLACE COLUMNS (cint int); +ALTER TABLE orc_partitioned SET SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'; + +SELECT * FROM orc_partitioned WHERE ds = 'today'; +SELECT * FROM orc_partitioned WHERE ds = 'tomorrow'; + http://git-wip-us.apache.org/repos/asf/hive/blob/a3502d05/ql/src/test/results/clientpositive/orc_remove_cols.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/orc_remove_cols.q.out b/ql/src/test/results/clientpositive/orc_remove_cols.q.out new file mode 100644 index 0000000..b449b87 --- /dev/null +++ b/ql/src/test/results/clientpositive/orc_remove_cols.q.out @@ -0,0 +1,94 @@ +PREHOOK: query: CREATE TABLE orc_partitioned(a INT, b STRING) partitioned by (ds string) STORED AS ORC +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_partitioned +POSTHOOK: query: CREATE TABLE orc_partitioned(a INT, b STRING) partitioned by (ds string) STORED AS ORC +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_partitioned +PREHOOK: query: insert into table orc_partitioned partition (ds = 'today') select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_partitioned@ds=today +POSTHOOK: query: insert into table orc_partitioned partition (ds = 'today') select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_partitioned@ds=today +POSTHOOK: Lineage: orc_partitioned PARTITION(ds=today).a SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: orc_partitioned PARTITION(ds=today).b SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ] +PREHOOK: query: insert into table orc_partitioned partition (ds = 'tomorrow') select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_partitioned@ds=tomorrow +POSTHOOK: query: insert into table orc_partitioned partition (ds = 'tomorrow') select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_partitioned@ds=tomorrow +POSTHOOK: Lineage: orc_partitioned PARTITION(ds=tomorrow).a SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: orc_partitioned PARTITION(ds=tomorrow).b SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ] +PREHOOK: query: -- Use the old change the SERDE trick to avoid ORC DDL checks... and remove a column on the end. +ALTER TABLE orc_partitioned SET SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe' +PREHOOK: type: ALTERTABLE_SERIALIZER +PREHOOK: Input: default@orc_partitioned +PREHOOK: Output: default@orc_partitioned +POSTHOOK: query: -- Use the old change the SERDE trick to avoid ORC DDL checks... and remove a column on the end. 
+ALTER TABLE orc_partitioned SET SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe' +POSTHOOK: type: ALTERTABLE_SERIALIZER +POSTHOOK: Input: default@orc_partitioned +POSTHOOK: Output: default@orc_partitioned +PREHOOK: query: ALTER TABLE orc_partitioned REPLACE COLUMNS (cint int) +PREHOOK: type: ALTERTABLE_REPLACECOLS +PREHOOK: Input: default@orc_partitioned +PREHOOK: Output: default@orc_partitioned +POSTHOOK: query: ALTER TABLE orc_partitioned REPLACE COLUMNS (cint int) +POSTHOOK: type: ALTERTABLE_REPLACECOLS +POSTHOOK: Input: default@orc_partitioned +POSTHOOK: Output: default@orc_partitioned +PREHOOK: query: ALTER TABLE orc_partitioned SET SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' +PREHOOK: type: ALTERTABLE_SERIALIZER +PREHOOK: Input: default@orc_partitioned +PREHOOK: Output: default@orc_partitioned +POSTHOOK: query: ALTER TABLE orc_partitioned SET SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' +POSTHOOK: type: ALTERTABLE_SERIALIZER +POSTHOOK: Input: default@orc_partitioned +POSTHOOK: Output: default@orc_partitioned +PREHOOK: query: SELECT * FROM orc_partitioned WHERE ds = 'today' +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_partitioned +PREHOOK: Input: default@orc_partitioned@ds=today +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM orc_partitioned WHERE ds = 'today' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_partitioned +POSTHOOK: Input: default@orc_partitioned@ds=today +#### A masked pattern was here #### +-1073279343 today +-1073051226 today +-1072910839 today +-1072081801 today +-1072076362 today +-1071480828 today +-1071363017 today +-1070883071 today +-1070551679 today +-1069736047 today +PREHOOK: query: SELECT * FROM orc_partitioned WHERE ds = 'tomorrow' +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_partitioned +PREHOOK: Input: default@orc_partitioned@ds=tomorrow +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM orc_partitioned WHERE ds = 'tomorrow' +POSTHOOK: type: QUERY 
+POSTHOOK: Input: default@orc_partitioned +POSTHOOK: Input: default@orc_partitioned@ds=tomorrow +#### A masked pattern was here #### +-1073279343 tomorrow +-1073051226 tomorrow +-1072910839 tomorrow +-1072081801 tomorrow +-1072076362 tomorrow +-1071480828 tomorrow +-1071363017 tomorrow +-1070883071 tomorrow +-1070551679 tomorrow +-1069736047 tomorrow