Repository: hive
Updated Branches:
  refs/heads/master 645458532 -> 1aec6a394


OrcRawRecordMerger doesn't work for more than one file in non vectorized case 
(Sergey Shelukhin, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1aec6a39
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1aec6a39
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1aec6a39

Branch: refs/heads/master
Commit: 1aec6a39424942f5dc7974413ad5ea2829985a73
Parents: 6454585
Author: sergey <ser...@apache.org>
Authored: Sat Jun 9 09:31:16 2018 -0700
Committer: Eugene Koifman <ekoif...@apache.org>
Committed: Sat Jun 9 09:31:16 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/RecordIdentifier.java     |  1 +
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  6 ++--
 .../hive/ql/io/orc/OrcRawRecordMerger.java      | 38 ++++++++++++++------
 3 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1aec6a39/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
index 607abfd..ea7ba53 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
@@ -226,6 +226,7 @@ public class RecordIdentifier implements 
WritableComparable<RecordIdentifier> {
     return "{originalWriteId: " + writeId + ", " + bucketToString() + ", row: 
" + getRowId() +"}";
   }
   protected String bucketToString() {
+    if (bucketId == -1) return ("bucket: " + bucketId);
     BucketCodec codec =
       BucketCodec.determineVersion(bucketId);
     return  "bucket: " + bucketId + "(" + codec.getVersion() + "." +

http://git-wip-us.apache.org/repos/asf/hive/blob/1aec6a39/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 31338d7..a789dd2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -2076,9 +2076,11 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
     String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
     ValidWriteIdList validWriteIdList
             = (txnString == null) ? new ValidReaderWriteIdList() : new 
ValidReaderWriteIdList(txnString);
-    LOG.debug("getReader:: Read ValidWriteIdList: " + 
validWriteIdList.toString()
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getReader:: Read ValidWriteIdList: " + 
validWriteIdList.toString()
             + " isTransactionalTable: " + HiveConf.getBoolVar(conf, 
ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN));
-
+      LOG.debug("Creating merger for {} and {}", split.getPath(), 
Arrays.toString(deltas));
+    }
     final OrcRawRecordMerger records =
         new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket,
             validWriteIdList, readOptions, deltas, mergerOptions);

http://git-wip-us.apache.org/repos/asf/hive/blob/1aec6a39/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 8c7c72e..9d954ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -249,6 +249,10 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
       } while (nextRecord() != null &&
         (minKey != null && key.compareRow(getMinKey()) <= 0));
     }
+    @Override
+    public String toString() {
+      return "[key=" + key + ", nextRecord=" + nextRecord + ", reader=" + 
reader + "]";
+    }
     @Override public final OrcStruct nextRecord() {
       return nextRecord;
     }
@@ -281,7 +285,6 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
             OrcRecordUpdater.getRowId(nextRecord()),
             OrcRecordUpdater.getCurrentTransaction(nextRecord()),
             OrcRecordUpdater.getOperation(nextRecord()) == 
OrcRecordUpdater.DELETE_OPERATION);
-
         // if this record is larger than maxKey, we need to stop
         if (getMaxKey() != null && getKey().compareRow(getMaxKey()) > 0) {
           LOG.debug("key " + getKey() + " > maxkey " + getMaxKey());
@@ -999,7 +1002,7 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
       LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + 
keyInterval.getMaxKey());
       // use the min/max instead of the byte range
       ReaderPair pair = null;
-      ReaderKey key = new ReaderKey();
+      ReaderKey baseKey = new ReaderKey();
       if (isOriginal) {
         options = options.clone();
         if(mergerOptions.isCompacting()) {
@@ -1009,7 +1012,7 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
             readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
               AcidUtils.parseBase(mergerOptions.getBaseDir()), 
mergerOptions.getBaseDir());
           }
-          pair = new OriginalReaderPairToCompact(key, bucket, options, 
readerPairOptions,
+          pair = new OriginalReaderPairToCompact(baseKey, bucket, options, 
readerPairOptions,
             conf, validWriteIdList,
             0);//0 since base_x doesn't have a suffix (neither does pre acid 
write)
         } else {
@@ -1024,7 +1027,7 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
             readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
               tfp.syntheticWriteId, tfp.folder);
           }
