Repository: hive Updated Branches: refs/heads/branch-3.0 acfd2099a -> 1c6d946f9
HIVE-19817: Hive streaming API + dynamic partitioning + json/regex writer does not work (Prasanth Jayachandran reviewed by Ashutosh Chauhan) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1c6d946f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1c6d946f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1c6d946f Branch: refs/heads/branch-3.0 Commit: 1c6d946f97e35a53194868f7a0b2f87ccb5be206 Parents: acfd209 Author: Prasanth Jayachandran <[email protected]> Authored: Fri Jun 8 10:57:07 2018 -0700 Committer: Prasanth Jayachandran <[email protected]> Committed: Fri Jun 8 10:57:58 2018 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hive/serde2/JsonSerDe.java | 2 +- .../hive/streaming/HiveStreamingConnection.java | 4 + .../apache/hive/streaming/StrictJsonWriter.java | 5 ++ .../hive/streaming/StrictRegexWriter.java | 19 ++++- .../TestStreamingDynamicPartitioning.java | 77 ++++++++++++++++++++ 5 files changed, 104 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1c6d946f/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java index 40b2e8e..1119fa2 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java @@ -178,7 +178,7 @@ public class JsonSerDe extends AbstractSerDe { if (token != JsonToken.FIELD_NAME) { throw new IOException("Field name expected"); } - String fieldName = p.getText(); + String fieldName = p.getText().toLowerCase(); int fpos = s.getAllStructFieldNames().indexOf(fieldName); if (fpos == -1) { fpos = getPositionFromHiveInternalColumnName(fieldName); http://git-wip-us.apache.org/repos/asf/hive/blob/1c6d946f/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index 0f9260d..1d8fdff 100644 --- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -899,6 +899,10 @@ public class HiveStreamingConnection implements StreamingConnection { } private void abortImpl(boolean abortAllRemaining) throws StreamingException { + if (minTxnId == null) { + return; + } + transactionLock.lock(); try { if (abortAllRemaining) { http://git-wip-us.apache.org/repos/asf/hive/blob/1c6d946f/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java index 0f9b652..cabb64c 100644 --- a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java @@ -20,11 +20,14 @@ package org.apache.hive.streaming; import java.util.Properties; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.JsonSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.io.Text; +import com.google.common.base.Joiner; + /** * Streaming Writer handles utf8 encoded Json (Strict syntax). * Uses org.apache.hadoop.hive.serde2.JsonSerDe to process Json input @@ -64,6 +67,8 @@ public class StrictJsonWriter extends AbstractRecordWriter { public JsonSerDe createSerde() throws SerializationError { try { Properties tableProps = table.getMetadata(); + tableProps.setProperty(serdeConstants.LIST_COLUMNS, Joiner.on(",").join(inputColumns)); + tableProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, Joiner.on(":").join(inputTypes)); JsonSerDe serde = new JsonSerDe(); SerDeUtils.initializeSerDe(serde, conf, tableProps, null); this.serde = serde; http://git-wip-us.apache.org/repos/asf/hive/blob/1c6d946f/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java index 3651fa1..12516f5 100644 --- a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java @@ -18,15 +18,20 @@ package org.apache.hive.streaming; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.RegexSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.io.Text; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; + /** * Streaming Writer handles text input data with regex. Uses * org.apache.hadoop.hive.serde2.RegexSerDe @@ -75,7 +80,17 @@ public class StrictRegexWriter extends AbstractRecordWriter { try { Properties tableProps = table.getMetadata(); tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex); - tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(inputColumns, ",")); + tableProps.setProperty(serdeConstants.LIST_COLUMNS, Joiner.on(",").join(inputColumns)); + tableProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, Joiner.on(":").join(inputTypes)); + final String columnComments = tableProps.getProperty("columns.comments"); + if (columnComments != null) { + List<String> comments = Lists.newArrayList(Splitter.on('\0').split(columnComments)); + int commentsSize = comments.size(); + for (int i = 0; i < inputColumns.size() - commentsSize; i++) { + comments.add(""); + } + tableProps.setProperty("columns.comments", Joiner.on('\0').join(comments)); + } RegexSerDe serde = new RegexSerDe(); SerDeUtils.initializeSerDe(serde, conf, tableProps, null); this.serde = serde; http://git-wip-us.apache.org/repos/asf/hive/blob/1c6d946f/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java ---------------------------------------------------------------------- diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java b/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java index e513915..32a6d06 100644 --- a/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java +++ b/streaming/src/test/org/apache/hive/streaming/TestStreamingDynamicPartitioning.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -579,6 +580,82 @@ public class TestStreamingDynamicPartitioning { } @Test + public void testRegexInputStreamDP() throws Exception { + String regex = "([^,]*),(.*),(.*),(.*)"; + StrictRegexWriter writer = StrictRegexWriter.newBuilder() + // if unspecified, default one or [\r\n] will be used for line break + .withRegex(regex) + .build(); + StreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withHiveConf(conf) + .withRecordWriter(writer) + .connect(); + + String rows = "1,foo,Asia,India\r2,bar,Europe,Germany\r3,baz,Asia,China\r4,cat,Australia,"; + ByteArrayInputStream bais = new ByteArrayInputStream(rows.getBytes()); + connection.beginTransaction(); + connection.write(bais); + connection.commitTransaction(); + bais.close(); + connection.close(); + + List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName + " order by id"); + Assert.assertEquals(4, rs.size()); + Assert.assertEquals("1\tfoo\tAsia\tIndia", rs.get(0)); + Assert.assertEquals("2\tbar\tEurope\tGermany", rs.get(1)); + Assert.assertEquals("3\tbaz\tAsia\tChina", rs.get(2)); + Assert.assertEquals("4\tcat\tAustralia\t__HIVE_DEFAULT_PARTITION__", rs.get(3)); + rs = queryTable(driver, "show partitions " + dbName + "." + tblName); + Assert.assertEquals(4, rs.size()); + Assert.assertTrue(rs.contains("continent=Asia/country=India")); + Assert.assertTrue(rs.contains("continent=Asia/country=China")); + Assert.assertTrue(rs.contains("continent=Europe/country=Germany")); + Assert.assertTrue(rs.contains("continent=Australia/country=__HIVE_DEFAULT_PARTITION__")); + } + + @Test + public void testJsonInputStreamDP() throws Exception { + StrictJsonWriter writer = StrictJsonWriter.newBuilder() + .withLineDelimiterPattern("\\|") + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + // 1st Txn + connection.beginTransaction(); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, connection.getCurrentTransactionState()); + String records = "{\"id\" : 1, \"msg\": \"Hello streaming\", \"continent\": \"Asia\", \"Country\": \"India\"}|" + + "{\"id\" : 2, \"msg\": \"Hello world\", \"continent\": \"Europe\", \"Country\": \"Germany\"}|" + + "{\"id\" : 3, \"msg\": \"Hello world!!\", \"continent\": \"Asia\", \"Country\": \"China\"}|" + + "{\"id\" : 4, \"msg\": \"Hmm..\", \"continent\": \"Australia\", \"Unknown-field\": \"whatever\"}|"; + ByteArrayInputStream bais = new ByteArrayInputStream(records.getBytes()); + connection.write(bais); + connection.commitTransaction(); + bais.close(); + connection.close(); + List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName + " order by id"); + Assert.assertEquals(4, rs.size()); + Assert.assertEquals("1\tHello streaming\tAsia\tIndia", rs.get(0)); + Assert.assertEquals("2\tHello world\tEurope\tGermany", rs.get(1)); + Assert.assertEquals("3\tHello world!!\tAsia\tChina", rs.get(2)); + Assert.assertEquals("4\tHmm..\tAustralia\t__HIVE_DEFAULT_PARTITION__", rs.get(3)); + rs = queryTable(driver, "show partitions " + dbName + "." + tblName); + Assert.assertEquals(4, rs.size()); + Assert.assertTrue(rs.contains("continent=Asia/country=India")); + Assert.assertTrue(rs.contains("continent=Asia/country=China")); + Assert.assertTrue(rs.contains("continent=Europe/country=Germany")); + Assert.assertTrue(rs.contains("continent=Australia/country=__HIVE_DEFAULT_PARTITION__")); + } + + @Test public void testWriteAfterClose() throws Exception { StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() .withFieldDelimiter(',')
