This is an automated email from the ASF dual-hosted git repository.

prasanthj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new bc7b38e  HIVE-21146 Enforce TransactionBatch size=1 for blob stores (#797)
bc7b38e is described below

commit bc7b38e5fdb16d28319da407a80d78f4e8124d95
Author: dlavati <dlav...@users.noreply.github.com>
AuthorDate: Thu Dec 5 11:51:23 2019 +0100

    HIVE-21146 Enforce TransactionBatch size=1 for blob stores (#797)
    
    * HIVE-21146 Enforce TransactionBatch size=1 for blob stores
    
    Change-Id: Ia5f94c34a044c2990e95204de03b661d162874c7
    
    * Apply _ prefix to tmp verification file
    
    * Rely on /tmp instead
---
 .../hive/streaming/HiveStreamingConnection.java    | 25 ++++++++++++++++++
 .../org/apache/hive/streaming/TestStreaming.java   | 30 +++++++++++++++++++++-
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index f4e71f9..27dc6f2 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -32,8 +32,11 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.hive.common.BlobStorageUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -520,6 +523,28 @@ public class HiveStreamingConnection implements StreamingConnection {
       LOG.error(errMsg);
       throw new ConnectionError(errMsg);
     }
+
+    // batch size is only used for managed transactions, not for unmanaged single transactions
+    if (transactionBatchSize > 1) {
+      try (FileSystem fs = tableObject.getDataLocation().getFileSystem(conf)) {
+        if (BlobStorageUtils.isBlobStorageFileSystem(conf, fs)) {
+          // currently not all filesystems implement StreamCapabilities, while FSDataOutputStream does
+          Path path = new Path("/tmp", "_tmp_stream_verify_" + UUID.randomUUID().toString());
+          try(FSDataOutputStream out = fs.create(path, false)){
+            if (!out.hasCapability(StreamCapabilities.HFLUSH)) {
+              throw new ConnectionError(
+                  "The backing filesystem only supports transaction batch sizes of 1, but " + transactionBatchSize
+                      + " was requested.");
+            }
+            fs.deleteOnExit(path);
+          } catch (IOException e){
+            throw new ConnectionError("Could not create path for database", e);
+          }
+        }
+      } catch (IOException e) {
+        throw new ConnectionError("Could not retrieve FileSystem of table", e);
+      }
+    }
   }
 
   private void beginNextTransaction() throws StreamingException {
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 055672f..58b3ae2 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -114,7 +114,6 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class TestStreaming {
   private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
 
@@ -1314,6 +1313,35 @@ public class TestStreaming {
     connection.close();
   }
 
+  @Test
+  public void testTransactionBatchSizeValidation() throws Exception {
+    final String schemes = conf.get(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname);
+    // the output stream of this FS doesn't support hflush, so the below test will fail
+    conf.setVar(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES, "raw");
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+
+    try {
+      HiveStreamingConnection.newBuilder()
+          .withDatabase(dbName)
+          .withTable(tblName)
+          .withAgentInfo("UT_" + Thread.currentThread().getName())
+          .withRecordWriter(writer)
+          .withTransactionBatchSize(2)
+          .withHiveConf(conf)
+          .connect();
+
+      Assert.fail();
+    } catch (ConnectionError e) {
+      Assert.assertTrue("Expected connection error due to batch sizes",
+          e.getMessage().contains("only supports transaction batch"));
+    } finally {
+      conf.setVar(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES, schemes);
+    }
+  }
+
   /**
    * check that transactions that have not heartbeated and timed out get properly aborted
    *

Reply via email to