Repository: hive
Updated Branches:
  refs/heads/master 975a49b6f -> ea3c79e4f


HIVE-13403 : Make Streaming API not create empty buckets (Wei Zheng, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ea3c79e4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ea3c79e4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ea3c79e4

Branch: refs/heads/master
Commit: ea3c79e4f41775122210670bc7e9ec23aa4d487e
Parents: 975a49b
Author: Wei Zheng <[email protected]>
Authored: Wed Aug 24 16:30:58 2016 -0700
Committer: Wei Zheng <[email protected]>
Committed: Wed Aug 24 16:30:58 2016 -0700

----------------------------------------------------------------------
 .../streaming/AbstractRecordWriter.java         | 59 ++++++++++----------
 .../streaming/DelimitedInputWriter.java         |  2 +-
 .../hcatalog/streaming/StrictJsonWriter.java    |  2 +-
 .../hive/hcatalog/streaming/TestStreaming.java  | 20 ++-----
 .../hive/ql/txn/compactor/TestCompactor.java    | 10 ++--
 5 files changed, 42 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ea3c79e4/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index b8615cb..24b952e 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -178,7 +178,9 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   public void flush() throws StreamingIOFailure {
     try {
       for (RecordUpdater updater : updaters) {
-        updater.flush();
+        if (updater != null) {
+          updater.flush();
+        }
       }
     } catch (IOException e) {
       throw new StreamingIOFailure("Unable to flush recordUpdater", e);
@@ -198,15 +200,11 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   @Override
   public void newBatch(Long minTxnId, Long maxTxnID)
           throws StreamingIOFailure, SerializationError {
-    try {
-      LOG.debug("Creating Record updater");
-      curBatchMinTxnId = minTxnId;
-      curBatchMaxTxnId = maxTxnID;
-      updaters = createRecordUpdaters(totalBuckets, minTxnId, maxTxnID);
-    } catch (IOException e) {
-      String errMsg = "Failed creating RecordUpdaterS for " + getWatermark();
-      LOG.error(errMsg, e);
-      throw new StreamingIOFailure(errMsg, e);
+    curBatchMinTxnId = minTxnId;
+    curBatchMaxTxnId = maxTxnID;
+    updaters = new ArrayList<RecordUpdater>(totalBuckets);
+    for (int bucket = 0; bucket < totalBuckets; bucket++) {
+      updaters.add(bucket, null);
     }
   }
 
@@ -214,13 +212,14 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   public void closeBatch() throws StreamingIOFailure {
     boolean haveError = false;
     for (RecordUpdater updater : updaters) {
-      try {
-        //try not to leave any files open
-        updater.close(false);
-      }
-      catch(Exception ex) {
-        haveError = true;
-        LOG.error("Unable to close " + updater + " due to: " + 
ex.getMessage(), ex);
+      if (updater != null) {
+        try {
+          //try not to leave any files open
+          updater.close(false);
+        } catch (Exception ex) {
+          haveError = true;
+          LOG.error("Unable to close " + updater + " due to: " + 
ex.getMessage(), ex);
+        }
       }
     }
     updaters.clear();
@@ -252,17 +251,6 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     return bucketFieldData;
   }
 
-
-
-  private ArrayList<RecordUpdater> createRecordUpdaters(int bucketCount, Long 
minTxnId, Long maxTxnID)
-          throws IOException, SerializationError {
-    ArrayList<RecordUpdater> result = new 
ArrayList<RecordUpdater>(bucketCount);
-    for (int bucket = 0; bucket < bucketCount; bucket++) {
-      result.add(createRecordUpdater(bucket, minTxnId, maxTxnID) );
-    }
-    return result;
-  }
-
   private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long 
maxTxnID)
           throws IOException, SerializationError {
     try {
@@ -286,6 +274,21 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     }
   }
 
+  RecordUpdater getRecordUpdater(int bucketId) throws StreamingIOFailure, 
SerializationError {
+    RecordUpdater recordUpdater = updaters.get(bucketId);
+    if (recordUpdater == null) {
+      try {
+        recordUpdater = createRecordUpdater(bucketId, curBatchMinTxnId, 
curBatchMaxTxnId);
+      } catch (IOException e) {
+        String errMsg = "Failed creating RecordUpdater for " + getWatermark();
+        LOG.error(errMsg, e);
+        throw new StreamingIOFailure(errMsg, e);
+      }
+      updaters.set(bucketId, recordUpdater);
+    }
+    return recordUpdater;
+  }
+
   private Path getPathForEndPoint(IMetaStoreClient msClient, HiveEndPoint 
endPoint)
           throws StreamingException {
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/ea3c79e4/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
index 7ab2fc6..87eb4c4 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
@@ -262,7 +262,7 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
       byte[] orderedFields = reorderFields(record);
       Object encodedRow = encode(orderedFields);
       int bucket = getBucket(encodedRow);
-      updaters.get(bucket).insert(transactionId, encodedRow);
+      getRecordUpdater(bucket).insert(transactionId, encodedRow);
     } catch (IOException e) {
       throw new StreamingIOFailure("Error writing record in transaction ("
               + transactionId + ")", e);

http://git-wip-us.apache.org/repos/asf/hive/blob/ea3c79e4/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
index 1facad1..31212ee 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
@@ -122,7 +122,7 @@ public class StrictJsonWriter extends AbstractRecordWriter {
     try {
       Object encodedRow = encode(record);
       int bucket = getBucket(encodedRow);
-      updaters.get(bucket).insert(transactionId, encodedRow);
+      getRecordUpdater(bucket).insert(transactionId, encodedRow);
     } catch (IOException e) {
       throw new StreamingIOFailure("Error writing record in transaction("
               + transactionId + ")", e);

http://git-wip-us.apache.org/repos/asf/hive/blob/ea3c79e4/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 40cf2b5..197ca7b 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -1208,8 +1208,6 @@ public class TestStreaming {
     txnBatch2.write("name5,2,fact3".getBytes());  // bucket 0
     txnBatch2.write("name8,2,fact3".getBytes());  // bucket 1
     txnBatch2.write("name0,1,fact1".getBytes());  // bucket 2
-    // no data for bucket 3 -- expect 0 length bucket file
-
 
     txnBatch2.commit();
 
@@ -1223,13 +1221,11 @@ public class TestStreaming {
     System.err.println(actual2);
 
     // assert bucket listing is as expected
-    Assert.assertEquals("number of buckets does not match expectation", 
actual1.values().size(), 4);
+    Assert.assertEquals("number of buckets does not match expectation", 
actual1.values().size(), 3);
     Assert.assertEquals("records in bucket does not match expectation", 
actual1.get(0).size(), 2);
     Assert.assertEquals("records in bucket does not match expectation", 
actual1.get(1).size(), 1);
-    Assert.assertEquals("records in bucket does not match expectation", 
actual1.get(2).size(), 0);
+    Assert.assertTrue("bucket 2 shouldn't have been created", actual1.get(2) 
== null);
     Assert.assertEquals("records in bucket does not match expectation", 
actual1.get(3).size(), 1);
-
-
   }
   private void runCmdOnDriver(String cmd) throws QueryFailedException {
     boolean t = runDDL(driver, cmd);
@@ -1359,12 +1355,7 @@ public class TestStreaming {
       } else if (file.contains("bucket_00001")) {
         corruptDataFile(file, conf, -1);
       } else if (file.contains("bucket_00002")) {
-        // since we are adding more bytes we know the length of the file is 
already readable
-        Path bPath = new Path(file);
-        FileSystem fs = bPath.getFileSystem(conf);
-        FileStatus fileStatus = fs.getFileStatus(bPath);
-        readableFooter = (int) fileStatus.getLen();
-        corruptDataFile(file, conf, 2);
+        Assert.assertFalse("bucket 2 shouldn't have been created", true);
       } else if (file.contains("bucket_00003")) {
         corruptDataFile(file, conf, 100);
       }
@@ -1381,7 +1372,7 @@ public class TestStreaming {
 
     String errDump = new String(myErr.toByteArray());
     Assert.assertEquals(false, errDump.contains("Exception"));
-    Assert.assertEquals(true, errDump.contains("4 file(s) are corrupted"));
+    Assert.assertEquals(true, errDump.contains("3 file(s) are corrupted"));
     Assert.assertEquals(false, errDump.contains("is still open for writes."));
 
     origErr = System.err;
@@ -1397,9 +1388,6 @@ public class TestStreaming {
     Assert.assertEquals(true, errDump.contains("bucket_00000 recovered 
successfully!"));
     Assert.assertEquals(true, errDump.contains("No readable footers found. 
Creating empty orc file."));
     Assert.assertEquals(true, errDump.contains("bucket_00001 recovered 
successfully!"));
-    Assert.assertEquals(true, errDump.contains("bucket_00002 recovered 
successfully!"));
-    // check for bucket2's last readable footer offset
-    Assert.assertEquals(true, errDump.contains("Readable footerOffsets: [" + 
readableFooter + "]"));
     Assert.assertEquals(true, errDump.contains("bucket_00003 recovered 
successfully!"));
     Assert.assertEquals(false, errDump.contains("Exception"));
     Assert.assertEquals(false, errDump.contains("is still open for writes."));

http://git-wip-us.apache.org/repos/asf/hive/blob/ea3c79e4/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index f81752f..0587e80 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -778,13 +778,13 @@ public class TestCompactor {
       Path resultDelta = null;
       for (int i = 0; i < names.length; i++) {
         names[i] = stat[i].getPath().getName();
-        if (names[i].equals("delta_0000001_0000006")) {
+        if (names[i].equals("delta_0000001_0000004")) {
           resultDelta = stat[i].getPath();
         }
       }
       Arrays.sort(names);
       String[] expected = new String[]{"delta_0000001_0000002",
-          "delta_0000001_0000006", "delta_0000003_0000004", 
"delta_0000005_0000006"};
+          "delta_0000001_0000004", "delta_0000003_0000004"};
       if (!Arrays.deepEquals(expected, names)) {
         Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + 
Arrays.toString(names));
       }
@@ -844,11 +844,11 @@ public class TestCompactor {
         Assert.fail("majorCompactAfterAbort FileStatus[] stat " + 
Arrays.toString(stat));
       }
       if (1 != stat.length) {
-        Assert.fail("Expecting 1 file \"base_0000006\" and found " + 
stat.length + " files " + Arrays.toString(stat));
+        Assert.fail("Expecting 1 file \"base_0000004\" and found " + 
stat.length + " files " + Arrays.toString(stat));
       }
       String name = stat[0].getPath().getName();
-      if (!name.equals("base_0000006")) {
-        Assert.fail("majorCompactAfterAbort name " + name + " not equals to 
base_0000006");
+      if (!name.equals("base_0000004")) {
+        Assert.fail("majorCompactAfterAbort name " + name + " not equals to 
base_0000004");
       }
       checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, 
columnTypesProperty, 0, 1L, 4L);
     } finally {

Reply via email to