-          pair = new OriginalReaderPairToRead(key, reader, bucket, 
keyInterval.getMinKey(),
+          pair = new OriginalReaderPairToRead(baseKey, reader, bucket, 
keyInterval.getMinKey(),
             keyInterval.getMaxKey(), options,  readerPairOptions, conf, 
validWriteIdList, tfp.statementId);
         }
       } else {
@@ -1039,7 +1042,7 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
             //doing major compaction - it's possible where full compliment of 
bucket files is not
             //required (on Tez) that base_x/ doesn't have a file for 'bucket'
             reader = OrcFile.createReader(bucketPath, 
OrcFile.readerOptions(conf));
-            pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), 
keyInterval.getMaxKey(),
+            pair = new ReaderPairAcid(baseKey, reader, 
keyInterval.getMinKey(), keyInterval.getMaxKey(),
               eventOptions);
           }
           else {
@@ -1049,7 +1052,7 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
         }
         else {
           assert reader != null : "no reader? " + mergerOptions.getRootPath();
-          pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), 
keyInterval.getMaxKey(),
+          pair = new ReaderPairAcid(baseKey, reader, keyInterval.getMinKey(), 
keyInterval.getMaxKey(),
             eventOptions);
         }
       }
@@ -1058,7 +1061,8 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
       LOG.info("updated min key = " + keyInterval.getMinKey() + ", max key = " 
+ keyInterval.getMaxKey());
       // if there is at least one record, put it in the map
       if (pair.nextRecord() != null) {
-        readers.put(key, pair);
+        ensurePutReader(baseKey, pair);
+        baseKey = null;
       }
       baseReader = pair.getRecordReader();
     }
@@ -1088,7 +1092,8 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
           ReaderPair deltaPair =  new OriginalReaderPairToCompact(key, bucket, 
options,
               rawCompactOptions, conf, validWriteIdList, 
deltaDir.getStatementId());
           if (deltaPair.nextRecord() != null) {
-            readers.put(key, deltaPair);
+            ensurePutReader(key, deltaPair);
+            key = new ReaderKey();
           }
           continue;
         }
@@ -1101,6 +1106,7 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
              */
             continue;
           }
+          LOG.debug("Looking at delta file {}", deltaFile);
           if(deltaDir.isDeleteDelta()) {
             //if here it maybe compaction or regular read or Delete event 
sorter
             //in the later 2 cases we should do:
@@ -1109,7 +1115,8 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
             ReaderPair deltaPair = new ReaderPairAcid(key, deltaReader, 
minKey, maxKey,
                 deltaEventOptions);
             if (deltaPair.nextRecord() != null) {
-              readers.put(key, deltaPair);
+              ensurePutReader(key, deltaPair);
+              key = new ReaderKey();
             }
             continue;
           }
@@ -1123,13 +1130,15 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
           //must get statementId from file name since Acid 1.0 doesn't write 
it into bucketProperty
           ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, 
minKey, maxKey, deltaEventOptions);
           if (deltaPair.nextRecord() != null) {
-            readers.put(key, deltaPair);
+            ensurePutReader(key, deltaPair);
+            key = new ReaderKey();
           }
         }
       }
     }
 
     // get the first record
+    LOG.debug("Final reader map {}", readers);
     Map.Entry<ReaderKey, ReaderPair> entry = readers.pollFirstEntry();
     if (entry == null) {
       columns = 0;
@@ -1146,6 +1155,14 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
     }
   }
 
+  private void ensurePutReader(ReaderKey key, ReaderPair deltaPair) throws 
IOException {
+    ReaderPair oldPair = readers.put(key, deltaPair);
+    if (oldPair == null) return;
+    String error = "Two readers for " + key + ": new " + deltaPair + ", old " 
+ oldPair;
+    LOG.error(error);
+    throw new IOException(error);
+  }
+
   /**
    * For use with Load Data statement which places {@link 
AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
    * type files into a base_x/ or delta_x_x.  The data in these are then 
assigned ROW_IDs at read
@@ -1352,6 +1369,7 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
       boolean isSameRow = prevKey.isSameRow((ReaderKey)recordIdentifier);
       // if we are collapsing, figure out if this is a new row
       if (collapse || isSameRow) {
+        // Note: for collapse == false, this just sets keysSame.
         keysSame = (collapse && prevKey.compareRow(recordIdentifier) == 0) || 
(isSameRow);
         if (!keysSame) {
           prevKey.set(recordIdentifier);

Reply via email to