Repository: hive

Updated Branches:
  refs/heads/master fffbec065 -> 61e21d6c3
HIVE-18817 - ArrayIndexOutOfBounds exception during read of ACID table. (Eugene Koifman, Jason Dere, Prasanth Jayachandran, reviewed by Jason Dere)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/61e21d6c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/61e21d6c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/61e21d6c

Branch: refs/heads/master
Commit: 61e21d6c3039087b83609577325175b1f603b50f
Parents: fffbec0
Author: Eugene Koifman <ekoif...@apache.org>
Authored: Fri Mar 2 09:11:01 2018 -0800
Committer: Eugene Koifman <ekoif...@apache.org>
Committed: Fri Mar 2 09:11:01 2018 -0800

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java |  17 +++
 .../hive/ql/io/orc/TestInputOutputFormat.java   | 123 ++++++++++++++++++-
 .../results/clientpositive/acid_nullscan.q.out  |   8 +-
 .../clientpositive/autoColumnStats_4.q.out      |   4 +-
 .../llap/acid_bucket_pruning.q.out              |   4 +-
 .../test/results/clientpositive/row__id.q.out   |  18 +--
 6 files changed, 155 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/61e21d6c/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 970af0e..d850062 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.orc.OrcConf;
 import org.apache.orc.impl.AcidStats;
 import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.impl.WriterImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -559,6 +560,17 @@ public class OrcRecordUpdater implements RecordUpdater {
     int lastBucket;
     long lastRowId;
     AcidStats acidStats = new AcidStats();
+    /**
+     * {@link #preStripeWrite(OrcFile.WriterContext)} is normally called by the
+     * {@link org.apache.orc.MemoryManager}, except on close().
+     * {@link org.apache.orc.impl.WriterImpl#close()} calls preFooterWrite() before it calls
+     * {@link WriterImpl#flushStripe()}, which causes the {@link #ACID_KEY_INDEX_NAME} index to
+     * be missing its last entry.  This should also be fixed in ORC, but that fix requires
+     * upgrading the ORC jars to take effect.
+     *
+     * This field is used to decide whether we need to make the preStripeWrite() call here.
+     */
+    private long numKeysCurrentStripe = 0;
 
     KeyIndexBuilder(String name) {
       this.builderName = name;
@@ -572,11 +584,15 @@ public class OrcRecordUpdater implements RecordUpdater {
       lastKey.append(',');
       lastKey.append(lastRowId);
       lastKey.append(';');
+      numKeysCurrentStripe = 0;
     }
 
     @Override
     public void preFooterWrite(OrcFile.WriterContext context
                                ) throws IOException {
+      if(numKeysCurrentStripe > 0) {
+        preStripeWrite(context);
+      }
       context.getWriter().addUserMetadata(ACID_KEY_INDEX_NAME,
           UTF8.encode(lastKey.toString()));
       context.getWriter().addUserMetadata(OrcAcidUtils.ACID_STATS,
@@ -600,6 +616,7 @@ public class OrcRecordUpdater implements RecordUpdater {
       lastTransaction = transaction;
       lastBucket = bucket;
       lastRowId = rowId;
+      numKeysCurrentStripe++;
     }
   }
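
For illustration, the guard above can be reduced to a self-contained sketch. The class and
method bodies below are hypothetical simplifications; only the callback order and the
"transaction,bucket,rowId;" entry format come from the patch above, and the bucket values
come from the new test's assertions:

    // Minimal sketch (hypothetical class) of the KeyIndexBuilder guard.
    // WriterImpl.close() fires preFooterWrite() BEFORE the final stripe flush,
    // so without the guard the footer metadata misses the last stripe's key.
    class KeyIndexSketch {
      private final StringBuilder lastKey = new StringBuilder();
      private long lastTransaction, lastBucket, lastRowId;
      private long numKeysCurrentStripe = 0;

      void addKey(long transaction, long bucket, long rowId) {
        lastTransaction = transaction;
        lastBucket = bucket;
        lastRowId = rowId;
        numKeysCurrentStripe++;
      }

      // Fired by the memory manager at each stripe flush -- except the last one.
      void preStripeWrite() {
        lastKey.append(lastTransaction).append(',')
            .append(lastBucket).append(',')
            .append(lastRowId).append(';');
        numKeysCurrentStripe = 0;
      }

      // Fired on close(), before the last stripe is flushed.
      String preFooterWrite() {
        if (numKeysCurrentStripe > 0) {
          preStripeWrite();  // the HIVE-18817 fix: fold in the pending entry
        }
        return lastKey.toString();  // value stored under ACID_KEY_INDEX_NAME
      }

      public static void main(String[] args) {
        KeyIndexSketch b = new KeyIndexSketch();
        for (int r = 0; r < 1000; r++) b.addKey(1, 536870916L, r);
        b.preStripeWrite();                   // first stripe flushed mid-write
        for (int r = 0; r < 1000; r++) b.addKey(1, 536870920L, r);
        // Without the guard, close() would serialize "1,536870916,999;" only,
        // leaving readers with one fewer index entry than stripes -- presumably
        // the ArrayIndexOutOfBounds the summary refers to.
        System.out.println(b.preFooterWrite());
        // -> 1,536870916,999;1,536870920,999;
      }
    }

The new test below builds a real multi-stripe ACID file and verifies the same
invariant against the actual writer.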
"base_00100"); + testFilePath = new Path(baseDir, "bucket_00000"); + fs.mkdirs(baseDir); + fs.delete(testFilePath, true); + TypeDescription fileSchema = + TypeDescription.fromString("struct<operation:int," + + "originalTransaction:bigint,bucket:int,rowId:bigint," + + "currentTransaction:bigint," + + "row:struct<a:int,b:struct<c:int>,d:string>>"); + + OrcRecordUpdater.KeyIndexBuilder indexBuilder = new OrcRecordUpdater.KeyIndexBuilder("test"); + OrcFile.WriterOptions options = OrcFile.writerOptions(conf) + .fileSystem(fs) + .setSchema(fileSchema) + .compress(org.apache.orc.CompressionKind.NONE) + .callback(indexBuilder) + .stripeSize(128); + // Create ORC file with small stripe size so we can write multiple stripes. + Writer writer = OrcFile.createWriter(testFilePath, options); + VectorizedRowBatch batch = fileSchema.createRowBatch(1000); + batch.size = 1000; + StructColumnVector scv = (StructColumnVector)batch.cols[5]; + // operation + batch.cols[0].isRepeating = true; + ((LongColumnVector) batch.cols[0]).vector[0] = OrcRecordUpdater.INSERT_OPERATION; + // original transaction + batch.cols[1].isRepeating = true; + ((LongColumnVector) batch.cols[1]).vector[0] = 1; + // bucket + batch.cols[2].isRepeating = true; + ((LongColumnVector) batch.cols[2]).vector[0] = BucketCodec.V1.encode(new AcidOutputFormat + .Options(conf).bucket(0).statementId(0)); + // current transaction + batch.cols[4].isRepeating = true; + ((LongColumnVector) batch.cols[4]).vector[0] = 1; + + LongColumnVector lcv = (LongColumnVector) + ((StructColumnVector) scv.fields[1]).fields[0]; + for(int r=0; r < 1000; r++) { + // row id + ((LongColumnVector) batch.cols[3]).vector[r] = r; + // a + ((LongColumnVector) scv.fields[0]).vector[r] = r * 42; + // b.c + lcv.vector[r] = r * 10001; + // d + ((BytesColumnVector) scv.fields[2]).setVal(r, + Integer.toHexString(r).getBytes(StandardCharsets.UTF_8)); + indexBuilder.addKey(OrcRecordUpdater.INSERT_OPERATION, + 1, (int)(((LongColumnVector) batch.cols[2]).vector[0]), r); + } + + // Minimum 5000 rows per stripe. + for (int idx = 0; idx < 8; ++idx) { + writer.addRowBatch(batch); + // bucket + batch.cols[2].isRepeating = true; + ((LongColumnVector) batch.cols[2]).vector[0] = BucketCodec.V1.encode(new AcidOutputFormat + .Options(conf).bucket(0).statementId(idx + 1)); + for(long row_id : ((LongColumnVector) batch.cols[3]).vector) { + indexBuilder.addKey(OrcRecordUpdater.INSERT_OPERATION, + 1, (int)(((LongColumnVector) batch.cols[2]).vector[0]), row_id); + } + } + writer.close(); + long fileLength = fs.getFileStatus(testFilePath).getLen(); + + // Find the last stripe. 
+    Reader orcReader = OrcFile.createReader(fs, testFilePath);
+    List<StripeInformation> stripes = orcReader.getStripes();
+    StripeInformation lastStripe = stripes.get(stripes.size() - 1);
+    long lastStripeOffset = lastStripe.getOffset();
+    long lastStripeLength = lastStripe.getLength();
+
+    RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(orcReader);
+    Assert.assertEquals("Index length doesn't match number of stripes",
+        stripes.size(), keyIndex.length);
+    Assert.assertEquals("1st Index entry mismatch",
+        new RecordIdentifier(1, 536870916, 999), keyIndex[0]);
+    Assert.assertEquals("2nd Index entry mismatch",
+        new RecordIdentifier(1, 536870920, 999), keyIndex[1]);
+
+    // Test with the same schema, with column includes.
+    conf.set(ValidTxnList.VALID_TXNS_KEY, "100:99:");
+    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "a,b,d");
+    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "int,struct<c:int>,string");
+    conf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false");
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,2");
+
+    LOG.info("Last stripe " + stripes.size() +
+        ", offset " + lastStripeOffset + ", length " + lastStripeLength);
+    // Specify an OrcSplit that starts beyond the offset of the last stripe.
+    OrcSplit split = new OrcSplit(testFilePath, null, lastStripeOffset + 1, lastStripeLength,
+        new String[0], null, false, true,
+        new ArrayList<AcidInputFormat.DeltaMetaData>(), fileLength, fileLength, workDir);
+    OrcInputFormat inputFormat = new OrcInputFormat();
+    AcidInputFormat.RowReader<OrcStruct> reader = inputFormat.getReader(split,
+        new AcidInputFormat.Options(conf));
+
+    int record = 0;
+    RecordIdentifier id = reader.createKey();
+    OrcStruct struct = reader.createValue();
+    // Iterate through the records, if any.  Because the read offset was past the
+    // last stripe's offset, the rows from the last stripe will not be read; thus 0 records.
+    while (reader.next(id, struct)) {
+      record += 1;
+    }
+    assertEquals(0, record);
+
+    reader.close();
+  }
 }
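
As a standalone version of the key-index check the test performs, a rough sketch follows.
The class name, file path argument, and package placement are assumptions (the sketch sits in
org.apache.hadoop.hive.ql.io.orc because parseKeyIndex may not be visible outside that
package); createReader, getStripes, and parseKeyIndex are the same calls the test uses:

    package org.apache.hadoop.hive.ql.io.orc;  // parseKeyIndex may be package-private

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.RecordIdentifier;
    import org.apache.orc.StripeInformation;

    public class AcidKeyIndexCheck {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path bucketFile = new Path(args[0]);  // a base_n/bucket_nnnnn file
        Reader reader = OrcFile.createReader(fs, bucketFile);
        List<StripeInformation> stripes = reader.getStripes();
        RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader);
        // With the fix, every stripe has a key-index entry; files written by a
        // pre-HIVE-18817 writer are missing the last stripe's entry.
        System.out.println(stripes.size() + " stripes, "
            + keyIndex.length + " key index entries");
      }
    }

The q.out golden-file updates below follow mechanically: the extra index entry and metadata
make each ACID file slightly larger, which shifts the totalSize and statistics values.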

http://git-wip-us.apache.org/repos/asf/hive/blob/61e21d6c/ql/src/test/results/clientpositive/acid_nullscan.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/acid_nullscan.q.out b/ql/src/test/results/clientpositive/acid_nullscan.q.out
index d5070d3..669fa3f 100644
--- a/ql/src/test/results/clientpositive/acid_nullscan.q.out
+++ b/ql/src/test/results/clientpositive/acid_nullscan.q.out
@@ -42,12 +42,12 @@ STAGE PLANS:
           Map Operator Tree:
               TableScan
                 alias: acid_vectorized
-                Statistics: Num rows: 1 Data size: 25030 Basic stats: COMPLETE Column stats: NONE
+                Statistics: Num rows: 1 Data size: 25470 Basic stats: COMPLETE Column stats: NONE
                 GatherStats: false
                 Filter Operator
                   isSamplingPred: false
                   predicate: false (type: boolean)
-                  Statistics: Num rows: 1 Data size: 25030 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 1 Data size: 25470 Basic stats: COMPLETE Column stats: NONE
                   Group By Operator
                     aggregations: sum(a)
                     mode: hash
@@ -83,7 +83,7 @@ STAGE PLANS:
               serialization.ddl struct acid_vectorized { i32 a, string b}
               serialization.format 1
               serialization.lib org.apache.hadoop.hive.serde2.NullStructSerDe
-              totalSize 2503
+              totalSize 2547
               transactional true
               transactional_properties default
 #### A masked pattern was here ####
@@ -106,7 +106,7 @@ STAGE PLANS:
               serialization.ddl struct acid_vectorized { i32 a, string b}
               serialization.format 1
               serialization.lib org.apache.hadoop.hive.ql.io.orc.OrcSerde
-              totalSize 2503
+              totalSize 2547
               transactional true
               transactional_properties default
 #### A masked pattern was here ####

http://git-wip-us.apache.org/repos/asf/hive/blob/61e21d6c/ql/src/test/results/clientpositive/autoColumnStats_4.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/autoColumnStats_4.q.out b/ql/src/test/results/clientpositive/autoColumnStats_4.q.out
index fea8acb..1f4c0ad 100644
--- a/ql/src/test/results/clientpositive/autoColumnStats_4.q.out
+++ b/ql/src/test/results/clientpositive/autoColumnStats_4.q.out
@@ -197,7 +197,7 @@ Table Parameters:
 	numFiles            	2
 	numRows             	0
 	rawDataSize         	0
-	totalSize           	1834
+	totalSize           	1862
 	transactional       	true
 	transactional_properties	default
 #### A masked pattern was here ####
@@ -241,7 +241,7 @@ Table Parameters:
 	numFiles            	4
 	numRows             	0
 	rawDataSize         	0
-	totalSize           	2955
+	totalSize           	3012
 	transactional       	true
 	transactional_properties	default
 #### A masked pattern was here ####

http://git-wip-us.apache.org/repos/asf/hive/blob/61e21d6c/ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out b/ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out
index feca9c7..1abd3a2 100644
--- a/ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out
+++ b/ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out
@@ -103,7 +103,7 @@ STAGE PLANS:
                     serialization.ddl struct acidtbldefault { i32 a}
                     serialization.format 1
                     serialization.lib org.apache.hadoop.hive.ql.io.orc.OrcSerde
-                    totalSize 32918
+                    totalSize 33194
                     transactional true
                     transactional_properties default
 #### A masked pattern was here ####
@@ -127,7 +127,7 @@ STAGE PLANS:
                       serialization.ddl struct acidtbldefault { i32 a}
                       serialization.format 1
                       serialization.lib org.apache.hadoop.hive.ql.io.orc.OrcSerde
-                      totalSize 32918
+                      totalSize 33194
                       transactional true
                       transactional_properties default
 #### A masked pattern was here ####

http://git-wip-us.apache.org/repos/asf/hive/blob/61e21d6c/ql/src/test/results/clientpositive/row__id.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/row__id.q.out b/ql/src/test/results/clientpositive/row__id.q.out
index bf10c6e..fb5df5c 100644
--- a/ql/src/test/results/clientpositive/row__id.q.out
+++ b/ql/src/test/results/clientpositive/row__id.q.out
@@ -62,23 +62,23 @@ STAGE PLANS:
           Map Operator Tree:
               TableScan
                 alias: hello_acid
-                Statistics: Num rows: 75 Data size: 18890 Basic stats: COMPLETE Column stats: NONE
+                Statistics: Num rows: 77 Data size: 19370 Basic stats: COMPLETE Column stats: NONE
                 Select Operator
                   expressions: ROW__ID.transactionid (type: bigint)
                   outputColumnNames: _col0
-                  Statistics: Num rows: 75 Data size: 18890 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 77 Data size: 19370 Basic stats: COMPLETE Column stats: NONE
                   Reduce Output Operator
                     key expressions: _col0 (type: bigint)
                     sort order: +
-                    Statistics: Num rows: 75 Data size: 18890 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 77 Data size: 19370 Basic stats: COMPLETE Column stats: NONE
       Reduce Operator Tree:
         Select Operator
           expressions: KEY.reducesinkkey0 (type: bigint)
           outputColumnNames: _col0
-          Statistics: Num rows: 75 Data size: 18890 Basic stats: COMPLETE Column stats: NONE
+          Statistics: Num rows: 77 Data size: 19370 Basic stats: COMPLETE Column stats: NONE
           File Output Operator
             compressed: false
-            Statistics: Num rows: 75 Data size: 18890 Basic stats: COMPLETE Column stats: NONE
+            Statistics: Num rows: 77 Data size: 19370 Basic stats: COMPLETE Column stats: NONE
             table:
                 input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                 output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
@@ -123,17 +123,17 @@ STAGE PLANS:
           Map Operator Tree:
               TableScan
                 alias: hello_acid
-                Statistics: Num rows: 75 Data size: 18890 Basic stats: COMPLETE Column stats: NONE
+                Statistics: Num rows: 77 Data size: 19370 Basic stats: COMPLETE Column stats: NONE
                 Filter Operator
                   predicate: (ROW__ID.transactionid = 3) (type: boolean)
-                  Statistics: Num rows: 37 Data size: 9319 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 38 Data size: 9559 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: ROW__ID.transactionid (type: bigint)
                     outputColumnNames: _col0
-                    Statistics: Num rows: 37 Data size: 9319 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 38 Data size: 9559 Basic stats: COMPLETE Column stats: NONE
                     File Output Operator
                       compressed: false
-                      Statistics: Num rows: 37 Data size: 9319 Basic stats: COMPLETE Column stats: NONE
+                      Statistics: Num rows: 38 Data size: 9559 Basic stats: COMPLETE Column stats: NONE
                       table:
                           input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                           output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat