This is an automated email from the ASF dual-hosted git repository.
ychena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new f0169cd  HIVE-25521 - Fix concatenate file handling when files of different compressions are in the same table/partition. (#2639) (Harish Jaiprakash, reviewed by Panagiotis Garefalakis)
f0169cd is described below
commit f0169cdd472b08a77b7a72666899d904f6ce1e9a
Author: Harish Jaiprakash <[email protected]>
AuthorDate: Sat Oct 9 00:04:31 2021 +0530
HIVE-25521 - Fix concatenate file handling when files of different compressions are in the same table/partition. (#2639) (Harish Jaiprakash, reviewed by Panagiotis Garefalakis)

* HIVE-25521 - Fix concatenate file handling when files of different compressions are in the same table/partition.
* Split test cases into 2 parts, and do not create the reader at all for ignored splits.
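A note on the approach (editorial, not part of the patch): with combined splits, a single ORC file can be sliced across several mappers. Instead of each mapper merging only the stripes whose offsets fall inside its split, the patch makes ownership all-or-nothing: the mapper whose split starts at byte 0 of the file merges every stripe, and any mapper whose split starts later skips the file entirely. A minimal, hypothetical Java sketch of that rule (class and method names are illustrative, not Hive APIs):

    public final class SplitOwnership {
      // A mapper skips the file entirely unless its split starts at byte 0;
      // the zero-offset mapper then merges every stripe in the file.
      static boolean skipFile(long splitStart) {
        return splitStart > 0;
      }

      public static void main(String[] args) {
        System.out.println(skipFile(0L));    // false: this mapper owns the file and merges all stripes
        System.out.println(skipFile(4096L)); // true: another mapper owns the start of this file
      }
    }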
---
.../ql/io/orc/OrcFileStripeMergeRecordReader.java | 67 +++++++++++-----------
.../io/orc/TestOrcFileStripeMergeRecordReader.java | 34 +++++++++--
2 files changed, 64 insertions(+), 37 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
index 2ebfd29..e677842 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
@@ -40,19 +40,23 @@ public class OrcFileStripeMergeRecordReader implements
protected Iterator<StripeInformation> iter;
protected List<OrcProto.StripeStatistics> stripeStatistics;
private int stripeIdx;
- private long start;
- private long end;
private boolean skipFile;
public OrcFileStripeMergeRecordReader(Configuration conf, FileSplit split)
throws IOException {
path = split.getPath();
- start = split.getStart();
- end = start + split.getLength();
- FileSystem fs = path.getFileSystem(conf);
- this.reader = OrcFile.createReader(path, OrcFile.readerOptions(conf).filesystem(fs));
- this.iter = reader.getStripes().iterator();
- this.stripeIdx = 0;
- this.stripeStatistics = ((ReaderImpl) reader).getOrcProtoStripeStatistics();
+ long start = split.getStart();
+ // if the combined split contains only part of the file, the entire file will be handled by the mapper that
+ // owns the start of the file split.
+ skipFile = start > 0; // skip the file if start is not 0
+ if (!skipFile) {
+ FileSystem fs = path.getFileSystem(conf);
+ this.reader = OrcFile.createReader(path, OrcFile.readerOptions(conf).filesystem(fs));
+ this.iter = reader.getStripes().iterator();
+ this.stripeIdx = 0;
+ this.stripeStatistics = ((ReaderImpl) reader).getOrcProtoStripeStatistics();
+ } else {
+ reader = null;
+ }
}
public Class<?> getKeyClass() {
@@ -90,33 +94,27 @@ public class OrcFileStripeMergeRecordReader implements
return true;
}
- while (iter.hasNext()) {
+ // file split starts at 0, hence this mapper owns concatenation of all stripes in the file.
+ if (iter.hasNext()) {
StripeInformation si = iter.next();
-
- // if stripe offset is outside the split boundary then ignore the current
- // stripe as it will be handled by some other mapper.
- if (si.getOffset() >= start && si.getOffset() < end) {
- valueWrapper.setStripeStatistics(stripeStatistics.get(stripeIdx++));
- valueWrapper.setStripeInformation(si);
- if (!iter.hasNext()) {
- valueWrapper.setLastStripeInFile(true);
- Map<String, ByteBuffer> userMeta = new HashMap<>();
- for(String key: reader.getMetadataKeys()) {
- userMeta.put(key, reader.getMetadataValue(key));
- }
- valueWrapper.setUserMetadata(userMeta);
+ valueWrapper.setStripeStatistics(stripeStatistics.get(stripeIdx));
+ valueWrapper.setStripeInformation(si);
+ if (!iter.hasNext()) {
+ valueWrapper.setLastStripeInFile(true);
+ Map<String, ByteBuffer> userMeta = new HashMap<>();
+ for(String key: reader.getMetadataKeys()) {
+ userMeta.put(key, reader.getMetadataValue(key));
}
- keyWrapper.setInputPath(path);
- keyWrapper.setCompression(reader.getCompressionKind());
- keyWrapper.setCompressBufferSize(reader.getCompressionSize());
- keyWrapper.setFileVersion(reader.getFileVersion());
- keyWrapper.setWriterVersion(reader.getWriterVersion());
- keyWrapper.setRowIndexStride(reader.getRowIndexStride());
- keyWrapper.setFileSchema(reader.getSchema());
- } else {
- stripeIdx++;
- continue;
+ valueWrapper.setUserMetadata(userMeta);
}
+ keyWrapper.setInputPath(path);
+ keyWrapper.setCompression(reader.getCompressionKind());
+ keyWrapper.setCompressBufferSize(reader.getCompressionSize());
+ keyWrapper.setFileVersion(reader.getFileVersion());
+ keyWrapper.setWriterVersion(reader.getWriterVersion());
+ keyWrapper.setRowIndexStride(reader.getRowIndexStride());
+ keyWrapper.setFileSchema(reader.getSchema());
+ stripeIdx++;
return true;
}
@@ -143,6 +141,9 @@ public class OrcFileStripeMergeRecordReader implements
}
public void close() throws IOException {
+ if (reader != null) {
+ reader.close();
+ }
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFileStripeMergeRecordReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFileStripeMergeRecordReader.java
index e2257fd..af1a54f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFileStripeMergeRecordReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFileStripeMergeRecordReader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -34,7 +35,7 @@ import org.junit.rules.TestName;
public class TestOrcFileStripeMergeRecordReader {
- private final int DEFAULT_STRIPE_SIZE = 5000;
+ private static final int TEST_STRIPE_SIZE = 5000;
private OrcFileKeyWrapper key;
private OrcFileValueWrapper value;
@@ -52,19 +53,44 @@ public class TestOrcFileStripeMergeRecordReader {
key = new OrcFileKeyWrapper();
value = new OrcFileValueWrapper();
tmpPath = prepareTmpPath();
+ createOrcFile(TEST_STRIPE_SIZE, TEST_STRIPE_SIZE + 1);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ fs.delete(tmpPath, false);
}
@Test
- public void testSplitStartsWithOffset() throws IOException {
- createOrcFile(DEFAULT_STRIPE_SIZE, DEFAULT_STRIPE_SIZE + 1);
+ public void testSplitStartsWithNonZeroOffset() throws IOException {
FileStatus fileStatus = fs.getFileStatus(tmpPath);
long length = fileStatus.getLen();
long offset = length / 2;
- FileSplit split = new FileSplit(tmpPath, offset, length, (String[])null);
+
+ // Check case for non-zero offset, the file will be skipped.
+ FileSplit split = new FileSplit(tmpPath, offset, length, (String[]) null);
+ OrcFileStripeMergeRecordReader reader = new OrcFileStripeMergeRecordReader(conf, split);
+ reader.next(key, value);
+ Assert.assertNull(key.getInputPath());
+ }
+
+ @Test
+ public void testSplitStartsWithZeroOffset() throws IOException {
+ FileStatus fileStatus = fs.getFileStatus(tmpPath);
+ long length = fileStatus.getLen();
+ // New split with zero offset, the file should be processed.
+ FileSplit split = new FileSplit(tmpPath, 0, length, (String[]) null);
OrcFileStripeMergeRecordReader reader = new OrcFileStripeMergeRecordReader(conf, split);
+ // both stripes will be processed; the first stripe has 5000 rows and the second stripe has 1 row
+ reader.next(key, value);
+ Assert.assertEquals("InputPath", tmpPath, key.getInputPath());
+ Assert.assertEquals("NumberOfValues", TEST_STRIPE_SIZE,
+ value.getStripeStatistics().getColStats(0).getNumberOfValues());
reader.next(key, value);
Assert.assertEquals("InputPath", tmpPath, key.getInputPath());
Assert.assertEquals("NumberOfValues", 1L,
value.getStripeStatistics().getColStats(0).getNumberOfValues());
+ // we are done with the file, so next() should return false
+ Assert.assertFalse(reader.next(key, value));
reader.close();
}
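For reference, the calling pattern the new tests exercise, as a sketch (it assumes conf, fs, and tmpPath are prepared the way the test's setUp above does; no API beyond what the diff shows):

    // Drive the merge reader over a whole file. Only a split that starts at
    // byte 0 yields stripes, per the ownership rule introduced by this patch.
    FileSplit split = new FileSplit(tmpPath, 0, fs.getFileStatus(tmpPath).getLen(), (String[]) null);
    OrcFileStripeMergeRecordReader reader = new OrcFileStripeMergeRecordReader(conf, split);
    OrcFileKeyWrapper key = new OrcFileKeyWrapper();
    OrcFileValueWrapper value = new OrcFileValueWrapper();
    try {
      while (reader.next(key, value)) {
        // each (key, value) pair describes one stripe eligible for concatenation
      }
    } finally {
      reader.close(); // now releases the underlying ORC reader, if one was created
    }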