HIVE-14114 Ensure RecordWriter in streaming API is using the same UserGroupInformation as StreamingConnection (Eugene Koifman, reviewed by Wei Zheng)
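In practical terms, callers are now expected to open the StreamingConnection first and hand it to the RecordWriter, so that the writer's metastore and filesystem calls run under the same UserGroupInformation as the connection. A minimal usage sketch based on the updated tests in this commit (the endpoint, field names, conf, ugi, and agent string are illustrative placeholders, not part of the change itself):

    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
    // Open the connection first; a Kerberos-authenticated UGI may be supplied here.
    StreamingConnection connection = endPt.newConnection(true, conf, ugi, agentInfo);
    // Pass the connection into the writer so it reuses the connection's UGI.
    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt, conf, connection);
    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);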
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cec61d94
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cec61d94
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cec61d94

Branch: refs/heads/branch-2.1
Commit: cec61d945e7949f3585e88d777f0687664fcb85e
Parents: c5a9b0c
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Fri Jul 8 18:34:08 2016 -0700
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Fri Jul 8 18:34:08 2016 -0700

----------------------------------------------------------------------
 .../streaming/AbstractRecordWriter.java         | 61 +++++++++++++------
 .../streaming/DelimitedInputWriter.java         | 47 +++++++++++----
 .../hive/hcatalog/streaming/HiveEndPoint.java   | 10 +++-
 .../hcatalog/streaming/StreamingConnection.java |  6 ++
 .../hcatalog/streaming/StrictJsonWriter.java    | 26 +++++---
 .../hive/hcatalog/streaming/TestStreaming.java  | 63 +++++++++++---------
 6 files changed, 149 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index 0c6b9ea..974c6b8 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -19,6 +19,7 @@
 package org.apache.hive.hcatalog.streaming;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import org.apache.thrift.TException;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -68,34 +70,59 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   private Long curBatchMinTxnId;
   private Long curBatchMaxTxnId;
 
+  private static final class TableWriterPair {
+    private final Table tbl;
+    private final Path partitionPath;
+    TableWriterPair(Table t, Path p) {
+      tbl = t;
+      partitionPath = p;
+    }
+  }
+  /**
+   * @deprecated As of release 1.3/2.1. Replaced by {@link #AbstractRecordWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+   */
   protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
-          throws ConnectionError, StreamingException {
-    this.endPoint = endPoint;
+    throws ConnectionError, StreamingException {
+    this(endPoint, conf, null);
+  }
+  protected AbstractRecordWriter(HiveEndPoint endPoint2, HiveConf conf, StreamingConnection conn)
+    throws StreamingException {
+    this.endPoint = endPoint2;
     this.conf = conf!=null ? conf : HiveEndPoint.createHiveConf(DelimitedInputWriter.class, endPoint.metaStoreUri);
     try {
       msClient = HCatUtil.getHiveMetastoreClient(this.conf);
-      this.tbl = msClient.getTable(endPoint.database, endPoint.table);
-      this.partitionPath = getPathForEndPoint(msClient, endPoint);
+      UserGroupInformation ugi = conn != null ? conn.getUserGroupInformation() : null;
+      if (ugi == null) {
+        this.tbl = msClient.getTable(endPoint.database, endPoint.table);
+        this.partitionPath = getPathForEndPoint(msClient, endPoint);
+      } else {
+        TableWriterPair twp = ugi.doAs(
+          new PrivilegedExceptionAction<TableWriterPair>() {
+            @Override
+            public TableWriterPair run() throws Exception {
+              return new TableWriterPair(msClient.getTable(endPoint.database, endPoint.table),
+                getPathForEndPoint(msClient, endPoint));
+            }
+          });
+        this.tbl = twp.tbl;
+        this.partitionPath = twp.partitionPath;
+      }
       this.totalBuckets = tbl.getSd().getNumBuckets();
-      if(totalBuckets <= 0) {
+      if (totalBuckets <= 0) {
         throw new StreamingException("Cannot stream to table that has not been bucketed : "
-                + endPoint);
+          + endPoint);
       }
-      this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols()) ;
+      this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
       this.bucketFieldData = new Object[bucketIds.size()];
       String outFormatName = this.tbl.getSd().getOutputFormat();
-      outf = (AcidOutputFormat<?,?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
+      outf = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
       bucketFieldData = new Object[bucketIds.size()];
-    } catch (MetaException e) {
-      throw new ConnectionError(endPoint, e);
-    } catch (NoSuchObjectException e) {
-      throw new ConnectionError(endPoint, e);
-    } catch (TException e) {
-      throw new StreamingException(e.getMessage(), e);
-    } catch (ClassNotFoundException e) {
-      throw new StreamingException(e.getMessage(), e);
-    } catch (IOException e) {
+    } catch(InterruptedException e) {
+      throw new StreamingException(endPoint2.toString(), e);
+    } catch (MetaException | NoSuchObjectException e) {
+      throw new ConnectionError(endPoint2, e);
+    } catch (TException | ClassNotFoundException | IOException e) {
       throw new StreamingException(e.getMessage(), e);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
index 394cc54..7ab2fc6 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
@@ -20,6 +20,7 @@
 package org.apache.hive.hcatalog.streaming;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -73,12 +74,11 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
    * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
    */
   public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
-                              HiveEndPoint endPoint)
-          throws ClassNotFoundException, ConnectionError, SerializationError,
-                 InvalidColumn, StreamingException {
-    this(colNamesForFields, delimiter, endPoint, null);
+                              HiveEndPoint endPoint, StreamingConnection conn)
+    throws ClassNotFoundException, ConnectionError, SerializationError,
+    InvalidColumn, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, null, conn);
   }
-
   /**
    * Constructor. Uses default separator of the LazySimpleSerde
    * @param colNamesForFields Column name assignment for input fields. nulls or empty
    *                          strings in the array indicates the fields to be skipped
@@ -92,13 +92,12 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
    * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
    */
   public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
-                              HiveEndPoint endPoint, HiveConf conf)
+                              HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
           throws ClassNotFoundException, ConnectionError, SerializationError,
                  InvalidColumn, StreamingException {
     this(colNamesForFields, delimiter, endPoint, conf,
-            (char) LazySerDeParameters.DefaultSeparators[0]);
+            (char) LazySerDeParameters.DefaultSeparators[0], conn);
   }
-
   /**
    * Constructor. Allows overriding separator of the LazySimpleSerde
    * @param colNamesForFields Column name assignment for input fields
@@ -108,6 +107,7 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
    * @param serdeSeparator separator used when encoding data that is fed into the
    *                       LazySimpleSerde. Ensure this separator does not occur
    *                       in the field data
+   * @param conn connection this Writer is to be used with
    * @throws ConnectionError Problem talking to Hive
    * @throws ClassNotFoundException Serde class not found
    * @throws SerializationError Serde initialization/interaction failed
@@ -115,10 +115,10 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
    * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
    */
   public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
-                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
+                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator, StreamingConnection conn)
           throws ClassNotFoundException, ConnectionError, SerializationError,
                  InvalidColumn, StreamingException {
-    super(endPoint, conf);
+    super(endPoint, conf, conn);
     this.tableColumns = getCols(tbl);
     this.serdeSeparator = serdeSeparator;
     this.delimiter = delimiter;
@@ -143,6 +143,33 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
       bucketStructFields[i] = allFields.get(bucketIds.get(i));
     }
   }
+  /**
+   * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, StreamingConnection)}
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint)
+    throws ClassNotFoundException, ConnectionError, SerializationError,
+    InvalidColumn, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, null, null);
+  }
+  /**
+   * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf)
+    throws ClassNotFoundException, ConnectionError, SerializationError,
+    InvalidColumn, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, conf,
+      (char) LazySerDeParameters.DefaultSeparators[0], null);
+  }
+  /**
+   * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, char, StreamingConnection)}
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
+    throws ClassNotFoundException, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, conf, serdeSeparator, null);
+  }
 
   private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) {
     return !( delimiter.equals(String.valueOf(getSerdeSeparator()))

http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 452cb15..1a7cfae 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -97,7 +97,7 @@ public class HiveEndPoint {
 
   /**
-   * @deprecated Use {@link #newConnection(boolean, String)}
+   * @deprecated As of release 1.3/2.1. Replaced by {@link #newConnection(boolean, String)}
    */
   public StreamingConnection newConnection(final boolean createPartIfNotExists)
           throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
@@ -105,7 +105,7 @@ public class HiveEndPoint {
     return newConnection(createPartIfNotExists, null, null, null);
   }
   /**
-   * @deprecated Use {@link #newConnection(boolean, HiveConf, String)}
+   * @deprecated As of release 1.3/2.1. Replaced by {@link #newConnection(boolean, HiveConf, String)}
    */
   public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
           throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
@@ -113,7 +113,7 @@ public class HiveEndPoint {
     return newConnection(createPartIfNotExists, conf, null, null);
   }
   /**
-   * @deprecated Use {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)}
+   * @deprecated As of release 1.3/2.1. Replaced by {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)}
    */
   public StreamingConnection newConnection(final boolean createPartIfNotExists,
                                            final HiveConf conf,
                                            final UserGroupInformation authenticatedUser)
@@ -395,6 +395,10 @@ public class HiveEndPoint {
       }
     }
 
+    @Override
+    public UserGroupInformation getUserGroupInformation() {
+      return ugi;
+    }
 
     /**
      * Acquires a new batch of transactions from Hive.

http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
index 25acff0..8785a21 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
@@ -18,6 +18,8 @@
 package org.apache.hive.hcatalog.streaming;
 
+import org.apache.hadoop.security.UserGroupInformation;
+
 /**
  * Represents a connection to a HiveEndPoint. Used to acquire transaction batches.
  */
@@ -46,4 +48,8 @@ public interface StreamingConnection {
    */
   public void close();
 
+  /**
+   * @return UserGroupInformation associated with this connection or {@code null} if there is none
+   */
+  UserGroupInformation getUserGroupInformation();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
index db73d6b..1facad1 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
@@ -46,28 +46,40 @@ public class StrictJsonWriter extends AbstractRecordWriter {
   private final StructField[] bucketStructFields;
 
   /**
-   *
+   * @deprecated As of release 1.3/2.1. Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint)
+    throws ConnectionError, SerializationError, StreamingException {
+    this(endPoint, null, null);
+  }
+
+  /**
+   * @deprecated As of release 1.3/2.1. Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) throws StreamingException {
+    this(endPoint, conf, null);
+  }
+  /**
    * @param endPoint the end point to write to
    * @throws ConnectionError
    * @throws SerializationError
    * @throws StreamingException
    */
-  public StrictJsonWriter(HiveEndPoint endPoint)
+  public StrictJsonWriter(HiveEndPoint endPoint, StreamingConnection conn)
     throws ConnectionError, SerializationError, StreamingException {
-    this(endPoint, null);
+    this(endPoint, null, conn);
   }
-
   /**
-   *
    * @param endPoint the end point to write to
    * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+   * @param conn connection this Writer is to be used with
    * @throws ConnectionError
    * @throws SerializationError
    * @throws StreamingException
    */
-  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf)
+  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
     throws ConnectionError, SerializationError, StreamingException {
-    super(endPoint, conf);
+    super(endPoint, conf, conn);
     this.serde = createSerde(tbl, conf);
     // get ObjInspectors for entire record and bucketed cols
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/cec61d94/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 84e559d..dedfe3f 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.orc.impl.OrcAcidUtils;
 import org.apache.orc.tools.FileDump;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
@@ -300,11 +302,11 @@ public class TestStreaming {
     List<String> partitionVals = new ArrayList<String>();
     partitionVals.add("2015");
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testing5", "store_sales", partitionVals);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk",
       "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity",
       "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost",
-      "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt);
-    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+      "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt, connection);
 
     TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -562,8 +564,8 @@ public class TestStreaming {
 
     // 1) to partitioned table
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
 
     TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
 
@@ -641,8 +643,8 @@ public class TestStreaming {
   @Test
   public void testHeartbeat() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt, connection);
     TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer);
     txnBatch.beginNextTransaction();
@@ -670,8 +672,8 @@ public class TestStreaming {
 
     // 1) to partitioned table
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
 
     TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
@@ -697,28 +699,35 @@ public class TestStreaming {
 
   @Test
   public void testTransactionBatchCommit_Delimited() throws Exception {
+    testTransactionBatchCommit_Delimited(null);
+  }
+  @Test
+  public void testTransactionBatchCommit_DelimitedUGI() throws Exception {
+    testTransactionBatchCommit_Delimited(Utils.getUGI());
+  }
+  private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
-            partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+      partitionVals);
+    StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection);
 
     // 1st Txn
     TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
     Assert.assertEquals(TransactionBatch.TxnState.OPEN
-        , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-        , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
 
     // 2nd Txn
     txnBatch.beginNextTransaction();
     Assert.assertEquals(TransactionBatch.TxnState.OPEN
-        , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
     txnBatch.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
@@ -727,11 +736,11 @@ public class TestStreaming {
     txnBatch.commit();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
-            "{2, Welcome to streaming}");
+      "{2, Welcome to streaming}");
 
     txnBatch.close();
     Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
-        , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
 
     connection.close();
 
@@ -739,19 +748,19 @@ public class TestStreaming {
 
     // To Unpartitioned table
     endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
-    writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+    connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+    writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection);
 
     // 1st Txn
     txnBatch = connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
     Assert.assertEquals(TransactionBatch.TxnState.OPEN
-        , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-        , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
     connection.close();
   }
 
@@ -759,8 +768,8 @@ public class TestStreaming {
   public void testTransactionBatchCommit_Json() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
-    StrictJsonWriter writer = new StrictJsonWriter(endPt);
     StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+    StrictJsonWriter writer = new StrictJsonWriter(endPt, connection);
 
     // 1st Txn
     TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
@@ -844,8 +853,8 @@ public class TestStreaming {
 
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
 
     TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
 
@@ -872,8 +881,8 @@ public class TestStreaming {
     String agentInfo = "UT_" + Thread.currentThread().getName();
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
 
     TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
@@ -1173,8 +1182,8 @@ public class TestStreaming {
 
     // 2) Insert data into both tables
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
-    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
 
     TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -1186,8 +1195,8 @@ public class TestStreaming {
 
     HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null);
-    DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2);
     StreamingConnection connection2 = endPt2.newConnection(false, agentInfo);
+    DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2, connection);
 
     TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, writer2);
     txnBatch2.beginNextTransaction();
@@ -1250,8 +1259,8 @@ public class TestStreaming {
 
     // 2) Insert data into both tables
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
-    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
 
     TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -1322,8 +1331,8 @@ public class TestStreaming {
 
     // 2) Insert data into both tables
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
-    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
 
     // we need side file for this test, so we create 2 txn batch and test with only one
     TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
@@ -1448,8 +1457,8 @@ public class TestStreaming {
 
     // 2) Insert data into both tables
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
-    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
 
     TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -1670,9 +1679,9 @@ public class TestStreaming {
     runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
 
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", null);
-    DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt);
-    FaultyWriter writer = new FaultyWriter(innerWriter);
     StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt, connection);
+    FaultyWriter writer = new FaultyWriter(innerWriter);
 
     TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
     txnBatch.close();