Repository: hive Updated Branches: refs/heads/master cbe3228c2 -> 7765e90aa
HIVE-20701: Allow HiveStreaming to receive a key value to commit atomically together with the transaction (Jaume M reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7765e90a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7765e90a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7765e90a Branch: refs/heads/master Commit: 7765e90aad44747860b3c1adbe8a4857d864912d Parents: cbe3228 Author: Jaume Marhuenda <jaumemarhue...@gmail.com> Authored: Mon Oct 22 14:18:20 2018 -0700 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Mon Oct 22 14:18:49 2018 -0700 ---------------------------------------------------------------------- .../streaming/AbstractStreamingTransaction.java | 6 ++- .../hive/streaming/HiveStreamingConnection.java | 13 +++++-- .../hive/streaming/StreamingConnection.java | 23 ++++++++--- .../hive/streaming/StreamingTransaction.java | 14 ++++++- .../apache/hive/streaming/TransactionBatch.java | 26 +++++++++++-- .../streaming/UnManagedSingleTransaction.java | 3 +- .../apache/hive/streaming/TestStreaming.java | 41 +++++++++++++++++++- 7 files changed, 109 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java b/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java index a99fdba..6ab3ffe 100644 --- a/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java +++ b/streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.metastore.api.TxnToWriteId; import java.io.InputStream; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -151,6 +152,9 @@ abstract class AbstractStreamingTransaction } public void commit() throws StreamingException { - commitWithPartitions(null); + commit(null); + } + public void commit(Set<String> partitions) throws StreamingException { + commit(partitions, null, null); } } http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index f79b844..74fc531 100644 --- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -146,6 +146,7 @@ public class HiveStreamingConnection implements StreamingConnection { private boolean manageTransactions; private int countTransactions = 0; private Set<String> partitions; + private Long tableId; private HiveStreamingConnection(Builder builder) throws StreamingException { this.database = builder.database.toLowerCase(); @@ -574,12 +575,18 @@ public class HiveStreamingConnection implements StreamingConnection { @Override public void commitTransaction() throws StreamingException { - commitTransactionWithPartition(null); + commitTransaction(null); } @Override - public void commitTransactionWithPartition(Set<String> partitions) + public void commitTransaction(Set<String> partitions) throws StreamingException { + commitTransaction(partitions, null, null); + } + + @Override + public void commitTransaction(Set<String> partitions, String key, + String value) throws StreamingException { checkState(); Set<String> createdPartitions = new HashSet<>(); @@ -598,7 +605,7 @@ public class HiveStreamingConnection implements StreamingConnection { connectionStats.incrementTotalPartitions(partitions.size()); } - currentTransactionBatch.commitWithPartitions(createdPartitions); + currentTransactionBatch.commit(createdPartitions, key, value); this.partitions.addAll( currentTransactionBatch.getPartitions()); connectionStats.incrementCreatedPartitions(createdPartitions.size()); http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java index 92016e5..ba4c6a5 100644 --- a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java @@ -66,13 +66,26 @@ public interface StreamingConnection extends ConnectionInfo, PartitionHandler { void commitTransaction() throws StreamingException; /** - * Commit a transaction to make the writes visible for readers. Include - * other partitions that may have been added independently. - * + * Commits the transaction together with a key value atomically. * @param partitions - extra partitions to commit. - * @throws StreamingException - if there are errors when committing the open transaction. + * @param key - key to commit. + * @param value - value to commit. + * @throws StreamingException - if there are errors when committing + * the open transaction. */ - default void commitTransactionWithPartition(@Nullable Set<String> partitions) + default void commitTransaction(@Nullable Set<String> partitions, + @Nullable String key, @Nullable String value) throws StreamingException { + throw new UnsupportedOperationException(); + } + + /** + * Commit a transaction to make the writes visible for readers. Include + * other partitions that may have been added independently. + * + * @param partitions - extra partitions to commit. + * @throws StreamingException - if there are errors when committing the open transaction. + */ + default void commitTransaction(@Nullable Set<String> partitions) throws StreamingException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java b/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java index 83b2f15..c0ee034 100644 --- a/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java +++ b/streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java @@ -19,6 +19,8 @@ package org.apache.hive.streaming; import org.apache.hadoop.hive.metastore.api.TxnToWriteId; + +import javax.annotation.Nullable; import java.io.InputStream; import java.util.List; import java.util.Set; @@ -45,7 +47,17 @@ public interface StreamingTransaction { * @param partitions to commit. * @throws StreamingException */ - void commitWithPartitions(Set<String> partitions) throws StreamingException; + void commit(@Nullable Set<String> partitions) throws StreamingException; + + /** + * Commits atomically together with a key and a value. + * @param partitions to commit. + * @param key to commit. + * @param value to commit. + * @throws StreamingException + */ + void commit(@Nullable Set<String> partitions, @Nullable String key, + @Nullable String value) throws StreamingException; /** * Abort a transaction. http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java index dabbe21..a625759 100644 --- a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java +++ b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java @@ -80,6 +80,11 @@ public class TransactionBatch extends AbstractStreamingTransaction { private String agentInfo; private int numTxns; + + /** + * Id of the table from the streaming connection. + */ + private final long tableId; /** * Tracks the state of each transaction. */ @@ -107,6 +112,7 @@ public class TransactionBatch extends AbstractStreamingTransaction { this.recordWriter = conn.getRecordWriter(); this.agentInfo = conn.getAgentInfo(); this.numTxns = conn.getTransactionBatchSize(); + this.tableId = conn.getTable().getTTable().getId(); setupHeartBeatThread(); @@ -244,19 +250,26 @@ public class TransactionBatch extends AbstractStreamingTransaction { } } - public void commitWithPartitions(Set<String> partitions) throws StreamingException { + public void commit(Set<String> partitions, String key, String value) + throws StreamingException { checkIsClosed(); boolean success = false; try { - commitImpl(partitions); + commitImpl(partitions, key, value); success = true; } finally { markDead(success); } } - private void commitImpl(Set<String> partitions) throws StreamingException { + private void commitImpl(Set<String> partitions, String key, String value) + throws StreamingException { try { + if ((key == null && value != null) || (key != null && value == null)) { + throw new StreamingException(String.format( + "If key is set, the value should be as well and vice versa," + + " key, value = %s, %s", key, value)); + } recordWriter.flush(); TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex); if (conn.isDynamicPartitioning()) { @@ -274,7 +287,12 @@ public class TransactionBatch extends AbstractStreamingTransaction { } transactionLock.lock(); try { - conn.getMSC().commitTxn(txnToWriteId.getTxnId()); + if (key != null) { + conn.getMSC().commitTxnWithKeyValue(txnToWriteId.getTxnId(), + tableId, key, value); + } else { + conn.getMSC().commitTxn(txnToWriteId.getTxnId()); + } // increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction. // the current transaction is going to committed or fail, so don't need heartbeat for current transaction. if (currentTxnIndex + 1 < txnToWriteIds.size()) { http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java b/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java index 68b0906..75779d5 100644 --- a/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java +++ b/streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java @@ -69,7 +69,8 @@ public class UnManagedSingleTransaction extends AbstractStreamingTransaction { } @Override - public void commitWithPartitions(Set<String> partitions) throws StreamingException { + public void commit(Set<String> partitions, String key, String value) + throws StreamingException { checkIsClosed(); boolean success = false; try { http://git-wip-us.apache.org/repos/asf/hive/blob/7765e90a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java index 1c9e43f..50433b6 100644 --- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java +++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java @@ -439,6 +439,43 @@ public class TestStreaming { } @Test + public void testCommitWithKeyValue() throws Exception { + queryTable(driver, "drop table if exists default.keyvalue"); + queryTable(driver, "create table default.keyvalue (a string, b string) stored as orc " + + "TBLPROPERTIES('transactional'='true')"); + queryTable(driver, "insert into default.keyvalue values('foo','bar')"); + queryTable(driver, "ALTER TABLE default.keyvalue SET TBLPROPERTIES('_metamykey' = 'myvalue')"); + List<String> rs = queryTable(driver, "select * from default.keyvalue"); + Assert.assertEquals(1, rs.size()); + Assert.assertEquals("foo\tbar", rs.get(0)); + StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase("Default") + .withTable("keyvalue") + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withTransactionBatchSize(2) + .withRecordWriter(wr) + .withHiveConf(conf) + .connect(); + connection.beginTransaction(); + connection.write("a1,b2".getBytes()); + connection.write("a3,b4".getBytes()); + connection.commitTransaction(null, "_metamykey", "myvalue"); + connection.close(); + + rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.keyvalue order by ROW__ID"); + Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("keyvalue/delta_0000002_0000003/bucket_00000")); + Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("keyvalue/delta_0000002_0000003/bucket_00000")); + + rs = queryTable(driver, "SHOW TBLPROPERTIES default.keyvalue('_metamykey')"); + Assert.assertEquals(rs.get(0), "_metamykey\tmyvalue", rs.get(0)); + } + + @Test public void testConnectionWithWriteId() throws Exception { queryTable(driver, "drop table if exists default.writeidconnection"); queryTable(driver, "create table default.writeidconnection (a string, b string) stored as orc " + @@ -1139,7 +1176,7 @@ public class TestStreaming { Assert.fail("Partition shouldn't exist so a NoSuchObjectException should have been raised"); } catch (NoSuchObjectException e) {} - transactionConnection.commitTransactionWithPartition(partitions); + transactionConnection.commitTransaction(partitions); // Ensure partition is present Partition p = msClient.getPartition(dbName, tblName, newPartVals); @@ -1217,7 +1254,7 @@ public class TestStreaming { partitionsOne.addAll(partitionsTwo); Set<String> allPartitions = partitionsOne; - transactionConnection.commitTransactionWithPartition(allPartitions); + transactionConnection.commitTransaction(allPartitions); // Ensure partition is present for (String partition : allPartitions) {