Repository: hive
Updated Branches:
  refs/heads/beeline-cli 046c5ebdb -> 6b3e82d39
HIVE-11583 : When PTF is used over large partitions, results could be corrupted (Illya Yalovyy via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8d524e06
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8d524e06
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8d524e06

Branch: refs/heads/beeline-cli
Commit: 8d524e062e6a8ad8c592e6067cea254c054797cd
Parents: 7be02ae
Author: Illya Yalovyy <[email protected]>
Authored: Mon Sep 14 10:18:00 2015 -0800
Committer: Ashutosh Chauhan <[email protected]>
Committed: Wed Sep 16 18:15:08 2015 -0700

----------------------------------------------------------------------
 .../ql/exec/persistence/PTFRowContainer.java   | 14 +++++----
 .../hive/ql/exec/persistence/RowContainer.java | 12 +++++---
 .../exec/persistence/TestPTFRowContainer.java  | 31 ++++++++++++++------
 3 files changed, 38 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8d524e06/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
index d2bfea6..61cc6e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
@@ -81,8 +81,8 @@ import org.apache.hadoop.util.Progressable;
  */
 public class PTFRowContainer<Row extends List<Object>> extends RowContainer<Row> {
 
-  ArrayList<BlockInfo> blockInfos;
-  int currentReadBlockStartRow;
+  private ArrayList<BlockInfo> blockInfos;
+  private int currentReadBlockStartRow;
 
   public PTFRowContainer(int bs, Configuration jc, Reporter reporter
       ) throws HiveException {
@@ -190,14 +190,16 @@ public class PTFRowContainer<Row extends List<Object>> extends RowContainer<Row>
     BlockInfo bI = blockInfos.get(blockNum);
 
     int startSplit = bI.startingSplit;
-    int endSplit = startSplit;
-    if ( blockNum != blockInfos.size() - 1) {
-      endSplit = blockInfos.get(blockNum+1).startingSplit;
+    int endSplit;
+    if ( blockNum != blockInfos.size() - 1 ) {
+      endSplit = blockInfos.get(blockNum + 1).startingSplit;
+    } else {
+      endSplit = getLastActualSplit();
     }
 
     try {
       int readIntoOffset = 0;
-      for(int i = startSplit; i <= endSplit; i++ ) {
+      for(int i = startSplit; i <= endSplit && readIntoOffset < getBlockSize(); i++ ) {
         org.apache.hadoop.mapred.RecordReader rr = setReaderAtSplit(i);
         if ( i == startSplit ) {
           ((PTFSequenceFileRecordReader)rr).seek(bI.startOffset);


http://git-wip-us.apache.org/repos/asf/hive/blob/8d524e06/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
index 4252bd1..68dc482 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
@@ -103,7 +103,7 @@ public class RowContainer<ROW extends List<Object>>
   boolean firstCalled = false; // once called first, it will never be able to
                                // write again.
-  int acutalSplitNum = 0;
+  private int actualSplitNum = 0;
   int currentSplitPointer = 0;
   org.apache.hadoop.mapred.RecordReader rr = null; // record reader
   RecordWriter rw = null;
@@ -220,7 +220,7 @@
         HiveConf.setVar(localJc, HiveConf.ConfVars.HADOOPMAPREDINPUTDIR,
             org.apache.hadoop.util.StringUtils.escapeString(parentFile.getAbsolutePath()));
         inputSplits = inputFormat.getSplits(localJc, 1);
-        acutalSplitNum = inputSplits.length;
+        actualSplitNum = inputSplits.length;
       }
       currentSplitPointer = 0;
       rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer],
@@ -375,7 +375,7 @@
         }
       }
 
-      if (nextSplit && this.currentSplitPointer < this.acutalSplitNum) {
+      if (nextSplit && this.currentSplitPointer < this.actualSplitNum) {
         JobConf localJc = getLocalFSJobConfClone(jc);
         // open record reader to read next split
         rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
@@ -421,7 +421,7 @@
     addCursor = 0;
     numFlushedBlocks = 0;
     this.readBlockSize = 0;
-    this.acutalSplitNum = 0;
+    this.actualSplitNum = 0;
     this.currentSplitPointer = -1;
     this.firstCalled = false;
     this.inputSplits = null;
@@ -606,4 +606,8 @@
     clearRows();
     currentReadBlock = firstReadBlockPointer = currentWriteBlock = null;
   }
+
+  protected int getLastActualSplit() {
+    return actualSplitNum - 1;
+  }
 }


http://git-wip-us.apache.org/repos/asf/hive/blob/8d524e06/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestPTFRowContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestPTFRowContainer.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestPTFRowContainer.java
index a404ff0..0611072 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestPTFRowContainer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestPTFRowContainer.java
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.hive.ql.exec.persistence;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -37,11 +39,13 @@ import org.apache.hadoop.io.Text;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 @SuppressWarnings("deprecation")
 public class TestPTFRowContainer {
 
-  private static final String COL_NAMES = "x,y,z,a,b";
-  private static final String COL_TYPES = "int,string,double,int,string";
+  private static final String COL_NAMES = "x,y,z,a,b,v";
+  private static final String COL_TYPES = "int,string,double,int,string,string";
 
   static SerDe serDe;
   static Configuration cfg;
@@ -70,7 +74,7 @@ public class TestPTFRowContainer {
     return rc;
   }
 
-  private void runTest(int sz, int blockSize) throws SerDeException, HiveException {
+  private void runTest(int sz, int blockSize, String value) throws SerDeException, HiveException {
     List<Object> row;
 
     PTFRowContainer<List<Object>> rc = rowContainer(blockSize);
@@ -82,16 +86,17 @@ public class TestPTFRowContainer {
       row.add(new DoubleWritable(i));
      row.add(new IntWritable(i));
       row.add(new Text("def " + i));
+      row.add(new Text(value));
       rc.addRow(row);
     }
 
     // test forward scan
-    assert(rc.rowCount() == sz);
+    assertEquals(sz, rc.rowCount());
     i = 0;
     row = new ArrayList<Object>();
     row = rc.first();
     while(row != null ) {
-      assert(row.get(1).toString().equals("abc " + i));
+      assertEquals("abc " + i, row.get(1).toString());
       i++;
       row = rc.next();
     }
@@ -100,7 +105,7 @@ public class TestPTFRowContainer {
     row = rc.first();
     for(i = sz - 1; i >= 0; i-- ) {
       row = rc.getAt(i);
-      assert(row.get(1).toString().equals("abc " + i));
+      assertEquals("abc " + i, row.get(1).toString());
     }
 
     Random r = new Random(1000L);
@@ -109,20 +114,23 @@ public class TestPTFRowContainer {
     for(i=0; i < 100; i++) {
       int j = r.nextInt(sz);
       row = rc.getAt(j);
-      assert(row.get(1).toString().equals("abc " + j));
+      assertEquals("abc " + j, row.get(1).toString());
     }
 
     // intersperse getAt and next calls
     for(i=0; i < 100; i++) {
       int j = r.nextInt(sz);
       row = rc.getAt(j);
-      assert(row.get(1).toString().equals("abc " + j));
+      assertEquals("abc " + j, row.get(1).toString());
       for(int k = j + 1; k < j + (blockSize/4) && k < sz; k++) {
         row = rc.next();
-        assert(row.get(4).toString().equals("def " + k));
+        assertEquals("def " + k, row.get(4).toString());
       }
     }
+  }
 
+  private void runTest(int sz, int blockSize) throws SerDeException, HiveException {
+    runTest(sz, blockSize, "");
   }
 
   @Test
@@ -134,4 +142,9 @@
   public void testSmallBlockSize() throws SerDeException, HiveException {
     runTest(10 * 1000, 5);
   }
+
+  @Test
+  public void testBlocksLargerThanSplit() throws SerDeException, HiveException, IOException {
+    runTest(5, 2, new String(new char[(int)FileSystem.getLocal(cfg).getDefaultBlockSize()]));
+  }
 }
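
----------------------------------------------------------------------

Note on the fix, for readers who do not want to trace the hunks: a spilled
PTF partition is re-read in fixed-size blocks, and one logical block can
span several file splits. Before this change, the last block was only ever
read from its starting split, so rows living in later splits were lost, and
the read loop had no bound on how many rows it pulled into a block. Below
is a minimal, self-contained model of the corrected loop; BlockReadSketch,
SPLITS, BLOCK_SIZE and readBlock are illustrative names for this sketch,
not the actual Hive classes or methods.

// Minimal model of the block re-read logic fixed above. Rows are plain
// ints and splits are arrays; none of these names exist in Hive itself.
import java.util.ArrayList;
import java.util.List;

public class BlockReadSketch {

  // Spilled rows as the local FS might split them. A logical block of
  // BLOCK_SIZE rows can start mid-split and continue into later splits.
  private static final int[][] SPLITS = {
      {0, 1, 2, 3, 4, 5},  // split 0
      {6, 7, 8, 9}         // split 1
  };
  private static final int BLOCK_SIZE = 4;

  // Per-block bookkeeping, analogous to PTFRowContainer's BlockInfo.
  private static class BlockInfo {
    final int startingSplit;
    final int startOffset;
    BlockInfo(int startingSplit, int startOffset) {
      this.startingSplit = startingSplit;
      this.startOffset = startOffset;
    }
  }

  private static List<Integer> readBlock(BlockInfo bI, int endSplit) {
    List<Integer> block = new ArrayList<Integer>();
    int readIntoOffset = 0;
    // The fix in miniature: scan up to endSplit, which for the last block
    // is the last split that actually exists (cf. getLastActualSplit()),
    // but stop as soon as the block is full (cf. the new
    // readIntoOffset < getBlockSize() guard) so rows belonging to the
    // following block are never pulled in.
    for (int i = bI.startingSplit; i <= endSplit && readIntoOffset < BLOCK_SIZE; i++) {
      int start = (i == bI.startingSplit) ? bI.startOffset : 0; // seek(bI.startOffset)
      for (int r = start; r < SPLITS[i].length && readIntoOffset < BLOCK_SIZE; r++) {
        block.add(SPLITS[i][r]);
        readIntoOffset++;
      }
    }
    return block;
  }

  public static void main(String[] args) {
    // The second block holds rows 4..7: it starts at offset 4 of split 0
    // and runs into split 1. The old logic read the last block only from
    // its starting split, so rows 6 and 7 went missing; the bounded loop
    // above prints exactly [4, 5, 6, 7].
    System.out.println(readBlock(new BlockInfo(0, 4), SPLITS.length - 1));
  }
}

The new testBlocksLargerThanSplit() exercises the same situation against
the real container: each row is padded with a value as large as the local
file system's default block size, which forces a single container block to
span more than one split.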
