This is an automated email from the ASF dual-hosted git repository. prasanthj pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new bc7b38e HIVE-21146 Enforce TransactionBatch size=1 for blob stores (#797) bc7b38e is described below commit bc7b38e5fdb16d28319da407a80d78f4e8124d95 Author: dlavati <dlav...@users.noreply.github.com> AuthorDate: Thu Dec 5 11:51:23 2019 +0100 HIVE-21146 Enforce TransactionBatch size=1 for blob stores (#797) * HIVE-21146 Enforce TransactionBatch size=1 for blob stores Change-Id: Ia5f94c34a044c2990e95204de03b661d162874c7 * Apply _ prefix to tmp verification file * Rely on /tmp instead --- .../hive/streaming/HiveStreamingConnection.java | 25 ++++++++++++++++++ .../org/apache/hive/streaming/TestStreaming.java | 30 +++++++++++++++++++++- 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index f4e71f9..27dc6f2 100644 --- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -32,8 +32,11 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.hive.common.BlobStorageUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ -520,6 +523,28 @@ public class HiveStreamingConnection implements StreamingConnection { LOG.error(errMsg); throw new ConnectionError(errMsg); } + + // batch size is only used for managed transactions, not for unmanaged single transactions + if (transactionBatchSize > 1) { + try (FileSystem fs = tableObject.getDataLocation().getFileSystem(conf)) { + if (BlobStorageUtils.isBlobStorageFileSystem(conf, fs)) { + // currently not all filesystems implement StreamCapabilities, while FSDataOutputStream does + Path path = new Path("/tmp", "_tmp_stream_verify_" + UUID.randomUUID().toString()); + try(FSDataOutputStream out = fs.create(path, false)){ + if (!out.hasCapability(StreamCapabilities.HFLUSH)) { + throw new ConnectionError( + "The backing filesystem only supports transaction batch sizes of 1, but " + transactionBatchSize + + " was requested."); + } + fs.deleteOnExit(path); + } catch (IOException e){ + throw new ConnectionError("Could not create path for database", e); + } + } + } catch (IOException e) { + throw new ConnectionError("Could not retrieve FileSystem of table", e); + } + } } private void beginNextTransaction() throws StreamingException { diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java index 055672f..58b3ae2 100644 --- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java +++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java @@ -114,7 +114,6 @@ import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class TestStreaming { private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class); @@ -1314,6 +1313,35 @@ public class TestStreaming { connection.close(); } + @Test + public void testTransactionBatchSizeValidation() throws Exception { + final String schemes = conf.get(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname); + // the output stream of this FS doesn't support hflush, so the below test will fail + conf.setVar(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES, "raw"); + + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + + try { + HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withTransactionBatchSize(2) + .withHiveConf(conf) + .connect(); + + Assert.fail(); + } catch (ConnectionError e) { + Assert.assertTrue("Expected connection error due to batch sizes", + e.getMessage().contains("only supports transaction batch")); + } finally { + conf.setVar(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES, schemes); + } + } + /** * check that transactions that have not heartbeated and timedout get properly aborted *