Repository: hive Updated Branches: refs/heads/master 645458532 -> 1aec6a394
OrcRawRecordMerger doesn't work for more than one file in non vectorized case (Sergey Shelukhin, reviewed by Eugene Koifman) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1aec6a39 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1aec6a39 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1aec6a39 Branch: refs/heads/master Commit: 1aec6a39424942f5dc7974413ad5ea2829985a73 Parents: 6454585 Author: sergey <ser...@apache.org> Authored: Sat Jun 9 09:31:16 2018 -0700 Committer: Eugene Koifman <ekoif...@apache.org> Committed: Sat Jun 9 09:31:16 2018 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/io/RecordIdentifier.java | 1 + .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 6 ++-- .../hive/ql/io/orc/OrcRawRecordMerger.java | 38 ++++++++++++++------ 3 files changed, 33 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1aec6a39/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java index 607abfd..ea7ba53 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java @@ -226,6 +226,7 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> { return "{originalWriteId: " + writeId + ", " + bucketToString() + ", row: " + getRowId() +"}"; } protected String bucketToString() { + if (bucketId == -1) return ("bucket: " + bucketId); BucketCodec codec = BucketCodec.determineVersion(bucketId); return "bucket: " + bucketId + "(" + codec.getVersion() + "." + http://git-wip-us.apache.org/repos/asf/hive/blob/1aec6a39/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 31338d7..a789dd2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -2076,9 +2076,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); ValidWriteIdList validWriteIdList = (txnString == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString); - LOG.debug("getReader:: Read ValidWriteIdList: " + validWriteIdList.toString() + if (LOG.isDebugEnabled()) { + LOG.debug("getReader:: Read ValidWriteIdList: " + validWriteIdList.toString() + " isTransactionalTable: " + HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN)); - + LOG.debug("Creating merger for {} and {}", split.getPath(), Arrays.toString(deltas)); + } final OrcRawRecordMerger records = new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket, validWriteIdList, readOptions, deltas, mergerOptions); http://git-wip-us.apache.org/repos/asf/hive/blob/1aec6a39/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 8c7c72e..9d954ca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -249,6 +249,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ } while (nextRecord() != null && (minKey != null && key.compareRow(getMinKey()) <= 0)); } + @Override + public String toString() { + return "[key=" + key + ", nextRecord=" + nextRecord + ", reader=" + reader + "]"; + } @Override public final OrcStruct nextRecord() { return nextRecord; } @@ -281,7 +285,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ OrcRecordUpdater.getRowId(nextRecord()), OrcRecordUpdater.getCurrentTransaction(nextRecord()), OrcRecordUpdater.getOperation(nextRecord()) == OrcRecordUpdater.DELETE_OPERATION); - // if this record is larger than maxKey, we need to stop if (getMaxKey() != null && getKey().compareRow(getMaxKey()) > 0) { LOG.debug("key " + getKey() + " > maxkey " + getMaxKey()); @@ -999,7 +1002,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey()); // use the min/max instead of the byte range ReaderPair pair = null; - ReaderKey key = new ReaderKey(); + ReaderKey baseKey = new ReaderKey(); if (isOriginal) { options = options.clone(); if(mergerOptions.isCompacting()) { @@ -1009,7 +1012,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions, AcidUtils.parseBase(mergerOptions.getBaseDir()), mergerOptions.getBaseDir()); } - pair = new OriginalReaderPairToCompact(key, bucket, options, readerPairOptions, + pair = new OriginalReaderPairToCompact(baseKey, bucket, options, readerPairOptions, conf, validWriteIdList, 0);//0 since base_x doesn't have a suffix (neither does pre acid write) } else { @@ -1024,7 +1027,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions, tfp.syntheticWriteId, tfp.folder); } - pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(), + pair = new OriginalReaderPairToRead(baseKey, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(), options, readerPairOptions, conf, validWriteIdList, tfp.statementId); } } else { @@ -1039,7 +1042,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ //doing major compaction - it's possible where full compliment of bucket files is not //required (on Tez) that base_x/ doesn't have a file for 'bucket' reader = OrcFile.createReader(bucketPath, OrcFile.readerOptions(conf)); - pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(), + pair = new ReaderPairAcid(baseKey, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(), eventOptions); } else { @@ -1049,7 +1052,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ } else { assert reader != null : "no reader? " + mergerOptions.getRootPath(); - pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(), + pair = new ReaderPairAcid(baseKey, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(), eventOptions); } } @@ -1058,7 +1061,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ LOG.info("updated min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey()); // if there is at least one record, put it in the map if (pair.nextRecord() != null) { - readers.put(key, pair); + ensurePutReader(baseKey, pair); + baseKey = null; } baseReader = pair.getRecordReader(); } @@ -1088,7 +1092,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ ReaderPair deltaPair = new OriginalReaderPairToCompact(key, bucket, options, rawCompactOptions, conf, validWriteIdList, deltaDir.getStatementId()); if (deltaPair.nextRecord() != null) { - readers.put(key, deltaPair); + ensurePutReader(key, deltaPair); + key = new ReaderKey(); } continue; } @@ -1101,6 +1106,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ */ continue; } + LOG.debug("Looking at delta file {}", deltaFile); if(deltaDir.isDeleteDelta()) { //if here it maybe compaction or regular read or Delete event sorter //in the later 2 cases we should do: @@ -1109,7 +1115,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ ReaderPair deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey, deltaEventOptions); if (deltaPair.nextRecord() != null) { - readers.put(key, deltaPair); + ensurePutReader(key, deltaPair); + key = new ReaderKey(); } continue; } @@ -1123,13 +1130,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ //must get statementId from file name since Acid 1.0 doesn't write it into bucketProperty ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey, deltaEventOptions); if (deltaPair.nextRecord() != null) { - readers.put(key, deltaPair); + ensurePutReader(key, deltaPair); + key = new ReaderKey(); } } } } // get the first record + LOG.debug("Final reader map {}", readers); Map.Entry<ReaderKey, ReaderPair> entry = readers.pollFirstEntry(); if (entry == null) { columns = 0; @@ -1146,6 +1155,14 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ } } + private void ensurePutReader(ReaderKey key, ReaderPair deltaPair) throws IOException { + ReaderPair oldPair = readers.put(key, deltaPair); + if (oldPair == null) return; + String error = "Two readers for " + key + ": new " + deltaPair + ", old " + oldPair; + LOG.error(error); + throw new IOException(error); + } + /** * For use with Load Data statement which places {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE} * type files into a base_x/ or delta_x_x. The data in these are then assigned ROW_IDs at read @@ -1352,6 +1369,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ boolean isSameRow = prevKey.isSameRow((ReaderKey)recordIdentifier); // if we are collapsing, figure out if this is a new row if (collapse || isSameRow) { + // Note: for collapse == false, this just sets keysSame. keysSame = (collapse && prevKey.compareRow(recordIdentifier) == 0) || (isSameRow); if (!keysSame) { prevKey.set(recordIdentifier);