Repository: hive
Updated Branches:
  refs/heads/branch-3 65f486fb1 -> 074ec7ef1


HIVE-19209: Streaming ingest record writers should accept input stream 
(Prasanth Jayachandran reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/074ec7ef
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/074ec7ef
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/074ec7ef

Branch: refs/heads/branch-3
Commit: 074ec7ef117cf618747bbe9d4442dfc343c15fea
Parents: 65f486f
Author: Prasanth Jayachandran <prasan...@apache.org>
Authored: Sat May 5 19:26:01 2018 -0700
Committer: Prasanth Jayachandran <prasan...@apache.org>
Committed: Sat May 5 19:34:44 2018 -0700

----------------------------------------------------------------------
 .../hive/streaming/AbstractRecordWriter.java    |  21 ++++
 .../hive/streaming/HiveStreamingConnection.java |  46 +++++--
 .../org/apache/hive/streaming/RecordWriter.java |  21 +++-
 .../hive/streaming/StreamingConnection.java     |  10 ++
 .../streaming/StrictDelimitedInputWriter.java   |   9 ++
 .../apache/hive/streaming/StrictJsonWriter.java |  13 +-
 .../hive/streaming/StrictRegexWriter.java       |   7 ++
 .../apache/hive/streaming/TestStreaming.java    | 124 +++++++++++++++++++
 8 files changed, 235 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
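
The change in a nutshell: record writers (and the streaming connection) now accept a
java.io.InputStream and split it into records using a configurable line delimiter
pattern (default "[\r\n]"). A minimal, illustrative client flow assembled from the
builder calls shown in this diff; database/table names, agent info, and the HiveConf
are placeholders, not part of the change:

  import java.io.ByteArrayInputStream;
  import java.io.InputStream;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hive.streaming.HiveStreamingConnection;
  import org.apache.hive.streaming.StreamingConnection;
  import org.apache.hive.streaming.StreamingException;
  import org.apache.hive.streaming.StrictDelimitedInputWriter;

  public class StreamIngestExample {
    public static void main(String[] args) throws StreamingException {
      StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
          .withFieldDelimiter(',')
          .withLineDelimiterPattern("\n")   // new builder option introduced here
          .build();
      StreamingConnection connection = HiveStreamingConnection.newBuilder()
          .withDatabase("default")          // placeholder; the table must already exist
          .withTable("src")
          .withAgentInfo("example-agent")
          .withRecordWriter(writer)
          .withHiveConf(new HiveConf())     // placeholder configuration
          .connect();
      InputStream in = new ByteArrayInputStream("key1,val1\nkey2,val2\n".getBytes());
      connection.beginTransaction();
      connection.write(in);                 // new overload; the writer closes the stream
      connection.commitTransaction();
      connection.close();
    }
  }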


http://git-wip-us.apache.org/repos/asf/hive/blob/074ec7ef/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java 
b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index a979df0..685e0cc 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -20,6 +20,7 @@ package org.apache.hive.streaming;
 
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.lang.management.MemoryUsage;
@@ -30,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
+import java.util.Scanner;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -60,6 +62,7 @@ import org.slf4j.LoggerFactory;
 public abstract class AbstractRecordWriter implements RecordWriter {
   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractRecordWriter.class.getName());
 
+  private static final String DEFAULT_LINE_DELIMITER_PATTERN = "[\r\n]";
   protected HiveConf conf;
   private StreamingConnection conn;
   protected Table table;
@@ -87,6 +90,7 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
   private AcidOutputFormat<?, ?> acidOutputFormat;
   private Long curBatchMinWriteId;
   private Long curBatchMaxWriteId;
+  private final String lineDelimiter;
   private HeapMemoryMonitor heapMemoryMonitor;
   // if low memory canary is set and if records after set canary exceeds 
threshold, trigger a flush.
   // This is to avoid getting notified of low memory too often and flushing 
too often.
@@ -96,6 +100,11 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
   private float memoryUsageThreshold;
   private long ingestSizeThreshold;
 
+  public AbstractRecordWriter(final String lineDelimiter) {
+    this.lineDelimiter = lineDelimiter == null || lineDelimiter.isEmpty() ?
+      DEFAULT_LINE_DELIMITER_PATTERN : lineDelimiter;
+  }
+
   private static class OrcMemoryPressureMonitor implements 
HeapMemoryMonitor.Listener {
     private static final Logger LOG = 
LoggerFactory.getLogger(OrcMemoryPressureMonitor.class.getName());
     private final AtomicBoolean lowMemoryCanary;
@@ -367,6 +376,15 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
   }
 
   @Override
+  public void write(final long writeId, final InputStream inputStream) throws StreamingException {
+    try (Scanner scanner = new Scanner(inputStream).useDelimiter(lineDelimiter)) {
+      while (scanner.hasNext()) {
+        write(writeId, scanner.next().getBytes());
+      }
+    }
+  }
+
+  @Override
   public void write(final long writeId, final byte[] record) throws 
StreamingException {
     checkAutoFlush();
     ingestSizeBytes += record.length;
@@ -375,6 +393,9 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
       int bucket = getBucket(encodedRow);
       List<String> partitionValues = getPartitionValues(encodedRow);
       getRecordUpdater(partitionValues, bucket).insert(writeId, encodedRow);
+      // ingest size bytes are reset on flush() whereas connection stats are not
+      conn.getConnectionStats().incrementRecordsWritten();
+      conn.getConnectionStats().incrementRecordsSize(record.length);
     } catch (IOException e) {
       throw new StreamingIOFailure("Error writing record in transaction write 
id ("
         + writeId + ")", e);

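The new write(long, InputStream) above tokenizes the stream with java.util.Scanner and
the configured delimiter pattern (default "[\r\n]"), then feeds each token to the
existing byte[] path. A self-contained sketch of just that splitting behavior, with
illustrative input:

  import java.io.ByteArrayInputStream;
  import java.io.InputStream;
  import java.util.Scanner;

  public class DelimiterSplitDemo {
    public static void main(String[] args) {
      // Same splitting as AbstractRecordWriter.write(long, InputStream):
      // every token between delimiter matches becomes one record.
      InputStream in = new ByteArrayInputStream("1,foo\r2,bar\n3,baz".getBytes());
      try (Scanner scanner = new Scanner(in).useDelimiter("[\r\n]")) {
        while (scanner.hasNext()) {
          System.out.println(scanner.next());   // prints 1,foo then 2,bar then 3,baz
        }
      }
    }
  }
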
http://git-wip-us.apache.org/repos/asf/hive/blob/074ec7ef/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java 
b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index c4da7af..85887b2 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -19,6 +19,7 @@
 package org.apache.hive.streaming;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -89,7 +90,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
  *                                      .withFieldDelimiter(',')
  *                                      .build();
- *
  * // create and open streaming connection (default.src table has to exist 
already)
  * StreamingConnection connection = HiveStreamingConnection.newBuilder()
  *                                    .withDatabase("default")
@@ -98,19 +98,16 @@ import 
com.google.common.util.concurrent.ThreadFactoryBuilder;
  *                                    .withRecordWriter(writer)
  *                                    .withHiveConf(hiveConf)
  *                                    .connect();
- *
  * // begin a transaction, write records and commit 1st transaction
  * connection.beginTransaction();
  * connection.write("key1,val1".getBytes());
  * connection.write("key2,val2".getBytes());
  * connection.commitTransaction();
- *
  * // begin another transaction, write more records and commit 2nd transaction
  * connection.beginTransaction();
  * connection.write("key3,val3".getBytes());
  * connection.write("key4,val4".getBytes());
  * connection.commitTransaction();
- *
  * // close the streaming connection
  * connection.close();
  * }
@@ -497,14 +494,6 @@ public class HiveStreamingConnection implements 
StreamingConnection {
   }
 
   @Override
-  public void write(final byte[] record) throws StreamingException {
-    checkState();
-    currentTransactionBatch.write(record);
-    connectionStats.incrementRecordsWritten();
-    connectionStats.incrementRecordsSize(record.length);
-  }
-
-  @Override
   public void beginTransaction() throws StreamingException {
     checkClosedState();
     beginNextTransaction();
@@ -525,6 +514,21 @@ public class HiveStreamingConnection implements 
StreamingConnection {
   }
 
   @Override
+  public void write(final byte[] record) throws StreamingException {
+    checkState();
+    currentTransactionBatch.write(record);
+  }
+
+  @Override
+  public void write(final InputStream inputStream) throws StreamingException {
+    checkState();
+    currentTransactionBatch.write(inputStream);
+  }
+
+  /**
+   * Close the streaming connection.
+   */
+  @Override
   public void close() {
     if (isConnectionClosed.get()) {
       return;
@@ -785,6 +789,24 @@ public class HiveStreamingConnection implements 
StreamingConnection {
       }
     }
 
+    public void write(final InputStream inputStream) throws StreamingException {
+      checkIsClosed();
+      boolean success = false;
+      try {
+        recordWriter.write(getCurrentWriteId(), inputStream);
+        success = true;
+      } catch (SerializationError ex) {
+        //this exception indicates that a {@code record} could not be parsed and the
+        //caller can decide whether to drop it or send it to dead letter queue.
+        //rolling back the txn and retrying won't help since the tuple will be exactly the same
+        //when it's replayed.
+        success = true;
+        throw ex;
+      } finally {
+        markDead(success);
+      }
+    }
+
     private void checkIsClosed() throws StreamingException {
       if (isTxnClosed.get()) {
         throw new StreamingException("Transaction" + toString() + " is 
closed()");

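The stream overload keeps the same SerializationError contract as the byte[] path: the
transaction is not marked dead, so the caller chooses the policy. One possible policy,
shown as a sketch (the skip-and-commit choice and the stderr logging are illustrative,
not part of this change):

  import java.io.InputStream;
  import org.apache.hive.streaming.SerializationError;
  import org.apache.hive.streaming.StreamingConnection;
  import org.apache.hive.streaming.StreamingException;

  public class SkipBadInputExample {
    // Skip unparseable input instead of aborting the transaction, since replaying
    // the exact same bytes would fail the same way.
    static void writeSkippingBadInput(StreamingConnection connection, InputStream in)
        throws StreamingException {
      connection.beginTransaction();
      try {
        connection.write(in);
      } catch (SerializationError e) {
        System.err.println("Skipping unparseable input: " + e.getMessage());
      }
      connection.commitTransaction();
    }
  }
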
http://git-wip-us.apache.org/repos/asf/hive/blob/074ec7ef/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java 
b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
index 4d25924..d9c4455 100644
--- a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
@@ -19,6 +19,8 @@
 package org.apache.hive.streaming;
 
 
+import java.io.InputStream;
+
 import java.util.Set;
 
 public interface RecordWriter {
@@ -34,15 +36,28 @@ public interface RecordWriter {
   void init(StreamingConnection connection, long minWriteId, long maxWriteID) 
throws StreamingException;
 
   /**
-   * Writes using a hive RecordUpdater
+   * Writes using a hive RecordUpdater.
    *
-   * @param writeId the write ID of the table mapping to Txn in which the 
write occurs
-   * @param record  the record to be written
+   * @param writeId - the write ID of the table mapping to Txn in which the 
write occurs
+   * @param record  - the record to be written
+   * @throws StreamingException - thrown when write fails
    */
   void write(long writeId, byte[] record) throws StreamingException;
 
   /**
+   * Writes using a hive RecordUpdater. The specified input stream will be automatically closed
+   * by the API after reading all the records out of it.
+   *
+   * @param writeId     - the write ID of the table mapping to Txn in which the write occurs
+   * @param inputStream - the input stream from which records are read and written
+   * @throws StreamingException - thrown when write fails
+   */
+  void write(long writeId, InputStream inputStream) throws StreamingException;
+
+  /**
    * Flush records from buffer. Invoked by TransactionBatch.commitTransaction()
+   *
+   * @throws StreamingException - thrown when flush fails
    */
   void flush() throws StreamingException;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/074ec7ef/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java 
b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
index 5396d76..fbe00db 100644
--- a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
@@ -18,6 +18,8 @@
 
 package org.apache.hive.streaming;
 
+import java.io.InputStream;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 
 public interface StreamingConnection extends ConnectionInfo, PartitionHandler {
@@ -44,6 +46,14 @@ public interface StreamingConnection extends ConnectionInfo, 
PartitionHandler {
   void write(byte[] record) throws StreamingException;
 
   /**
+   * Write records read from an input stream using RecordWriter.
+   *
+   * @param inputStream - input stream of records
+   * @throws StreamingException - if there are errors when writing
+   */
+  void write(InputStream inputStream) throws StreamingException;
+
+  /**
    * Commit a transaction to make the writes visible for readers.
    *
    * @throws StreamingException - if there are errors when committing the open 
transaction

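Because StreamingConnection now takes any InputStream, a caller can push an entire
delimited file through one transaction. A sketch under the assumption that the file
path is a placeholder and the configured writer's line delimiter matches the file's
record separator; per the RecordWriter contract above, the writer closes the stream
after reading it, so the try-with-resources close is effectively a no-op:

  import java.io.IOException;
  import java.io.InputStream;
  import java.nio.file.Files;
  import java.nio.file.Paths;
  import org.apache.hive.streaming.StreamingConnection;
  import org.apache.hive.streaming.StreamingException;

  public class FileIngestExample {
    static void ingestFile(StreamingConnection connection, String path)
        throws StreamingException, IOException {
      try (InputStream in = Files.newInputStream(Paths.get(path))) {
        connection.beginTransaction();
        connection.write(in);   // records are split by the writer's delimiter pattern
        connection.commitTransaction();
      }
    }
  }
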
http://git-wip-us.apache.org/repos/asf/hive/blob/074ec7ef/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java 
b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
index 56f59fd..13de1d4 100644
--- 
a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
+++ 
b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
@@ -19,7 +19,9 @@
 package org.apache.hive.streaming;
 
 
+import java.io.InputStream;
 import java.util.Properties;
+import java.util.Scanner;
 
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -44,6 +46,7 @@ public class StrictDelimitedInputWriter extends 
AbstractRecordWriter {
   private LazySimpleSerDe serde;
 
   private StrictDelimitedInputWriter(Builder builder) {
+    super(builder.lineDelimiter);
     this.fieldDelimiter = builder.fieldDelimiter;
     this.collectionDelimiter = builder.collectionDelimiter;
     this.mapKeyDelimiter = builder.mapKeyDelimiter;
@@ -57,6 +60,7 @@ public class StrictDelimitedInputWriter extends 
AbstractRecordWriter {
     private char fieldDelimiter = (char) 
LazySerDeParameters.DefaultSeparators[0];
     private char collectionDelimiter = (char) 
LazySerDeParameters.DefaultSeparators[1];
     private char mapKeyDelimiter = (char) 
LazySerDeParameters.DefaultSeparators[2];
+    private String lineDelimiter;
 
     public Builder withFieldDelimiter(final char fieldDelimiter) {
       this.fieldDelimiter = fieldDelimiter;
@@ -73,6 +77,11 @@ public class StrictDelimitedInputWriter extends 
AbstractRecordWriter {
       return this;
     }
 
+    public Builder withLineDelimiterPattern(final String lineDelimiter) {
+      this.lineDelimiter = lineDelimiter;
+      return this;
+    }
+
     public StrictDelimitedInputWriter build() {
       return new StrictDelimitedInputWriter(this);
     }

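The new withLineDelimiterPattern accepts a regular expression (it is ultimately handed
to Scanner.useDelimiter in AbstractRecordWriter), so record separators are not limited
to single characters. A small sketch; the CRLF-tolerant pattern is an illustration, not
a default shipped by this change:

  import org.apache.hive.streaming.StrictDelimitedInputWriter;

  public class CustomDelimiterExample {
    static StrictDelimitedInputWriter crlfTolerantWriter() {
      // "\\r?\\n" treats either CRLF or LF as one record separator, avoiding the
      // empty tokens that the single-character default "[\r\n]" can produce on
      // CRLF input. The field delimiter stays a plain char, as before.
      return StrictDelimitedInputWriter.newBuilder()
          .withFieldDelimiter(',')
          .withLineDelimiterPattern("\\r?\\n")
          .build();
    }
  }
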
http://git-wip-us.apache.org/repos/asf/hive/blob/074ec7ef/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java 
b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
index 2c54eef..0f9b652 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
@@ -34,13 +34,24 @@ import org.apache.hadoop.io.Text;
 public class StrictJsonWriter extends AbstractRecordWriter {
   private JsonSerDe serde;
 
+  public StrictJsonWriter(final Builder builder) {
+    super(builder.lineDelimiter);
+  }
+
   public static Builder newBuilder() {
     return new Builder();
   }
 
   public static class Builder {
+    private String lineDelimiter;
+
+    public Builder withLineDelimiterPattern(final String lineDelimiter) {
+      this.lineDelimiter = lineDelimiter;
+      return this;
+    }
+
     public StrictJsonWriter build() {
-      return new StrictJsonWriter();
+      return new StrictJsonWriter(this);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/074ec7ef/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java 
b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
index ba2609f..3651fa1 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
@@ -38,6 +38,7 @@ public class StrictRegexWriter extends AbstractRecordWriter {
   private RegexSerDe serde;
 
   private StrictRegexWriter(final Builder builder) {
+    super(builder.lineDelimiter);
     this.regex = builder.regex;
   }
 
@@ -47,12 +48,18 @@ public class StrictRegexWriter extends AbstractRecordWriter 
{
 
   public static class Builder {
     private String regex;
+    private String lineDelimiter;
 
     public Builder withRegex(final String regex) {
       this.regex = regex;
       return this;
     }
 
+    public Builder withLineDelimiterPattern(final String lineDelimiter) {
+      this.lineDelimiter = lineDelimiter;
+      return this;
+    }
+
     public StrictRegexWriter build() {
       return new StrictRegexWriter(this);
     }

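The regex writer composes the same way: the regex defines the columns, and the optional
line delimiter pattern defines record boundaries. A sketch mirroring the new
testRegexInputStream below (regex taken from that test, relying on the default
"[\r\n]" separator):

  import org.apache.hive.streaming.StrictRegexWriter;

  public class RegexWriterExample {
    static StrictRegexWriter twoColumnWriter() {
      // Two capture groups map to two columns; with no line delimiter pattern set,
      // the default "[\r\n]" from AbstractRecordWriter splits the stream.
      return StrictRegexWriter.newBuilder()
          .withRegex("([^,]*),(.*)")
          .build();
    }
  }
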
http://git-wip-us.apache.org/repos/asf/hive/blob/074ec7ef/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java 
b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index a6fdd66..042fdbe 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -20,11 +20,13 @@ package org.apache.hive.streaming;
 
 import static 
org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.PrintStream;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -468,6 +470,60 @@ public class TestStreaming {
   }
 
   @Test
+  public void testAllTypesDelimitedWriterInputStream() throws Exception {
+    queryTable(driver, "drop table if exists default.alltypes");
+    queryTable(driver,
+      "create table if not exists default.alltypes ( bo boolean, ti tinyint, 
si smallint, i int, bi bigint, " +
+        "f float, d double, de decimal(10,3), ts timestamp, da date, s string, 
c char(5), vc varchar(5), " +
+        "m map<string, string>, l array<int>, st struct<c1:int, c2:string> ) " 
+
+        "stored as orc TBLPROPERTIES('transactional'='true')");
+    StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter('|')
+      .withCollectionDelimiter(',')
+      .withMapKeyDelimiter(':')
+      .withLineDelimiterPattern("\n")
+      .build();
+    StreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase("default")
+      .withTable("alltypes")
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withTransactionBatchSize(2)
+      .withRecordWriter(wr)
+      .withHiveConf(conf)
+      .connect();
+
+    String row1 = "true|10|100|1000|10000|4.0|20.0|4.2222|1969-12-31 " +
+      "15:59:58.174|1970-01-01|string|hello|hello|k1:v1|100,200|10,foo";
+    String row2 = "false|20|200|2000|20000|8.0|40.0|2.2222|1970-12-31 
15:59:58.174|1971-01-01|abcd|world|world|" +
+      "k4:v4|200,300|20,bar";
+    String allRows = row1 + "\n" + row2 + "\n";
+    ByteArrayInputStream bais = new ByteArrayInputStream(allRows.getBytes());
+    connection.beginTransaction();
+    connection.write(bais);
+    connection.commitTransaction();
+    connection.close();
+    bais.close();
+
+    List<String> rs = queryTable(driver, "select ROW__ID, bo, ti, si, i, bi, 
f, d, de, ts, da, s, c, vc, m, l, st," +
+      " INPUT__FILE__NAME from default.alltypes order by ROW__ID");
+    Assert.assertEquals(2, rs.size());
+    String gotRow1 = rs.get(0);
+    String expectedPrefixRow1 = "{\"writeid\":1,\"bucketid\":536870912," +
+      "\"rowid\":0}\ttrue\t10\t100\t1000\t10000\t4.0\t20.0\t4.222\t1969-12-31 
15:59:58.174\t1970-01-01\tstring" +
+      "\thello\thello\t{\"k1\":\"v1\"}\t[100,200]\t{\"c1\":10,\"c2\":\"foo\"}";
+    String expectedSuffixRow1 = "alltypes/delta_0000001_0000002/bucket_00000";
+    String gotRow2 = rs.get(1);
+    String expectedPrefixRow2 = "{\"writeid\":1,\"bucketid\":536870912," +
+      "\"rowid\":1}\tfalse\t20\t200\t2000\t20000\t8.0\t40.0\t2.222\t1970-12-31 
15:59:58.174\t1971-01-01\tabcd" +
+      "\tworld\tworld\t{\"k4\":\"v4\"}\t[200,300]\t{\"c1\":20,\"c2\":\"bar\"}";
+    String expectedSuffixRow2 = "alltypes/delta_0000001_0000002/bucket_00000";
+    Assert.assertTrue(gotRow1, gotRow1.startsWith(expectedPrefixRow1));
+    Assert.assertTrue(gotRow1, gotRow1.endsWith(expectedSuffixRow1));
+    Assert.assertTrue(gotRow2, gotRow2.startsWith(expectedPrefixRow2));
+    Assert.assertTrue(gotRow2, gotRow2.endsWith(expectedSuffixRow2));
+  }
+
+  @Test
   public void testAutoRollTransactionBatch() throws Exception {
     queryTable(driver, "drop table if exists default.streamingnobuckets");
     queryTable(driver, "create table default.streamingnobuckets (a string, b 
string) stored as orc " +
@@ -1226,6 +1282,37 @@ public class TestStreaming {
   }
 
   @Test
+  public void testRegexInputStream() throws Exception {
+    String regex = "([^,]*),(.*)";
+    StrictRegexWriter writer = StrictRegexWriter.newBuilder()
+      // no line delimiter pattern is specified, so the default [\r\n] is used to split records
+      .withRegex(regex)
+      .build();
+    StreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withHiveConf(conf)
+      .withRecordWriter(writer)
+      .connect();
+
+    String rows = "1,foo\r2,bar\r3,baz";
+    ByteArrayInputStream bais = new ByteArrayInputStream(rows.getBytes());
+    connection.beginTransaction();
+    connection.write(bais);
+    connection.commitTransaction();
+    bais.close();
+    connection.close();
+
+    List<String> rs = queryTable(driver, "select * from " + dbName + "." + 
tblName);
+    Assert.assertEquals(3, rs.size());
+    Assert.assertEquals("1\tfoo\tAsia\tIndia", rs.get(0));
+    Assert.assertEquals("2\tbar\tAsia\tIndia", rs.get(1));
+    Assert.assertEquals("3\tbaz\tAsia\tIndia", rs.get(2));
+  }
+
+  @Test
   public void testTransactionBatchCommitJson() throws Exception {
     StrictJsonWriter writer = StrictJsonWriter.newBuilder()
       .build();
@@ -1261,6 +1348,37 @@ public class TestStreaming {
   }
 
   @Test
+  public void testJsonInputStream() throws Exception {
+    StrictJsonWriter writer = StrictJsonWriter.newBuilder()
+      .withLineDelimiterPattern("\\|")
+      .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
+
+    // 1st Txn
+    connection.beginTransaction();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, 
connection.getCurrentTransactionState());
+    String records = "{\"id\" : 1, \"msg\": \"Hello streaming\"}|{\"id\" : 2, 
\"msg\": \"Hello world\"}|{\"id\" : 3, " +
+      "\"msg\": \"Hello world!!\"}";
+    ByteArrayInputStream bais = new ByteArrayInputStream(records.getBytes());
+    connection.write(bais);
+    connection.commitTransaction();
+    bais.close();
+    connection.close();
+    List<String> rs = queryTable(driver, "select * from " + dbName + "." + 
tblName);
+    Assert.assertEquals(3, rs.size());
+    Assert.assertEquals("1\tHello streaming\tAsia\tIndia", rs.get(0));
+    Assert.assertEquals("2\tHello world\tAsia\tIndia", rs.get(1));
+    Assert.assertEquals("3\tHello world!!\tAsia\tIndia", rs.get(2));
+  }
+
+  @Test
   public void testRemainingTransactions() throws Exception {
     StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
       .withFieldDelimiter(',')
@@ -2797,6 +2915,12 @@ public class TestStreaming {
     }
 
     @Override
+    public void write(final long writeId, final InputStream inputStream) throws StreamingException {
+      delegate.write(writeId, inputStream);
+      produceFault();
+    }
+
+    @Override
     public void flush() throws StreamingException {
       delegate.flush();
       produceFault();
