Repository: hive Updated Branches: refs/heads/branch-3 65f486fb1 -> 074ec7ef1
HIVE-19209: Streaming ingest record writers should accept input stream (Prasanth Jayachandran reviewed by Ashutosh Chauhan) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/074ec7ef Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/074ec7ef Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/074ec7ef Branch: refs/heads/branch-3 Commit: 074ec7ef117cf618747bbe9d4442dfc343c15fea Parents: 65f486f Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Sat May 5 19:26:01 2018 -0700 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Sat May 5 19:34:44 2018 -0700 ---------------------------------------------------------------------- .../hive/streaming/AbstractRecordWriter.java | 21 ++++ .../hive/streaming/HiveStreamingConnection.java | 46 +++++-- .../org/apache/hive/streaming/RecordWriter.java | 21 +++- .../hive/streaming/StreamingConnection.java | 10 ++ .../streaming/StrictDelimitedInputWriter.java | 9 ++ .../apache/hive/streaming/StrictJsonWriter.java | 13 +- .../hive/streaming/StrictRegexWriter.java | 7 ++ .../apache/hive/streaming/TestStreaming.java | 124 +++++++++++++++++++ 8 files changed, 235 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/074ec7ef/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index a979df0..685e0cc 100644 --- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -20,6 +20,7 @@ package org.apache.hive.streaming; import java.io.IOException; +import java.io.InputStream; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.lang.management.MemoryUsage; @@ -30,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.Scanner; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -60,6 +62,7 @@ import org.slf4j.LoggerFactory; public abstract class AbstractRecordWriter implements RecordWriter { private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName()); + private static final String DEFAULT_LINE_DELIMITER_PATTERN = "[\r\n]"; protected HiveConf conf; private StreamingConnection conn; protected Table table; @@ -87,6 +90,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { private AcidOutputFormat<?, ?> acidOutputFormat; private Long curBatchMinWriteId; private Long curBatchMaxWriteId; + private final String lineDelimiter; private HeapMemoryMonitor heapMemoryMonitor; // if low memory canary is set and if records after set canary exceeds threshold, trigger a flush. // This is to avoid getting notified of low memory too often and flushing too often. @@ -96,6 +100,11 @@ public abstract class AbstractRecordWriter implements RecordWriter { private float memoryUsageThreshold; private long ingestSizeThreshold; + public AbstractRecordWriter(final String lineDelimiter) { + this.lineDelimiter = lineDelimiter == null || lineDelimiter.isEmpty() ? + DEFAULT_LINE_DELIMITER_PATTERN : lineDelimiter; + } + private static class OrcMemoryPressureMonitor implements HeapMemoryMonitor.Listener { private static final Logger LOG = LoggerFactory.getLogger(OrcMemoryPressureMonitor.class.getName()); private final AtomicBoolean lowMemoryCanary; @@ -367,6 +376,15 @@ public abstract class AbstractRecordWriter implements RecordWriter { } @Override + public void write(final long writeId, final InputStream inputStream) throws StreamingException { + try (Scanner scanner = new Scanner(inputStream).useDelimiter(lineDelimiter)) { + while (scanner.hasNext()) { + write(writeId, scanner.next().getBytes()); + } + } + } + + @Override public void write(final long writeId, final byte[] record) throws StreamingException { checkAutoFlush(); ingestSizeBytes += record.length; @@ -375,6 +393,9 @@ public abstract class AbstractRecordWriter implements RecordWriter { int bucket = getBucket(encodedRow); List<String> partitionValues = getPartitionValues(encodedRow); getRecordUpdater(partitionValues, bucket).insert(writeId, encodedRow); + // ingest size bytes gets resetted on flush() whereas connection stats is not + conn.getConnectionStats().incrementRecordsWritten(); + conn.getConnectionStats().incrementRecordsSize(record.length); } catch (IOException e) { throw new StreamingIOFailure("Error writing record in transaction write id (" + writeId + ")", e); http://git-wip-us.apache.org/repos/asf/hive/blob/074ec7ef/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index c4da7af..85887b2 100644 --- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -19,6 +19,7 @@ package org.apache.hive.streaming; import java.io.IOException; +import java.io.InputStream; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -89,7 +90,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() * .withFieldDelimiter(',') * .build(); - * * // create and open streaming connection (default.src table has to exist already) * StreamingConnection connection = HiveStreamingConnection.newBuilder() * .withDatabase("default") @@ -98,19 +98,16 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * .withRecordWriter(writer) * .withHiveConf(hiveConf) * .connect(); - * * // begin a transaction, write records and commit 1st transaction * connection.beginTransaction(); * connection.write("key1,val1".getBytes()); * connection.write("key2,val2".getBytes()); * connection.commitTransaction(); - * * // begin another transaction, write more records and commit 2nd transaction * connection.beginTransaction(); * connection.write("key3,val3".getBytes()); * connection.write("key4,val4".getBytes()); * connection.commitTransaction(); - * * // close the streaming connection * connection.close(); * } @@ -497,14 +494,6 @@ public class HiveStreamingConnection implements StreamingConnection { } @Override - public void write(final byte[] record) throws StreamingException { - checkState(); - currentTransactionBatch.write(record); - connectionStats.incrementRecordsWritten(); - connectionStats.incrementRecordsSize(record.length); - } - - @Override public void beginTransaction() throws StreamingException { checkClosedState(); beginNextTransaction(); @@ -525,6 +514,21 @@ public class HiveStreamingConnection implements StreamingConnection { } @Override + public void write(final byte[] record) throws StreamingException { + checkState(); + currentTransactionBatch.write(record); + } + + @Override + public void write(final InputStream inputStream) throws StreamingException { + checkState(); + currentTransactionBatch.write(inputStream); + } + + /** + * Close connection + */ + @Override public void close() { if (isConnectionClosed.get()) { return; @@ -785,6 +789,24 @@ public class HiveStreamingConnection implements StreamingConnection { } } + public void write(final InputStream inputStream) throws StreamingException { + checkIsClosed(); + boolean success = false; + try { + recordWriter.write(getCurrentWriteId(), inputStream); + success = true; + } catch (SerializationError ex) { + //this exception indicates that a {@code record} could not be parsed and the + //caller can decide whether to drop it or send it to dead letter queue. + //rolling back the txn and retrying won'table help since the tuple will be exactly the same + //when it's replayed. + success = true; + throw ex; + } finally { + markDead(success); + } + } + private void checkIsClosed() throws StreamingException { if (isTxnClosed.get()) { throw new StreamingException("Transaction" + toString() + " is closed()"); http://git-wip-us.apache.org/repos/asf/hive/blob/074ec7ef/streaming/src/java/org/apache/hive/streaming/RecordWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java index 4d25924..d9c4455 100644 --- a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java @@ -19,6 +19,8 @@ package org.apache.hive.streaming; +import java.io.InputStream; + import java.util.Set; public interface RecordWriter { @@ -34,15 +36,28 @@ public interface RecordWriter { void init(StreamingConnection connection, long minWriteId, long maxWriteID) throws StreamingException; /** - * Writes using a hive RecordUpdater + * Writes using a hive RecordUpdater. * - * @param writeId the write ID of the table mapping to Txn in which the write occurs - * @param record the record to be written + * @param writeId - the write ID of the table mapping to Txn in which the write occurs + * @param record - the record to be written + * @throws StreamingException - thrown when write fails */ void write(long writeId, byte[] record) throws StreamingException; /** + * Writes using a hive RecordUpdater. The specified input stream will be automatically closed + * by the API after reading all the records out of it. + * + * @param writeId - the write ID of the table mapping to Txn in which the write occurs + * @param inputStream - the record to be written + * @throws StreamingException - thrown when write fails + */ + void write(long writeId, InputStream inputStream) throws StreamingException; + + /** * Flush records from buffer. Invoked by TransactionBatch.commitTransaction() + * + * @throws StreamingException - thrown when flush fails */ void flush() throws StreamingException; http://git-wip-us.apache.org/repos/asf/hive/blob/074ec7ef/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java index 5396d76..fbe00db 100644 --- a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java @@ -18,6 +18,8 @@ package org.apache.hive.streaming; +import java.io.InputStream; + import org.apache.hadoop.hive.conf.HiveConf; public interface StreamingConnection extends ConnectionInfo, PartitionHandler { @@ -44,6 +46,14 @@ public interface StreamingConnection extends ConnectionInfo, PartitionHandler { void write(byte[] record) throws StreamingException; /** + * Write record using RecordWriter. + * + * @param inputStream - input stream of records + * @throws StreamingException - if there are errors when writing + */ + void write(InputStream inputStream) throws StreamingException; + + /** * Commit a transaction to make the writes visible for readers. * * @throws StreamingException - if there are errors when committing the open transaction http://git-wip-us.apache.org/repos/asf/hive/blob/074ec7ef/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java index 56f59fd..13de1d4 100644 --- a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java @@ -19,7 +19,9 @@ package org.apache.hive.streaming; +import java.io.InputStream; import java.util.Properties; +import java.util.Scanner; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.SerDeException; @@ -44,6 +46,7 @@ public class StrictDelimitedInputWriter extends AbstractRecordWriter { private LazySimpleSerDe serde; private StrictDelimitedInputWriter(Builder builder) { + super(builder.lineDelimiter); this.fieldDelimiter = builder.fieldDelimiter; this.collectionDelimiter = builder.collectionDelimiter; this.mapKeyDelimiter = builder.mapKeyDelimiter; @@ -57,6 +60,7 @@ public class StrictDelimitedInputWriter extends AbstractRecordWriter { private char fieldDelimiter = (char) LazySerDeParameters.DefaultSeparators[0]; private char collectionDelimiter = (char) LazySerDeParameters.DefaultSeparators[1]; private char mapKeyDelimiter = (char) LazySerDeParameters.DefaultSeparators[2]; + private String lineDelimiter; public Builder withFieldDelimiter(final char fieldDelimiter) { this.fieldDelimiter = fieldDelimiter; @@ -73,6 +77,11 @@ public class StrictDelimitedInputWriter extends AbstractRecordWriter { return this; } + public Builder withLineDelimiterPattern(final String lineDelimiter) { + this.lineDelimiter = lineDelimiter; + return this; + } + public StrictDelimitedInputWriter build() { return new StrictDelimitedInputWriter(this); } http://git-wip-us.apache.org/repos/asf/hive/blob/074ec7ef/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java index 2c54eef..0f9b652 100644 --- a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java @@ -34,13 +34,24 @@ import org.apache.hadoop.io.Text; public class StrictJsonWriter extends AbstractRecordWriter { private JsonSerDe serde; + public StrictJsonWriter(final Builder builder) { + super(builder.lineDelimiter); + } + public static Builder newBuilder() { return new Builder(); } public static class Builder { + private String lineDelimiter; + + public Builder withLineDelimiterPattern(final String lineDelimiter) { + this.lineDelimiter = lineDelimiter; + return this; + } + public StrictJsonWriter build() { - return new StrictJsonWriter(); + return new StrictJsonWriter(this); } } http://git-wip-us.apache.org/repos/asf/hive/blob/074ec7ef/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java index ba2609f..3651fa1 100644 --- a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java @@ -38,6 +38,7 @@ public class StrictRegexWriter extends AbstractRecordWriter { private RegexSerDe serde; private StrictRegexWriter(final Builder builder) { + super(builder.lineDelimiter); this.regex = builder.regex; } @@ -47,12 +48,18 @@ public class StrictRegexWriter extends AbstractRecordWriter { public static class Builder { private String regex; + private String lineDelimiter; public Builder withRegex(final String regex) { this.regex = regex; return this; } + public Builder withLineDelimiterPattern(final String lineDelimiter) { + this.lineDelimiter = lineDelimiter; + return this; + } + public StrictRegexWriter build() { return new StrictRegexWriter(this); } http://git-wip-us.apache.org/repos/asf/hive/blob/074ec7ef/streaming/src/test/org/apache/hive/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java index a6fdd66..042fdbe 100644 --- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java +++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java @@ -20,11 +20,13 @@ package org.apache.hive.streaming; import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileFilter; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.io.PrintStream; import java.net.URI; import java.net.URISyntaxException; @@ -468,6 +470,60 @@ public class TestStreaming { } @Test + public void testAllTypesDelimitedWriterInputStream() throws Exception { + queryTable(driver, "drop table if exists default.alltypes"); + queryTable(driver, + "create table if not exists default.alltypes ( bo boolean, ti tinyint, si smallint, i int, bi bigint, " + + "f float, d double, de decimal(10,3), ts timestamp, da date, s string, c char(5), vc varchar(5), " + + "m map<string, string>, l array<int>, st struct<c1:int, c2:string> ) " + + "stored as orc TBLPROPERTIES('transactional'='true')"); + StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter('|') + .withCollectionDelimiter(',') + .withMapKeyDelimiter(':') + .withLineDelimiterPattern("\n") + .build(); + StreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase("default") + .withTable("alltypes") + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withTransactionBatchSize(2) + .withRecordWriter(wr) + .withHiveConf(conf) + .connect(); + + String row1 = "true|10|100|1000|10000|4.0|20.0|4.2222|1969-12-31 " + + "15:59:58.174|1970-01-01|string|hello|hello|k1:v1|100,200|10,foo"; + String row2 = "false|20|200|2000|20000|8.0|40.0|2.2222|1970-12-31 15:59:58.174|1971-01-01|abcd|world|world|" + + "k4:v4|200,300|20,bar"; + String allRows = row1 + "\n" + row2 + "\n"; + ByteArrayInputStream bais = new ByteArrayInputStream(allRows.getBytes()); + connection.beginTransaction(); + connection.write(bais); + connection.commitTransaction(); + connection.close(); + bais.close(); + + List<String> rs = queryTable(driver, "select ROW__ID, bo, ti, si, i, bi, f, d, de, ts, da, s, c, vc, m, l, st," + + " INPUT__FILE__NAME from default.alltypes order by ROW__ID"); + Assert.assertEquals(2, rs.size()); + String gotRow1 = rs.get(0); + String expectedPrefixRow1 = "{\"writeid\":1,\"bucketid\":536870912," + + "\"rowid\":0}\ttrue\t10\t100\t1000\t10000\t4.0\t20.0\t4.222\t1969-12-31 15:59:58.174\t1970-01-01\tstring" + + "\thello\thello\t{\"k1\":\"v1\"}\t[100,200]\t{\"c1\":10,\"c2\":\"foo\"}"; + String expectedSuffixRow1 = "alltypes/delta_0000001_0000002/bucket_00000"; + String gotRow2 = rs.get(1); + String expectedPrefixRow2 = "{\"writeid\":1,\"bucketid\":536870912," + + "\"rowid\":1}\tfalse\t20\t200\t2000\t20000\t8.0\t40.0\t2.222\t1970-12-31 15:59:58.174\t1971-01-01\tabcd" + + "\tworld\tworld\t{\"k4\":\"v4\"}\t[200,300]\t{\"c1\":20,\"c2\":\"bar\"}"; + String expectedSuffixRow2 = "alltypes/delta_0000001_0000002/bucket_00000"; + Assert.assertTrue(gotRow1, gotRow1.startsWith(expectedPrefixRow1)); + Assert.assertTrue(gotRow1, gotRow1.endsWith(expectedSuffixRow1)); + Assert.assertTrue(gotRow2, gotRow2.startsWith(expectedPrefixRow2)); + Assert.assertTrue(gotRow2, gotRow2.endsWith(expectedSuffixRow2)); + } + + @Test public void testAutoRollTransactionBatch() throws Exception { queryTable(driver, "drop table if exists default.streamingnobuckets"); queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc " + @@ -1226,6 +1282,37 @@ public class TestStreaming { } @Test + public void testRegexInputStream() throws Exception { + String regex = "([^,]*),(.*)"; + StrictRegexWriter writer = StrictRegexWriter.newBuilder() + // if unspecified, default one or [\r\n] will be used for line break + .withRegex(regex) + .build(); + StreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withHiveConf(conf) + .withRecordWriter(writer) + .connect(); + + String rows = "1,foo\r2,bar\r3,baz"; + ByteArrayInputStream bais = new ByteArrayInputStream(rows.getBytes()); + connection.beginTransaction(); + connection.write(bais); + connection.commitTransaction(); + bais.close(); + connection.close(); + + List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName); + Assert.assertEquals(3, rs.size()); + Assert.assertEquals("1\tfoo\tAsia\tIndia", rs.get(0)); + Assert.assertEquals("2\tbar\tAsia\tIndia", rs.get(1)); + Assert.assertEquals("3\tbaz\tAsia\tIndia", rs.get(2)); + } + + @Test public void testTransactionBatchCommitJson() throws Exception { StrictJsonWriter writer = StrictJsonWriter.newBuilder() .build(); @@ -1261,6 +1348,37 @@ public class TestStreaming { } @Test + public void testJsonInputStream() throws Exception { + StrictJsonWriter writer = StrictJsonWriter.newBuilder() + .withLineDelimiterPattern("\\|") + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + // 1st Txn + connection.beginTransaction(); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, connection.getCurrentTransactionState()); + String records = "{\"id\" : 1, \"msg\": \"Hello streaming\"}|{\"id\" : 2, \"msg\": \"Hello world\"}|{\"id\" : 3, " + + "\"msg\": \"Hello world!!\"}"; + ByteArrayInputStream bais = new ByteArrayInputStream(records.getBytes()); + connection.write(bais); + connection.commitTransaction(); + bais.close(); + connection.close(); + List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName); + Assert.assertEquals(3, rs.size()); + Assert.assertEquals("1\tHello streaming\tAsia\tIndia", rs.get(0)); + Assert.assertEquals("2\tHello world\tAsia\tIndia", rs.get(1)); + Assert.assertEquals("3\tHello world!!\tAsia\tIndia", rs.get(2)); + } + + @Test public void testRemainingTransactions() throws Exception { StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() .withFieldDelimiter(',') @@ -2797,6 +2915,12 @@ public class TestStreaming { } @Override + public void write(final long writeId, final InputStream inputStream) throws StreamingException { + delegate.write(writeId, inputStream); + produceFault(); + } + + @Override public void flush() throws StreamingException { delegate.flush(); produceFault();