Repository: hive
Updated Branches:
  refs/heads/master 975a49b6f -> ea3c79e4f
HIVE-13403 : Make Streaming API not create empty buckets (Wei Zheng, reviewed by Eugene Koifman) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ea3c79e4 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ea3c79e4 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ea3c79e4 Branch: refs/heads/master Commit: ea3c79e4f41775122210670bc7e9ec23aa4d487e Parents: 975a49b Author: Wei Zheng <[email protected]> Authored: Wed Aug 24 16:30:58 2016 -0700 Committer: Wei Zheng <[email protected]> Committed: Wed Aug 24 16:30:58 2016 -0700 ---------------------------------------------------------------------- .../streaming/AbstractRecordWriter.java | 59 ++++++++++---------- .../streaming/DelimitedInputWriter.java | 2 +- .../hcatalog/streaming/StrictJsonWriter.java | 2 +- .../hive/hcatalog/streaming/TestStreaming.java | 20 ++----- .../hive/ql/txn/compactor/TestCompactor.java | 10 ++-- 5 files changed, 42 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/ea3c79e4/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index b8615cb..24b952e 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -178,7 +178,9 @@ public abstract class AbstractRecordWriter implements RecordWriter { public void flush() throws StreamingIOFailure { try { for (RecordUpdater updater : updaters) { - updater.flush(); + if (updater != null) { + updater.flush(); + } } } catch (IOException e) { throw new 
StreamingIOFailure("Unable to flush recordUpdater", e); @@ -198,15 +200,11 @@ public abstract class AbstractRecordWriter implements RecordWriter { @Override public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingIOFailure, SerializationError { - try { - LOG.debug("Creating Record updater"); - curBatchMinTxnId = minTxnId; - curBatchMaxTxnId = maxTxnID; - updaters = createRecordUpdaters(totalBuckets, minTxnId, maxTxnID); - } catch (IOException e) { - String errMsg = "Failed creating RecordUpdaterS for " + getWatermark(); - LOG.error(errMsg, e); - throw new StreamingIOFailure(errMsg, e); + curBatchMinTxnId = minTxnId; + curBatchMaxTxnId = maxTxnID; + updaters = new ArrayList<RecordUpdater>(totalBuckets); + for (int bucket = 0; bucket < totalBuckets; bucket++) { + updaters.add(bucket, null); } } @@ -214,13 +212,14 @@ public abstract class AbstractRecordWriter implements RecordWriter { public void closeBatch() throws StreamingIOFailure { boolean haveError = false; for (RecordUpdater updater : updaters) { - try { - //try not to leave any files open - updater.close(false); - } - catch(Exception ex) { - haveError = true; - LOG.error("Unable to close " + updater + " due to: " + ex.getMessage(), ex); + if (updater != null) { + try { + //try not to leave any files open + updater.close(false); + } catch (Exception ex) { + haveError = true; + LOG.error("Unable to close " + updater + " due to: " + ex.getMessage(), ex); + } } } updaters.clear(); @@ -252,17 +251,6 @@ public abstract class AbstractRecordWriter implements RecordWriter { return bucketFieldData; } - - - private ArrayList<RecordUpdater> createRecordUpdaters(int bucketCount, Long minTxnId, Long maxTxnID) - throws IOException, SerializationError { - ArrayList<RecordUpdater> result = new ArrayList<RecordUpdater>(bucketCount); - for (int bucket = 0; bucket < bucketCount; bucket++) { - result.add(createRecordUpdater(bucket, minTxnId, maxTxnID) ); - } - return result; - } - private RecordUpdater 
createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID) throws IOException, SerializationError { try { @@ -286,6 +274,21 @@ public abstract class AbstractRecordWriter implements RecordWriter { } } + RecordUpdater getRecordUpdater(int bucketId) throws StreamingIOFailure, SerializationError { + RecordUpdater recordUpdater = updaters.get(bucketId); + if (recordUpdater == null) { + try { + recordUpdater = createRecordUpdater(bucketId, curBatchMinTxnId, curBatchMaxTxnId); + } catch (IOException e) { + String errMsg = "Failed creating RecordUpdater for " + getWatermark(); + LOG.error(errMsg, e); + throw new StreamingIOFailure(errMsg, e); + } + updaters.set(bucketId, recordUpdater); + } + return recordUpdater; + } + private Path getPathForEndPoint(IMetaStoreClient msClient, HiveEndPoint endPoint) throws StreamingException { try { http://git-wip-us.apache.org/repos/asf/hive/blob/ea3c79e4/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java index 7ab2fc6..87eb4c4 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java @@ -262,7 +262,7 @@ public class DelimitedInputWriter extends AbstractRecordWriter { byte[] orderedFields = reorderFields(record); Object encodedRow = encode(orderedFields); int bucket = getBucket(encodedRow); - updaters.get(bucket).insert(transactionId, encodedRow); + getRecordUpdater(bucket).insert(transactionId, encodedRow); } catch (IOException e) { throw new StreamingIOFailure("Error writing record in transaction (" + transactionId + ")", e); 
http://git-wip-us.apache.org/repos/asf/hive/blob/ea3c79e4/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java index 1facad1..31212ee 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java @@ -122,7 +122,7 @@ public class StrictJsonWriter extends AbstractRecordWriter { try { Object encodedRow = encode(record); int bucket = getBucket(encodedRow); - updaters.get(bucket).insert(transactionId, encodedRow); + getRecordUpdater(bucket).insert(transactionId, encodedRow); } catch (IOException e) { throw new StreamingIOFailure("Error writing record in transaction(" + transactionId + ")", e); http://git-wip-us.apache.org/repos/asf/hive/blob/ea3c79e4/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 40cf2b5..197ca7b 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -1208,8 +1208,6 @@ public class TestStreaming { txnBatch2.write("name5,2,fact3".getBytes()); // bucket 0 txnBatch2.write("name8,2,fact3".getBytes()); // bucket 1 txnBatch2.write("name0,1,fact1".getBytes()); // bucket 2 - // no data for bucket 3 -- expect 0 length bucket file - txnBatch2.commit(); @@ -1223,13 +1221,11 @@ public class TestStreaming { System.err.println(actual2); // assert 
bucket listing is as expected - Assert.assertEquals("number of buckets does not match expectation", actual1.values().size(), 4); + Assert.assertEquals("number of buckets does not match expectation", actual1.values().size(), 3); Assert.assertEquals("records in bucket does not match expectation", actual1.get(0).size(), 2); Assert.assertEquals("records in bucket does not match expectation", actual1.get(1).size(), 1); - Assert.assertEquals("records in bucket does not match expectation", actual1.get(2).size(), 0); + Assert.assertTrue("bucket 2 shouldn't have been created", actual1.get(2) == null); Assert.assertEquals("records in bucket does not match expectation", actual1.get(3).size(), 1); - - } private void runCmdOnDriver(String cmd) throws QueryFailedException { boolean t = runDDL(driver, cmd); @@ -1359,12 +1355,7 @@ public class TestStreaming { } else if (file.contains("bucket_00001")) { corruptDataFile(file, conf, -1); } else if (file.contains("bucket_00002")) { - // since we are adding more bytes we know the length of the file is already readable - Path bPath = new Path(file); - FileSystem fs = bPath.getFileSystem(conf); - FileStatus fileStatus = fs.getFileStatus(bPath); - readableFooter = (int) fileStatus.getLen(); - corruptDataFile(file, conf, 2); + Assert.assertFalse("bucket 2 shouldn't have been created", true); } else if (file.contains("bucket_00003")) { corruptDataFile(file, conf, 100); } @@ -1381,7 +1372,7 @@ public class TestStreaming { String errDump = new String(myErr.toByteArray()); Assert.assertEquals(false, errDump.contains("Exception")); - Assert.assertEquals(true, errDump.contains("4 file(s) are corrupted")); + Assert.assertEquals(true, errDump.contains("3 file(s) are corrupted")); Assert.assertEquals(false, errDump.contains("is still open for writes.")); origErr = System.err; @@ -1397,9 +1388,6 @@ public class TestStreaming { Assert.assertEquals(true, errDump.contains("bucket_00000 recovered successfully!")); Assert.assertEquals(true, 
errDump.contains("No readable footers found. Creating empty orc file.")); Assert.assertEquals(true, errDump.contains("bucket_00001 recovered successfully!")); - Assert.assertEquals(true, errDump.contains("bucket_00002 recovered successfully!")); - // check for bucket2's last readable footer offset - Assert.assertEquals(true, errDump.contains("Readable footerOffsets: [" + readableFooter + "]")); Assert.assertEquals(true, errDump.contains("bucket_00003 recovered successfully!")); Assert.assertEquals(false, errDump.contains("Exception")); Assert.assertEquals(false, errDump.contains("is still open for writes.")); http://git-wip-us.apache.org/repos/asf/hive/blob/ea3c79e4/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index f81752f..0587e80 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -778,13 +778,13 @@ public class TestCompactor { Path resultDelta = null; for (int i = 0; i < names.length; i++) { names[i] = stat[i].getPath().getName(); - if (names[i].equals("delta_0000001_0000006")) { + if (names[i].equals("delta_0000001_0000004")) { resultDelta = stat[i].getPath(); } } Arrays.sort(names); String[] expected = new String[]{"delta_0000001_0000002", - "delta_0000001_0000006", "delta_0000003_0000004", "delta_0000005_0000006"}; + "delta_0000001_0000004", "delta_0000003_0000004"}; if (!Arrays.deepEquals(expected, names)) { Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); } @@ -844,11 +844,11 @@ public class TestCompactor { Assert.fail("majorCompactAfterAbort FileStatus[] stat 
" + Arrays.toString(stat)); } if (1 != stat.length) { - Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat)); + Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat)); } String name = stat[0].getPath().getName(); - if (!name.equals("base_0000006")) { - Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006"); + if (!name.equals("base_0000004")) { + Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000004"); } checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L); } finally {
