This is an automated email from the ASF dual-hosted git repository.

ychena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new f0169cd  HIVE-25521 - Fix concatenate file handling when files of different compressions are in same table/partition. (#2639) (Harish Jaiprakash, reviewed by Panagiotis Garefalakis)
f0169cd is described below

commit f0169cdd472b08a77b7a72666899d904f6ce1e9a
Author: Harish Jaiprakash <[email protected]>
AuthorDate: Sat Oct 9 00:04:31 2021 +0530

    HIVE-25521 - Fix concatenate file handling when files of different compressions are in same table/partition. (#2639) (Harish Jaiprakash, reviewed by Panagiotis Garefalakis)
    
    * HIVE-25521 - Fix concatenate file handling when files of different compressions are in same table/partition.
    
    * Split test cases into 2 parts, and do not create the reader at all for ignored splits.
---
 .../ql/io/orc/OrcFileStripeMergeRecordReader.java  | 67 +++++++++++-----------
 .../io/orc/TestOrcFileStripeMergeRecordReader.java | 34 +++++++++--
 2 files changed, 64 insertions(+), 37 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
index 2ebfd29..e677842 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
@@ -40,19 +40,23 @@ public class OrcFileStripeMergeRecordReader implements
   protected Iterator<StripeInformation> iter;
   protected List<OrcProto.StripeStatistics> stripeStatistics;
   private int stripeIdx;
-  private long start;
-  private long end;
   private boolean skipFile;
 
   public OrcFileStripeMergeRecordReader(Configuration conf, FileSplit split) throws IOException {
     path = split.getPath();
-    start = split.getStart();
-    end = start + split.getLength();
-    FileSystem fs = path.getFileSystem(conf);
-    this.reader = OrcFile.createReader(path, OrcFile.readerOptions(conf).filesystem(fs));
-    this.iter = reader.getStripes().iterator();
-    this.stripeIdx = 0;
-    this.stripeStatistics = ((ReaderImpl) reader).getOrcProtoStripeStatistics();
+    long start = split.getStart();
+    // if the combined split has only part of the file split, the entire file will be handled by the mapper that
+    // owns the start of file split.
+    skipFile = start > 0; // skip the file if start is not 0
+    if (!skipFile) {
+      FileSystem fs = path.getFileSystem(conf);
+      this.reader = OrcFile.createReader(path, OrcFile.readerOptions(conf).filesystem(fs));
+      this.iter = reader.getStripes().iterator();
+      this.stripeIdx = 0;
+      this.stripeStatistics = ((ReaderImpl) reader).getOrcProtoStripeStatistics();
+    } else {
+      reader = null;
+    }
   }
 
   public Class<?> getKeyClass() {
@@ -90,33 +94,27 @@ public class OrcFileStripeMergeRecordReader implements
       return true;
     }
 
-    while (iter.hasNext()) {
+    // file split starts with 0 and hence this mapper owns concatenate of all stripes in the file.
+    if (iter.hasNext()) {
       StripeInformation si = iter.next();
-
-      // if stripe offset is outside the split boundary then ignore the current
-      // stripe as it will be handled by some other mapper.
-      if (si.getOffset() >= start && si.getOffset() < end) {
-        valueWrapper.setStripeStatistics(stripeStatistics.get(stripeIdx++));
-        valueWrapper.setStripeInformation(si);
-        if (!iter.hasNext()) {
-          valueWrapper.setLastStripeInFile(true);
-          Map<String, ByteBuffer> userMeta = new HashMap<>();
-          for(String key: reader.getMetadataKeys()) {
-            userMeta.put(key, reader.getMetadataValue(key));
-          }
-          valueWrapper.setUserMetadata(userMeta);
+      valueWrapper.setStripeStatistics(stripeStatistics.get(stripeIdx));
+      valueWrapper.setStripeInformation(si);
+      if (!iter.hasNext()) {
+        valueWrapper.setLastStripeInFile(true);
+        Map<String, ByteBuffer> userMeta = new HashMap<>();
+        for(String key: reader.getMetadataKeys()) {
+          userMeta.put(key, reader.getMetadataValue(key));
         }
-        keyWrapper.setInputPath(path);
-        keyWrapper.setCompression(reader.getCompressionKind());
-        keyWrapper.setCompressBufferSize(reader.getCompressionSize());
-        keyWrapper.setFileVersion(reader.getFileVersion());
-        keyWrapper.setWriterVersion(reader.getWriterVersion());
-        keyWrapper.setRowIndexStride(reader.getRowIndexStride());
-        keyWrapper.setFileSchema(reader.getSchema());
-      } else {
-        stripeIdx++;
-        continue;
+        valueWrapper.setUserMetadata(userMeta);
       }
+      keyWrapper.setInputPath(path);
+      keyWrapper.setCompression(reader.getCompressionKind());
+      keyWrapper.setCompressBufferSize(reader.getCompressionSize());
+      keyWrapper.setFileVersion(reader.getFileVersion());
+      keyWrapper.setWriterVersion(reader.getWriterVersion());
+      keyWrapper.setRowIndexStride(reader.getRowIndexStride());
+      keyWrapper.setFileSchema(reader.getSchema());
+      stripeIdx++;
       return true;
     }
 
@@ -143,6 +141,9 @@ public class OrcFileStripeMergeRecordReader implements
   }
 
   public void close() throws IOException {
+    if (reader != null) {
+      reader.close();
+    }
   }
 
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFileStripeMergeRecordReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFileStripeMergeRecordReader.java
index e2257fd..af1a54f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFileStripeMergeRecordReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFileStripeMergeRecordReader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileSplit;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -34,7 +35,7 @@ import org.junit.rules.TestName;
 
 public class TestOrcFileStripeMergeRecordReader {
 
-  private final int DEFAULT_STRIPE_SIZE = 5000;
+  private static final int TEST_STRIPE_SIZE = 5000;
 
   private OrcFileKeyWrapper key;
   private OrcFileValueWrapper value;
@@ -52,19 +53,44 @@ public class TestOrcFileStripeMergeRecordReader {
     key = new OrcFileKeyWrapper();
     value = new OrcFileValueWrapper();
     tmpPath  = prepareTmpPath();
+    createOrcFile(TEST_STRIPE_SIZE, TEST_STRIPE_SIZE + 1);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    fs.delete(tmpPath, false);
   }
 
   @Test
-  public void testSplitStartsWithOffset() throws IOException {
-    createOrcFile(DEFAULT_STRIPE_SIZE, DEFAULT_STRIPE_SIZE + 1);
+  public void testSplitStartsWithNonZeroOffset() throws IOException {
     FileStatus fileStatus = fs.getFileStatus(tmpPath);
     long length = fileStatus.getLen();
     long offset = length / 2;
-    FileSplit split = new FileSplit(tmpPath, offset, length, (String[])null);
+
+    // Check case for non-zero offset, the file will be skipped.
+    FileSplit split = new FileSplit(tmpPath, offset, length, (String[]) null);
+    OrcFileStripeMergeRecordReader reader = new OrcFileStripeMergeRecordReader(conf, split);
+    reader.next(key, value);
+    Assert.assertNull(key.getInputPath());
+  }
+
+  @Test
+  public void testSplitStartsWithZeroOffset() throws IOException {
+    FileStatus fileStatus = fs.getFileStatus(tmpPath);
+    long length = fileStatus.getLen();
+    // New split with zero offset, the file should be processed.
+    FileSplit split = new FileSplit(tmpPath, 0, length, (String[]) null);
     OrcFileStripeMergeRecordReader reader = new OrcFileStripeMergeRecordReader(conf, split);
+    // both stripes will be processed, first stripe has 5000 rows and second stripe has 1 row
+    reader.next(key, value);
+    Assert.assertEquals("InputPath", tmpPath, key.getInputPath());
+    Assert.assertEquals("NumberOfValues", TEST_STRIPE_SIZE,
+        value.getStripeStatistics().getColStats(0).getNumberOfValues());
     reader.next(key, value);
     Assert.assertEquals("InputPath", tmpPath, key.getInputPath());
     Assert.assertEquals("NumberOfValues", 1L, value.getStripeStatistics().getColStats(0).getNumberOfValues());
+    // we are done with the file, so expect null path
+    Assert.assertFalse(reader.next(key, value));
     reader.close();
   }
 

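For context, below is a minimal standalone sketch (not part of the commit) of the ownership rule this change relies on: only the mapper whose split starts at byte 0 of a file creates an ORC reader and merges all of its stripes, while any other split over the same file is skipped. The class name, path, and length used here are hypothetical.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;

public class WholeFileOwnershipSketch {

  // A split "owns" the whole file only when it starts at byte 0,
  // mirroring the skipFile = start > 0 check added to the constructor above.
  static boolean ownsWholeFile(FileSplit split) {
    return split.getStart() == 0;
  }

  public static void main(String[] args) {
    Path path = new Path("/tmp/example.orc"); // hypothetical file
    long length = 10000L;                     // hypothetical file length in bytes

    FileSplit first = new FileSplit(path, 0, length, (String[]) null);
    FileSplit other = new FileSplit(path, length / 2, length / 2, (String[]) null);

    System.out.println(ownsWholeFile(first)); // true: reader is created, all stripes are merged
    System.out.println(ownsWholeFile(other)); // false: no reader is created, this split is skipped
  }
}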