http://git-wip-us.apache.org/repos/asf/hive/blob/61c6a702/streaming/src/test/org/apache/hive/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java index 0ec3048..a6fdd66 100644 --- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java +++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java @@ -35,6 +35,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -53,12 +54,10 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.Validator; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.LockType; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; @@ -90,13 +89,11 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector; -import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.orc.impl.OrcAcidUtils; import org.apache.orc.tools.FileDump; import org.apache.thrift.TException; @@ -115,6 +112,7 @@ public class TestStreaming { public static class RawFileSystem extends RawLocalFileSystem { private static final URI NAME; + static { try { NAME = new URI("raw:///"); @@ -128,12 +126,16 @@ public class TestStreaming { return NAME; } + @Override + public String getScheme() { + return "raw"; + } @Override public FileStatus getFileStatus(Path path) throws IOException { File file = pathToFile(path); if (!file.exists()) { - throw new FileNotFoundException("Can't find " + path); + throw new FileNotFoundException("Can'table find " + path); } // get close enough short mod = 0; @@ -147,32 +149,30 @@ public class TestStreaming { mod |= 0111; } return new FileStatus(file.length(), file.isDirectory(), 1, 1024, - file.lastModified(), file.lastModified(), - FsPermission.createImmutable(mod), "owen", "users", path); + file.lastModified(), file.lastModified(), + FsPermission.createImmutable(mod), "owen", "users", path); } } private static final String COL1 = "id"; private static final String COL2 = "msg"; - private final HiveConf conf; + private static HiveConf conf = null; private IDriver driver; private final IMetaStoreClient msClient; - final String metaStoreURI = null; - // partitioned table private final static 
String dbName = "testing"; private final static String tblName = "alerts"; - private final static String[] fieldNames = new String[]{COL1,COL2}; - List<String> partitionVals; + private final static String[] fieldNames = new String[]{COL1, COL2}; + static List<String> partitionVals; private static Path partLoc; private static Path partLoc2; // unpartitioned table private final static String dbName2 = "testing2"; private final static String tblName2 = "alerts"; - private final static String[] fieldNames2 = new String[]{COL1,COL2}; + private final static String[] fieldNames2 = new String[]{COL1, COL2}; // for bucket join testing @@ -201,13 +201,9 @@ public class TestStreaming { conf = new HiveConf(this.getClass()); conf.set("fs.raw.impl", RawFileSystem.class.getName()); - conf - .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, - "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); TxnDbUtil.setConfValues(conf); - if (metaStoreURI!=null) { - conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI); - } conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true); conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true); dbFolder.create(); @@ -229,12 +225,13 @@ public class TestStreaming { // drop and recreate the necessary databases and tables dropDB(msClient, dbName); - String[] colNames = new String[] {COL1, COL2}; - String[] colTypes = new String[] {serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME}; - String[] bucketCols = new String[] {COL1}; + String[] colNames = new String[]{COL1, COL2}; + String[] colTypes = new String[]{serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME}; + String[] bucketCols = new String[]{COL1}; String loc1 = dbFolder.newFolder(dbName + ".db").toString(); String[] partNames = new String[]{"Continent", "Country"}; - partLoc = createDbAndTable(driver, dbName, tblName, partitionVals, colNames, colTypes, bucketCols, partNames, loc1, 1); + partLoc = createDbAndTable(driver, dbName, tblName, partitionVals, colNames, colTypes, bucketCols, partNames, loc1, + 1); dropDB(msClient, dbName2); String loc2 = dbFolder.newFolder(dbName2 + ".db").toString(); @@ -249,19 +246,11 @@ public class TestStreaming { } @After - public void cleanup() throws Exception { + public void cleanup() { msClient.close(); driver.close(); } - private static List<FieldSchema> getPartitionKeys() { - List<FieldSchema> fields = new ArrayList<FieldSchema>(); - // Defining partition names in unsorted order - fields.add(new FieldSchema("continent", serdeConstants.STRING_TYPE_NAME, "")); - fields.add(new FieldSchema("country", serdeConstants.STRING_TYPE_NAME, "")); - return fields; - } - private void createStoreSales(String dbName, String loc) throws Exception { String dbUri = "raw://" + new Path(loc).toUri().toString(); String tableLoc = dbUri + Path.SEPARATOR + "store_sales"; @@ -301,43 +290,48 @@ public class TestStreaming { ")\n" + " partitioned by (dt string)\n" + "clustered by (ss_store_sk, ss_promo_sk)\n" + - "INTO 4 BUCKETS stored as orc " + " location '" + tableLoc + "'" + " TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')"); + "INTO 4 BUCKETS stored as orc " + " location '" + tableLoc + "'" + + " TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')"); Assert.assertTrue(success); success = runDDL(driver, "alter table store_sales add 
partition(dt='2015')"); Assert.assertTrue(success); } + /** * make sure it works with table where bucket col is not 1st col + * * @throws Exception */ @Test public void testBucketingWhereBucketColIsNotFirstCol() throws Exception { List<String> partitionVals = new ArrayList<String>(); partitionVals.add("2015"); - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testing5", "store_sales", partitionVals); - StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); - DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk", - "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity", - "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost", - "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt, connection); - - TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); - txnBatch.beginNextTransaction(); - + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase("testing5") + .withTable("store_sales") + .withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + connection.beginTransaction(); StringBuilder row = new StringBuilder(); - for(int i = 0; i < 10; i++) { - for(int ints = 0; ints < 11; ints++) { + for (int i = 0; i < 10; i++) { + for (int ints = 0; ints < 11; ints++) { row.append(ints).append(','); } - for(int decs = 0; decs < 12; decs++) { + for (int decs = 0; decs < 12; decs++) { row.append(i + 0.1).append(','); } row.setLength(row.length() - 1); - txnBatch.write(row.toString().getBytes()); + connection.write(row.toString().getBytes()); } - txnBatch.commit(); - txnBatch.close(); + connection.commitTransaction(); connection.close(); ArrayList<String> res = queryTable(driver, "select row__id.bucketid, * from testing5.store_sales"); @@ -352,35 +346,41 @@ public class TestStreaming { @Test public void testNoBuckets() throws Exception { queryTable(driver, "drop table if exists default.streamingnobuckets"); - //todo: why does it need transactional_properties? 
- queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true', 'transactional_properties'='default')"); + queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc " + + "TBLPROPERTIES('transactional'='true')"); queryTable(driver, "insert into default.streamingnobuckets values('foo','bar')"); List<String> rs = queryTable(driver, "select * from default.streamingnobuckets"); Assert.assertEquals(1, rs.size()); Assert.assertEquals("foo\tbar", rs.get(0)); - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "Default", "streamingNoBuckets", null); - String[] colNames1 = new String[] { "a", "b" }; - StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); - DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt, connection); - - TransactionBatch txnBatch = connection.fetchTransactionBatch(2, wr); - txnBatch.beginNextTransaction(); - txnBatch.write("a1,b2".getBytes()); - txnBatch.write("a3,b4".getBytes()); + StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase("Default") + .withTable("streamingNoBuckets") + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withTransactionBatchSize(2) + .withRecordWriter(wr) + .withHiveConf(conf) + .connect(); + + connection.beginTransaction(); + connection.write("a1,b2".getBytes()); + connection.write("a3,b4".getBytes()); TxnStore txnHandler = TxnUtils.getTxnStore(conf); ShowLocksResponse resp = txnHandler.showLocks(new ShowLocksRequest()); Assert.assertEquals(resp.getLocksSize(), 1); Assert.assertEquals("streamingnobuckets", resp.getLocks().get(0).getTablename()); Assert.assertEquals("default", resp.getLocks().get(0).getDbname()); - txnBatch.commit(); - txnBatch.beginNextTransaction(); - txnBatch.write("a5,b6".getBytes()); - txnBatch.write("a7,b8".getBytes()); - txnBatch.commit(); - txnBatch.close(); + connection.commitTransaction(); + connection.beginTransaction(); + connection.write("a5,b6".getBytes()); + connection.write("a7,b8".getBytes()); + connection.commitTransaction(); + connection.close(); Assert.assertEquals("", 0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); - rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID"); + rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID"); Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar")); Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000")); @@ -404,7 +404,7 @@ public class TestStreaming { queryTable(driver, "alter table default.streamingnobuckets compact 'major'"); runWorker(conf); - rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID"); + rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID"); Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar")); Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005/bucket_00000")); @@ -416,6 +416,152 @@ public class TestStreaming { Assert.assertTrue(rs.get(3), 
rs.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000")); } + @Test + public void testAllTypesDelimitedWriter() throws Exception { + queryTable(driver, "drop table if exists default.alltypes"); + queryTable(driver, + "create table if not exists default.alltypes ( bo boolean, ti tinyint, si smallint, i int, bi bigint, " + + "f float, d double, de decimal(10,3), ts timestamp, da date, s string, c char(5), vc varchar(5), " + + "m map<string, string>, l array<int>, st struct<c1:int, c2:string> ) " + + "stored as orc TBLPROPERTIES('transactional'='true')"); + StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter('|') + .withCollectionDelimiter(',') + .withMapKeyDelimiter(':') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase("default") + .withTable("alltypes") + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withTransactionBatchSize(2) + .withRecordWriter(wr) + .withHiveConf(conf) + .connect(); + + String row1 = "true|10|100|1000|10000|4.0|20.0|4.2222|1969-12-31 " + + "15:59:58.174|1970-01-01|string|hello|hello|k1:v1|100,200|10,foo"; + String row2 = "false|20|200|2000|20000|8.0|40.0|2.2222|1970-12-31 15:59:58.174|1971-01-01|abcd|world|world|" + + "k4:v4|200,300|20,bar"; + connection.beginTransaction(); + connection.write(row1.getBytes()); + connection.write(row2.getBytes()); + connection.commitTransaction(); + connection.close(); + + List<String> rs = queryTable(driver, "select ROW__ID, bo, ti, si, i, bi, f, d, de, ts, da, s, c, vc, m, l, st," + + " INPUT__FILE__NAME from default.alltypes order by ROW__ID"); + Assert.assertEquals(2, rs.size()); + String gotRow1 = rs.get(0); + String expectedPrefixRow1 = "{\"writeid\":1,\"bucketid\":536870912," + + "\"rowid\":0}\ttrue\t10\t100\t1000\t10000\t4.0\t20.0\t4.222\t1969-12-31 15:59:58.174\t1970-01-01\tstring" + + "\thello\thello\t{\"k1\":\"v1\"}\t[100,200]\t{\"c1\":10,\"c2\":\"foo\"}"; + String expectedSuffixRow1 = "alltypes/delta_0000001_0000002/bucket_00000"; + String gotRow2 = rs.get(1); + String expectedPrefixRow2 = "{\"writeid\":1,\"bucketid\":536870912," + + "\"rowid\":1}\tfalse\t20\t200\t2000\t20000\t8.0\t40.0\t2.222\t1970-12-31 15:59:58.174\t1971-01-01\tabcd" + + "\tworld\tworld\t{\"k4\":\"v4\"}\t[200,300]\t{\"c1\":20,\"c2\":\"bar\"}"; + String expectedSuffixRow2 = "alltypes/delta_0000001_0000002/bucket_00000"; + Assert.assertTrue(gotRow1, gotRow1.startsWith(expectedPrefixRow1)); + Assert.assertTrue(gotRow1, gotRow1.endsWith(expectedSuffixRow1)); + Assert.assertTrue(gotRow2, gotRow2.startsWith(expectedPrefixRow2)); + Assert.assertTrue(gotRow2, gotRow2.endsWith(expectedSuffixRow2)); + } + + @Test + public void testAutoRollTransactionBatch() throws Exception { + queryTable(driver, "drop table if exists default.streamingnobuckets"); + queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc " + + "TBLPROPERTIES('transactional'='true')"); + queryTable(driver, "insert into default.streamingnobuckets values('foo','bar')"); + List<String> rs = queryTable(driver, "select * from default.streamingnobuckets"); + Assert.assertEquals(1, rs.size()); + Assert.assertEquals("foo\tbar", rs.get(0)); + StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase("default") + .withTable("streamingnobuckets") + .withAgentInfo("UT_" + Thread.currentThread().getName()) + 
.withRecordWriter(wr) + .withHiveConf(conf) + .withTransactionBatchSize(2) + .connect(); + + connection.beginTransaction(); + connection.write("a1,b2".getBytes()); + connection.write("a3,b4".getBytes()); + connection.commitTransaction(); + connection.beginTransaction(); + connection.write("a5,b6".getBytes()); + connection.write("a7,b8".getBytes()); + connection.commitTransaction(); + // should have rolled over to next transaction batch + connection.beginTransaction(); + connection.write("a9,b10".getBytes()); + connection.write("a11,b12".getBytes()); + connection.commitTransaction(); + connection.beginTransaction(); + connection.write("a13,b14".getBytes()); + connection.write("a15,b16".getBytes()); + connection.commitTransaction(); + connection.close(); + + Assert.assertEquals("", 0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); + rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID"); + + Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000")); + Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000")); + Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000")); + Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000")); + Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8")); + Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000")); + + Assert.assertTrue(rs.get(5), rs.get(5).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\ta9\tb10")); + Assert.assertTrue(rs.get(5), rs.get(5).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000")); + Assert.assertTrue(rs.get(6), rs.get(6).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\ta11\tb12")); + Assert.assertTrue(rs.get(6), rs.get(6).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000")); + Assert.assertTrue(rs.get(7), rs.get(7).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\ta13\tb14")); + Assert.assertTrue(rs.get(7), rs.get(7).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000")); + Assert.assertTrue(rs.get(8), rs.get(8).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":1}\ta15\tb16")); + Assert.assertTrue(rs.get(8), rs.get(8).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000")); + + queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'"); + queryTable(driver, "delete from default.streamingnobuckets where a='a1'"); + queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a15'"); + queryTable(driver, "delete from default.streamingnobuckets where a='a9'"); + rs = queryTable(driver, "select a, b from default.streamingnobuckets order by a, b"); + int row = 0; + Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++)); + Assert.assertEquals("at row=" + 
row, "0\t0", rs.get(row++)); + Assert.assertEquals("at row=" + row, "a11\tb12", rs.get(row++)); + Assert.assertEquals("at row=" + row, "a13\tb14", rs.get(row++)); + Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++)); + Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++)); + Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++)); + + queryTable(driver, "alter table default.streamingnobuckets compact 'major'"); + runWorker(conf); + rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID"); + + Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000009/bucket_00000")); + Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000009/bucket_00000")); + Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000009/bucket_00000")); + Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\ta11\tb12")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000009/bucket_00000")); + Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\ta13\tb14")); + Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/base_0000009/bucket_00000")); + Assert.assertTrue(rs.get(5), rs.get(5).startsWith("{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t0\t0")); + Assert.assertTrue(rs.get(5), rs.get(5).endsWith("streamingnobuckets/base_0000009/bucket_00000")); + } + /** * this is a clone from TestTxnStatement2.... 
*/ @@ -436,71 +582,83 @@ public class TestStreaming { int bucketCount = 100; String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString(); - String tableLoc = "'" + dbUri + Path.SEPARATOR + "streamedtable" + "'"; + String tableLoc = "'" + dbUri + Path.SEPARATOR + "streamedtable" + "'"; String tableLoc2 = "'" + dbUri + Path.SEPARATOR + "finaltable" + "'"; String tableLoc3 = "'" + dbUri + Path.SEPARATOR + "nobucket" + "'"; - runDDL(driver, "create database testBucketing3"); - runDDL(driver, "use testBucketing3"); - runDDL(driver, "create table streamedtable ( key1 string,key2 int,data string ) clustered by ( key1,key2 ) into " - + bucketCount + " buckets stored as orc location " + tableLoc + " TBLPROPERTIES ('transactional'='true')") ; -// In 'nobucket' table we capture bucketid from streamedtable to workaround a hive bug that prevents joins two identically bucketed tables - runDDL(driver, "create table nobucket ( bucketid int, key1 string,key2 int,data string ) location " + tableLoc3) ; - runDDL(driver, "create table finaltable ( bucketid int, key1 string,key2 int,data string ) clustered by ( key1,key2 ) into " - + bucketCount + " buckets stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='true')"); - - - String[] records = new String[] { - "PSFAHYLZVC,29,EPNMA", - "PPPRKWAYAU,96,VUTEE", - "MIAOFERCHI,3,WBDSI", - "CEGQAZOWVN,0,WCUZL", - "XWAKMNSVQF,28,YJVHU", - "XBWTSAJWME,2,KDQFO", - "FUVLQTAXAY,5,LDSDG", - "QTQMDJMGJH,6,QBOMA", - "EFLOTLWJWN,71,GHWPS", - "PEQNAOJHCM,82,CAAFI", - "MOEKQLGZCP,41,RUACR", - "QZXMCOPTID,37,LFLWE", - "EYALVWICRD,13,JEZLC", - "VYWLZAYTXX,16,DMVZX", - "OSALYSQIXR,47,HNZVE", - "JGKVHKCEGQ,25,KSCJB", - "WQFMMYDHET,12,DTRWA", - "AJOVAYZKZQ,15,YBKFO", - "YAQONWCUAU,31,QJNHZ", - "DJBXUEUOEB,35,IYCBL" - }; - - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "streamedtable", null); - String[] colNames1 = new String[] { "key1", "key2", "data" }; - DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt); - StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); - - TransactionBatch txnBatch = connection.fetchTransactionBatch(2, wr); - txnBatch.beginNextTransaction(); - - for (String record : records) { - txnBatch.write(record.toString().getBytes()); - } + // disabling vectorization as this test yields incorrect results with vectorization + conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); + try (IDriver driver = DriverFactory.newDriver(conf)) { + runDDL(driver, "create database testBucketing3"); + runDDL(driver, "use testBucketing3"); + runDDL(driver, "create table streamedtable ( key1 string,key2 int,data string ) clustered by ( key1,key2 ) into " + + bucketCount + " buckets stored as orc location " + tableLoc + " TBLPROPERTIES ('transactional'='true')"); + // In 'nobucket' table we capture bucketid from streamedtable to workaround a hive bug that prevents joins two identically bucketed tables + runDDL(driver, "create table nobucket ( bucketid int, key1 string,key2 int,data string ) location " + tableLoc3); + runDDL(driver, + "create table finaltable ( bucketid int, key1 string,key2 int,data string ) clustered by ( key1,key2 ) into " + + bucketCount + " buckets stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='true')"); + + + String[] records = new String[]{ + "PSFAHYLZVC,29,EPNMA", + "PPPRKWAYAU,96,VUTEE", + "MIAOFERCHI,3,WBDSI", + "CEGQAZOWVN,0,WCUZL", + "XWAKMNSVQF,28,YJVHU", + 
"XBWTSAJWME,2,KDQFO", + "FUVLQTAXAY,5,LDSDG", + "QTQMDJMGJH,6,QBOMA", + "EFLOTLWJWN,71,GHWPS", + "PEQNAOJHCM,82,CAAFI", + "MOEKQLGZCP,41,RUACR", + "QZXMCOPTID,37,LFLWE", + "EYALVWICRD,13,JEZLC", + "VYWLZAYTXX,16,DMVZX", + "OSALYSQIXR,47,HNZVE", + "JGKVHKCEGQ,25,KSCJB", + "WQFMMYDHET,12,DTRWA", + "AJOVAYZKZQ,15,YBKFO", + "YAQONWCUAU,31,QJNHZ", + "DJBXUEUOEB,35,IYCBL" + }; + + StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase("testBucketing3") + .withTable("streamedtable") + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(wr) + .withHiveConf(conf) + .connect(); + + connection.beginTransaction(); + + for (String record : records) { + connection.write(record.getBytes()); + } - txnBatch.commit(); - txnBatch.close(); - connection.close(); + connection.commitTransaction(); + connection.close(); - ArrayList<String> res1 = queryTable(driver, "select row__id.bucketid, * from streamedtable order by key2"); - for (String re : res1) { - System.out.println(re); - } + ArrayList<String> res1 = queryTable(driver, "select row__id.bucketid, * from streamedtable order by key2"); + for (String re : res1) { + LOG.error(re); + } - driver.run("insert into nobucket select row__id.bucketid,* from streamedtable"); - runDDL(driver, " insert into finaltable select * from nobucket"); - ArrayList<String> res2 = queryTable(driver, "select row__id.bucketid,* from finaltable where row__id.bucketid<>bucketid"); - for (String s : res2) { - LOG.error(s); + driver.run("insert into nobucket select row__id.bucketid,* from streamedtable"); + runDDL(driver, "insert into finaltable select * from nobucket"); + ArrayList<String> res2 = queryTable(driver, + "select row__id.bucketid,* from finaltable where row__id.bucketid<>bucketid"); + for (String s : res2) { + LOG.error(s); + } + Assert.assertTrue(res2.isEmpty()); + } finally { + conf.unset(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname); } - Assert.assertTrue(res2.isEmpty()); } @@ -512,32 +670,53 @@ public class TestStreaming { String tbl1 = "validation1"; String tbl2 = "validation2"; - String tableLoc = "'" + dbUri + Path.SEPARATOR + tbl1 + "'"; + String tableLoc = "'" + dbUri + Path.SEPARATOR + tbl1 + "'"; String tableLoc2 = "'" + dbUri + Path.SEPARATOR + tbl2 + "'"; runDDL(driver, "create database testBucketing3"); runDDL(driver, "use testBucketing3"); runDDL(driver, "create table " + tbl1 + " ( key1 string, data string ) clustered by ( key1 ) into " - + bucketCount + " buckets stored as orc location " + tableLoc + " TBLPROPERTIES ('transactional'='false')") ; + + bucketCount + " buckets stored as orc location " + tableLoc + " TBLPROPERTIES ('transactional'='false')"); runDDL(driver, "create table " + tbl2 + " ( key1 string, data string ) clustered by ( key1 ) into " - + bucketCount + " buckets stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='false')") ; - + + bucketCount + " buckets stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='false')"); + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = null; try { - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation1", null); - endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + connection = HiveStreamingConnection.newBuilder() + 
.withDatabase("testBucketing3") + .withTable("validation2") + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); Assert.assertTrue("InvalidTable exception was not thrown", false); } catch (InvalidTable e) { // expecting this exception + } finally { + if (connection != null) { + connection.close(); + } } try { - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation2", null); - endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + connection = HiveStreamingConnection.newBuilder() + .withDatabase("testBucketing3") + .withTable("validation2") + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); Assert.assertTrue("InvalidTable exception was not thrown", false); } catch (InvalidTable e) { // expecting this exception + } finally { + if (connection != null) { + connection.close(); + } } } @@ -547,7 +726,7 @@ public class TestStreaming { */ @Deprecated private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles, - String... records) throws Exception { + String... records) throws Exception { ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)); AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds); Assert.assertEquals(0, dir.getObsolete().size()); @@ -585,7 +764,7 @@ public class TestStreaming { InputSplit[] splits = inf.getSplits(job, buckets); Assert.assertEquals(numExpectedFiles, splits.length); org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr = - inf.getRecordReader(splits[0], job, Reporter.NULL); + inf.getRecordReader(splits[0], job, Reporter.NULL); NullWritable key = rr.createKey(); OrcStruct value = rr.createValue(); @@ -595,12 +774,13 @@ public class TestStreaming { } Assert.assertEquals(false, rr.next(key, value)); } + /** * @param validationQuery query to read from table to compare data against {@code records} - * @param records expected data. each row is CVS list of values + * @param records expected data. each row is CVS list of values */ private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles, - String validationQuery, boolean vectorize, String... records) throws Exception { + String validationQuery, boolean vectorize, String... 
records) throws Exception { ValidWriteIdList txns = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)); AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns); Assert.assertEquals(0, dir.getObsolete().size()); @@ -626,12 +806,13 @@ public class TestStreaming { Assert.assertEquals(minTxn, min); Assert.assertEquals(maxTxn, max); boolean isVectorizationEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED); - if(vectorize) { + if (vectorize) { conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); } String currStrategy = conf.getVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY); - for(String strategy : ((Validator.StringSet)HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator()).getExpected()) { + for (String strategy : ((Validator.StringSet) HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator()) + .getExpected()) { //run it with each split strategy - make sure there are differences conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, strategy.toUpperCase()); List<String> actualResult = queryTable(driver, validationQuery); @@ -656,35 +837,44 @@ public class TestStreaming { @Test public void testEndpointConnection() throws Exception { // For partitioned table, partitionVals are specified - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); - StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); //shouldn't throw + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); connection.close(); // For unpartitioned table, partitionVals are not specified - endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); - endPt.newConnection(false, "UT_" + Thread.currentThread().getName()).close(); // should not throw - - // For partitioned table, partitionVals are not specified - try { - endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null); - connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); - Assert.assertTrue("ConnectionError was not thrown", false); - connection.close(); - } catch (ConnectionError e) { - // expecting this exception - String errMsg = "doesn't specify any partitions for partitioned table"; - Assert.assertTrue(e.toString().endsWith(errMsg)); - } + connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName2) + .withTable(tblName2) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + connection.close(); // For unpartitioned table, partition values are specified try { - endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, partitionVals); - connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName2) + .withTable(tblName2) + .withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); Assert.assertTrue("ConnectionError was not thrown", false); connection.close(); } catch (ConnectionError e) { // expecting this exception - String errMsg = "specifies partitions 
for unpartitioned table"; + String errMsg = "specifies partitions for un-partitioned table"; Assert.assertTrue(e.toString().endsWith(errMsg)); } } @@ -695,417 +885,474 @@ public class TestStreaming { newPartVals.add(PART1_CONTINENT); newPartVals.add("Nepal"); - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName - , newPartVals); - - // Ensure partition is absent - try { - msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals); - Assert.assertTrue("Partition already exists", false); - } catch (NoSuchObjectException e) { - // expect this exception - } + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(newPartVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); // Create partition - Assert.assertNotNull(endPt.newConnection(true, "UT_" + Thread.currentThread().getName())); + Assert.assertNotNull(connection); // Ensure partition is present - Partition p = msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals); + Partition p = msClient.getPartition(dbName, tblName, partitionVals); Assert.assertNotNull("Did not find added partition", p); } @Test public void testTransactionBatchEmptyCommit() throws Exception { // 1) to partitioned table - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, - partitionVals); - StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); - DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection); - - TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); - - txnBatch.beginNextTransaction(); - txnBatch.commit(); - Assert.assertEquals(TransactionBatch.TxnState.COMMITTED - , txnBatch.getCurrentTransactionState()); - txnBatch.close(); + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + connection.beginTransaction(); + connection.commitTransaction(); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED + , connection.getCurrentTransactionState()); connection.close(); // 2) To unpartitioned table - endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); - writer = new DelimitedInputWriter(fieldNames2,",", endPt); - connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); - - txnBatch = connection.fetchTransactionBatch(10, writer); - txnBatch.beginNextTransaction(); - txnBatch.commit(); - Assert.assertEquals(TransactionBatch.TxnState.COMMITTED - , txnBatch.getCurrentTransactionState()); - txnBatch.close(); + writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName2) + .withTable(tblName2) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + connection.beginTransaction(); + connection.commitTransaction(); + 
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED + , connection.getCurrentTransactionState()); connection.close(); } /** * check that transactions that have not heartbeated and timedout get properly aborted + * * @throws Exception */ @Test public void testTimeOutReaper() throws Exception { - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); - DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt); - StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); - - TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer); - txnBatch.beginNextTransaction(); + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName2) + .withTable(tblName2) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + connection.beginTransaction(); conf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, TimeUnit.SECONDS); //ensure txn timesout - conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.MILLISECONDS); + conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 2, TimeUnit.MILLISECONDS); AcidHouseKeeperService houseKeeperService = new AcidHouseKeeperService(); houseKeeperService.setConf(conf); houseKeeperService.run(); try { //should fail because the TransactionBatch timed out - txnBatch.commit(); - } - catch(TransactionError e) { + connection.commitTransaction(); + } catch (TransactionError e) { Assert.assertTrue("Expected aborted transaction", e.getCause() instanceof TxnAbortedException); } - txnBatch.close(); - txnBatch = connection.fetchTransactionBatch(10, writer); - txnBatch.beginNextTransaction(); - txnBatch.commit(); - txnBatch.beginNextTransaction(); + connection.close(); + connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName2) + .withTable(tblName2) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + connection.beginTransaction(); + connection.commitTransaction(); + connection.beginTransaction(); houseKeeperService.run(); try { //should fail because the TransactionBatch timed out - txnBatch.commit(); - } - catch(TransactionError e) { + connection.commitTransaction(); + } catch (TransactionError e) { Assert.assertTrue("Expected aborted transaction", e.getCause() instanceof TxnAbortedException); } - txnBatch.close(); connection.close(); } @Test public void testHeartbeat() throws Exception { - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); - StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); - DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt, connection); - - TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer); - txnBatch.beginNextTransaction(); - //todo: this should ideally check Transaction heartbeat as well, but heartbeat - //timestamp is not reported yet - //GetOpenTxnsInfoResponse txnresp = msClient.showTxns(); - ShowLocksRequest request = new ShowLocksRequest(); - request.setDbname(dbName2); - request.setTablename(tblName2); - ShowLocksResponse response = msClient.showLocks(request); - Assert.assertEquals("Wrong nubmer of locks: " + response, 1, response.getLocks().size()); - ShowLocksResponseElement lock = 
response.getLocks().get(0); - long acquiredAt = lock.getAcquiredat(); - long heartbeatAt = lock.getLastheartbeat(); - txnBatch.heartbeat(); - response = msClient.showLocks(request); - Assert.assertEquals("Wrong number of locks2: " + response, 1, response.getLocks().size()); - lock = response.getLocks().get(0); - Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, lock.getAcquiredat()); - Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() + - ") == old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() == heartbeatAt); - txnBatch.close(); - int txnBatchSize = 200; - txnBatch = connection.fetchTransactionBatch(txnBatchSize, writer); - for(int i = 0; i < txnBatchSize; i++) { - txnBatch.beginNextTransaction(); - if(i % 47 == 0) { - txnBatch.heartbeat(); - } - if(i % 10 == 0) { - txnBatch.abort(); - } - else { - txnBatch.commit(); - } - if(i % 37 == 0) { - txnBatch.heartbeat(); + int transactionBatch = 20; + conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 200, TimeUnit.MILLISECONDS); + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName2) + .withTable(tblName2) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withTransactionBatchSize(transactionBatch) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + try { + connection.beginTransaction(); + ShowLocksRequest request = new ShowLocksRequest(); + request.setDbname(dbName2); + request.setTablename(tblName2); + ShowLocksResponse response = msClient.showLocks(request); + Assert.assertEquals("Wrong number of locks: " + response, 1, response.getLocks().size()); + ShowLocksResponseElement lock = response.getLocks().get(0); + long acquiredAt = lock.getAcquiredat(); + long heartbeatAt = lock.getLastheartbeat(); + response = msClient.showLocks(request); + Assert.assertEquals("Wrong number of locks2: " + response, 1, response.getLocks().size()); + lock = response.getLocks().get(0); + Assert.assertEquals("Acquired timestamp didn'table match", acquiredAt, lock.getAcquiredat()); + Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() + + ") == old heartbeat(" + heartbeatAt + ")", lock.getLastheartbeat() == heartbeatAt); + for (int i = 0; i < transactionBatch * 3; i++) { + connection.beginTransaction(); + if (i % 10 == 0) { + connection.abortTransaction(); + } else { + connection.commitTransaction(); + } + Thread.sleep(10); } + } finally { + conf.unset(HiveConf.ConfVars.HIVE_TXN_TIMEOUT.varname); + connection.close(); } - } + @Test public void testTransactionBatchEmptyAbort() throws Exception { // 1) to partitioned table - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, - partitionVals); - StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); - DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection); - - TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); - txnBatch.beginNextTransaction(); - txnBatch.abort(); - Assert.assertEquals(TransactionBatch.TxnState.ABORTED - , txnBatch.getCurrentTransactionState()); - txnBatch.close(); + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + 
.withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + connection.beginTransaction(); + connection.abortTransaction(); + Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED + , connection.getCurrentTransactionState()); connection.close(); // 2) to unpartitioned table - endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); - writer = new DelimitedInputWriter(fieldNames,",", endPt); - connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); - - txnBatch = connection.fetchTransactionBatch(10, writer); - txnBatch.beginNextTransaction(); - txnBatch.abort(); - Assert.assertEquals(TransactionBatch.TxnState.ABORTED - , txnBatch.getCurrentTransactionState()); - txnBatch.close(); + writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName2) + .withTable(tblName2) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + connection.beginTransaction(); + connection.abortTransaction(); + Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED + , connection.getCurrentTransactionState()); connection.close(); } @Test - public void testTransactionBatchCommit_Delimited() throws Exception { - testTransactionBatchCommit_Delimited(null); - } - @Test - public void testTransactionBatchCommit_DelimitedUGI() throws Exception { - testTransactionBatchCommit_Delimited(Utils.getUGI()); - } - private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) throws Exception { - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, - partitionVals); - StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName()); - DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection); + public void testTransactionBatchCommitDelimited() throws Exception { + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withHiveConf(conf) + .withRecordWriter(writer) + .withTransactionBatchSize(10) + .connect(); // 1st Txn - TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); - txnBatch.beginNextTransaction(); - Assert.assertEquals(TransactionBatch.TxnState.OPEN - , txnBatch.getCurrentTransactionState()); - txnBatch.write("1,Hello streaming".getBytes()); - txnBatch.commit(); + connection.beginTransaction(); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN + , connection.getCurrentTransactionState()); + connection.write("1,Hello streaming".getBytes()); + connection.commitTransaction(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); - Assert.assertEquals(TransactionBatch.TxnState.COMMITTED - , txnBatch.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED + , connection.getCurrentTransactionState()); // 2nd Txn - txnBatch.beginNextTransaction(); - Assert.assertEquals(TransactionBatch.TxnState.OPEN - , txnBatch.getCurrentTransactionState()); - txnBatch.write("2,Welcome to streaming".getBytes()); + connection.beginTransaction(); 
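Since the connection itself now owns the transaction batch, the size passed to withTransactionBatchSize(...) is consumed transparently: once a batch is exhausted, the next beginTransaction() rolls over to a fresh batch, which is what the testAutoRollTransactionBatch hunk earlier in this patch asserts via the delta directory names. A condensed, hypothetical illustration follows; it reuses the unpartitioned test table names from setup() but is otherwise not taken from the patch.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hive.streaming.HiveStreamingConnection;
    import org.apache.hive.streaming.StrictDelimitedInputWriter;

    public class AutoRollSketch {
      // With a batch size of 2, the third beginTransaction() below lands in a new
      // transaction batch without any explicit fetchTransactionBatch() call.
      static void writeFourTxns(HiveConf conf) throws Exception {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .build();
        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
            .withDatabase("testing2")           // unpartitioned test db/table from setup()
            .withTable("alerts")
            .withAgentInfo("doc-sketch")
            .withRecordWriter(writer)
            .withHiveConf(conf)
            .withTransactionBatchSize(2)
            .connect();
        try {
          for (int i = 0; i < 4; i++) {
            connection.beginTransaction();      // batch 1 covers i=0,1; batch 2 covers i=2,3
            connection.write((i + ",row" + i).getBytes());
            connection.commitTransaction();
          }
        } finally {
          connection.close();
        }
      }
    }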
+ Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN + , connection.getCurrentTransactionState()); + connection.write("2,Welcome to streaming".getBytes()); // data should not be visible checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); - txnBatch.commit(); + connection.commitTransaction(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}"); - txnBatch.close(); - Assert.assertEquals(TransactionBatch.TxnState.INACTIVE - , txnBatch.getCurrentTransactionState()); - - connection.close(); + Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE + , connection.getCurrentTransactionState()); - // To Unpartitioned table - endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); - connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName()); - writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection); + // To Unpartitioned table + writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName2) + .withTable(tblName2) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withHiveConf(conf) + .withRecordWriter(writer) + .connect(); // 1st Txn - txnBatch = connection.fetchTransactionBatch(10, writer); - txnBatch.beginNextTransaction(); - Assert.assertEquals(TransactionBatch.TxnState.OPEN - , txnBatch.getCurrentTransactionState()); - txnBatch.write("1,Hello streaming".getBytes()); - txnBatch.commit(); - - Assert.assertEquals(TransactionBatch.TxnState.COMMITTED - , txnBatch.getCurrentTransactionState()); + connection.beginTransaction(); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN + , connection.getCurrentTransactionState()); + connection.write("1,Hello streaming".getBytes()); + connection.commitTransaction(); + + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED + , connection.getCurrentTransactionState()); connection.close(); } @Test - public void testTransactionBatchCommit_Regex() throws Exception { - testTransactionBatchCommit_Regex(null); - } - @Test - public void testTransactionBatchCommit_RegexUGI() throws Exception { - testTransactionBatchCommit_Regex(Utils.getUGI()); - } - private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws Exception { - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, - partitionVals); - StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName()); + public void testTransactionBatchCommitRegex() throws Exception { String regex = "([^,]*),(.*)"; - StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, conf, connection); + StrictRegexWriter writer = StrictRegexWriter.newBuilder() + .withRegex(regex) + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withHiveConf(conf) + .withRecordWriter(writer) + .withTransactionBatchSize(10) + .connect(); // 1st Txn - TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); - txnBatch.beginNextTransaction(); - Assert.assertEquals(TransactionBatch.TxnState.OPEN - , txnBatch.getCurrentTransactionState()); - txnBatch.write("1,Hello streaming".getBytes()); - txnBatch.commit(); + connection.beginTransaction(); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN + , 
connection.getCurrentTransactionState()); + connection.write("1,Hello streaming".getBytes()); + connection.commitTransaction(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); - Assert.assertEquals(TransactionBatch.TxnState.COMMITTED - , txnBatch.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED + , connection.getCurrentTransactionState()); // 2nd Txn - txnBatch.beginNextTransaction(); - Assert.assertEquals(TransactionBatch.TxnState.OPEN - , txnBatch.getCurrentTransactionState()); - txnBatch.write("2,Welcome to streaming".getBytes()); + connection.beginTransaction(); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN + , connection.getCurrentTransactionState()); + connection.write("2,Welcome to streaming".getBytes()); // data should not be visible checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); - txnBatch.commit(); + connection.commitTransaction(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}"); - txnBatch.close(); - Assert.assertEquals(TransactionBatch.TxnState.INACTIVE - , txnBatch.getCurrentTransactionState()); - - connection.close(); - + Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE + , connection.getCurrentTransactionState()); // To Unpartitioned table - endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); - connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName()); regex = "([^:]*):(.*)"; - writer = new StrictRegexWriter(regex, endPt, conf, connection); + writer = StrictRegexWriter.newBuilder() + .withRegex(regex) + .build(); + + connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName2) + .withTable(tblName2) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withHiveConf(conf) + .withRecordWriter(writer) + .connect(); // 1st Txn - txnBatch = connection.fetchTransactionBatch(10, writer); - txnBatch.beginNextTransaction(); - Assert.assertEquals(TransactionBatch.TxnState.OPEN - , txnBatch.getCurrentTransactionState()); - txnBatch.write("1:Hello streaming".getBytes()); - txnBatch.commit(); - - Assert.assertEquals(TransactionBatch.TxnState.COMMITTED - , txnBatch.getCurrentTransactionState()); + connection.beginTransaction(); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN + , connection.getCurrentTransactionState()); + connection.write("1:Hello streaming".getBytes()); + connection.commitTransaction(); + + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED + , connection.getCurrentTransactionState()); connection.close(); } @Test - public void testTransactionBatchCommit_Json() throws Exception { - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, - partitionVals); - StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); - StrictJsonWriter writer = new StrictJsonWriter(endPt, connection); + public void testTransactionBatchCommitJson() throws Exception { + StrictJsonWriter writer = StrictJsonWriter.newBuilder() + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .withTransactionBatchSize(10) + .connect(); // 1st Txn - TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); - txnBatch.beginNextTransaction(); - 
Assert.assertEquals(TransactionBatch.TxnState.OPEN - , txnBatch.getCurrentTransactionState()); + connection.beginTransaction(); + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN + , connection.getCurrentTransactionState()); String rec1 = "{\"id\" : 1, \"msg\": \"Hello streaming\"}"; - txnBatch.write(rec1.getBytes()); - txnBatch.commit(); + connection.write(rec1.getBytes()); + connection.commitTransaction(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); - Assert.assertEquals(TransactionBatch.TxnState.COMMITTED - , txnBatch.getCurrentTransactionState()); - - txnBatch.close(); - Assert.assertEquals(TransactionBatch.TxnState.INACTIVE - , txnBatch.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED + , connection.getCurrentTransactionState()); connection.close(); + Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE + , connection.getCurrentTransactionState()); + List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName); Assert.assertEquals(1, rs.size()); } @Test public void testRemainingTransactions() throws Exception { - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, - partitionVals); - DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); - StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); - + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + connection.beginTransaction(); // 1) test with txn.Commit() - TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); - int batch=0; - int initialCount = txnBatch.remainingTransactions(); - while (txnBatch.remainingTransactions()>0) { - txnBatch.beginNextTransaction(); - Assert.assertEquals(--initialCount, txnBatch.remainingTransactions()); - for (int rec=0; rec<2; ++rec) { - Assert.assertEquals(TransactionBatch.TxnState.OPEN - , txnBatch.getCurrentTransactionState()); - txnBatch.write((batch * rec + ",Hello streaming").getBytes()); + int batch = 0; + int initialCount = connection.remainingTransactions(); + while (connection.remainingTransactions() > 0) { + connection.beginTransaction(); + Assert.assertEquals(--initialCount, connection.remainingTransactions()); + for (int rec = 0; rec < 2; ++rec) { + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN + , connection.getCurrentTransactionState()); + connection.write((batch * rec + ",Hello streaming").getBytes()); } - txnBatch.commit(); - Assert.assertEquals(TransactionBatch.TxnState.COMMITTED - , txnBatch.getCurrentTransactionState()); + connection.commitTransaction(); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED + , connection.getCurrentTransactionState()); ++batch; } - Assert.assertEquals(0, txnBatch.remainingTransactions()); - txnBatch.close(); - - Assert.assertEquals(TransactionBatch.TxnState.INACTIVE - , txnBatch.getCurrentTransactionState()); + Assert.assertEquals(0, connection.remainingTransactions()); + connection.close(); + Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE + , connection.getCurrentTransactionState()); + + connection = HiveStreamingConnection.newBuilder() + 
.withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); // 2) test with txn.Abort() - txnBatch = connection.fetchTransactionBatch(10, writer); - batch=0; - initialCount = txnBatch.remainingTransactions(); - while (txnBatch.remainingTransactions()>0) { - txnBatch.beginNextTransaction(); - Assert.assertEquals(--initialCount,txnBatch.remainingTransactions()); - for (int rec=0; rec<2; ++rec) { - Assert.assertEquals(TransactionBatch.TxnState.OPEN - , txnBatch.getCurrentTransactionState()); - txnBatch.write((batch * rec + ",Hello streaming").getBytes()); + connection.beginTransaction(); + batch = 0; + initialCount = connection.remainingTransactions(); + while (connection.remainingTransactions() > 0) { + connection.beginTransaction(); + Assert.assertEquals(--initialCount, connection.remainingTransactions()); + for (int rec = 0; rec < 2; ++rec) { + Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN + , connection.getCurrentTransactionState()); + connection.write((batch * rec + ",Hello streaming").getBytes()); } - txnBatch.abort(); - Assert.assertEquals(TransactionBatch.TxnState.ABORTED - , txnBatch.getCurrentTransactionState()); + connection.abortTransaction(); + Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED + , connection.getCurrentTransactionState()); ++batch; } - Assert.assertEquals(0, txnBatch.remainingTransactions()); - txnBatch.close(); - - Assert.assertEquals(TransactionBatch.TxnState.INACTIVE - , txnBatch.getCurrentTransactionState()); - + Assert.assertEquals(0, connection.remainingTransactions()); connection.close(); + + Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE + , connection.getCurrentTransactionState()); } @Test public void testTransactionBatchAbort() throws Exception { - - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, - partitionVals); - StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); - DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection); - - - TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); - txnBatch.beginNextTransaction(); - txnBatch.write("1,Hello streaming".getBytes()); - txnBatch.write("2,Welcome to streaming".getBytes()); - txnBatch.abort(); + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + connection.beginTransaction(); + connection.write("1,Hello streaming".getBytes()); + connection.write("2,Welcome to streaming".getBytes()); + connection.abortTransaction(); checkNothingWritten(partLoc); - Assert.assertEquals(TransactionBatch.TxnState.ABORTED - , txnBatch.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED + , connection.getCurrentTransactionState()); - txnBatch.close(); connection.close(); checkNothingWritten(partLoc); @@ -1116,123 +1363,162 @@ public class TestStreaming { @Test public void testTransactionBatchAbortAndCommit() throws Exception { String agentInfo = "UT_" + Thread.currentThread().getName(); - HiveEndPoint endPt = new 
HiveEndPoint(metaStoreURI, dbName, tblName, - partitionVals); - StreamingConnection connection = endPt.newConnection(false, agentInfo); - DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection); - - TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); - txnBatch.beginNextTransaction(); - txnBatch.write("1,Hello streaming".getBytes()); - txnBatch.write("2,Welcome to streaming".getBytes()); + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withAgentInfo(agentInfo) + .withRecordWriter(writer) + .withHiveConf(conf) + .withTransactionBatchSize(10) + .connect(); + + connection.beginTransaction(); + connection.write("1,Hello streaming".getBytes()); + connection.write("2,Welcome to streaming".getBytes()); ShowLocksResponse resp = msClient.showLocks(new ShowLocksRequest()); Assert.assertEquals("LockCount", 1, resp.getLocksSize()); Assert.assertEquals("LockType", LockType.SHARED_READ, resp.getLocks().get(0).getType()); Assert.assertEquals("LockState", LockState.ACQUIRED, resp.getLocks().get(0).getState()); Assert.assertEquals("AgentInfo", agentInfo, resp.getLocks().get(0).getAgentInfo()); - txnBatch.abort(); + connection.abortTransaction(); checkNothingWritten(partLoc); - Assert.assertEquals(TransactionBatch.TxnState.ABORTED - , txnBatch.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED + , connection.getCurrentTransactionState()); - txnBatch.beginNextTransaction(); - txnBatch.write("1,Hello streaming".getBytes()); - txnBatch.write("2,Welcome to streaming".getBytes()); - txnBatch.commit(); + connection.beginTransaction(); + connection.write("1,Hello streaming".getBytes()); + connection.write("2,Welcome to streaming".getBytes()); + connection.commitTransaction(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", - "{2, Welcome to streaming}"); + "{2, Welcome to streaming}"); - txnBatch.close(); connection.close(); } @Test public void testMultipleTransactionBatchCommits() throws Exception { - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, - partitionVals); - DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); - StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); - - TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); - txnBatch.beginNextTransaction(); - txnBatch.write("1,Hello streaming".getBytes()); - txnBatch.commit(); + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withTransactionBatchSize(10) + .withHiveConf(conf) + .connect(); + + connection.beginTransaction(); + connection.write("1,Hello streaming".getBytes()); + connection.commitTransaction(); String validationQuery = "select id, msg from " + dbName + "." 
+ tblName + " order by id, msg"; checkDataWritten2(partLoc, 1, 10, 1, validationQuery, false, "1\tHello streaming"); - txnBatch.beginNextTransaction(); - txnBatch.write("2,Welcome to streaming".getBytes()); - txnBatch.commit(); + connection.beginTransaction(); + connection.write("2,Welcome to streaming".getBytes()); + connection.commitTransaction(); - checkDataWritten2(partLoc, 1, 10, 1, validationQuery, true, "1\tHello streaming", - "2\tWelcome to streaming"); + checkDataWritten2(partLoc, 1, 10, 1, validationQuery, true, "1\tHello streaming", + "2\tWelcome to streaming"); - txnBatch.close(); + connection.close(); + connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withTransactionBatchSize(10) + .withHiveConf(conf) + .connect(); // 2nd Txn Batch - txnBatch = connection.fetchTransactionBatch(10, writer); - txnBatch.beginNextTransaction(); - txnBatch.write("3,Hello streaming - once again".getBytes()); - txnBatch.commit(); + connection.beginTransaction(); + connection.write("3,Hello streaming - once again".getBytes()); + connection.commitTransaction(); - checkDataWritten2(partLoc, 1, 20, 2, validationQuery, false, "1\tHello streaming", - "2\tWelcome to streaming", "3\tHello streaming - once again"); + checkDataWritten2(partLoc, 1, 20, 2, validationQuery, false, "1\tHello streaming", + "2\tWelcome to streaming", "3\tHello streaming - once again"); - txnBatch.beginNextTransaction(); - txnBatch.write("4,Welcome to streaming - once again".getBytes()); - txnBatch.commit(); + connection.beginTransaction(); + connection.write("4,Welcome to streaming - once again".getBytes()); + connection.commitTransaction(); - checkDataWritten2(partLoc, 1, 20, 2, validationQuery, true, "1\tHello streaming", - "2\tWelcome to streaming", "3\tHello streaming - once again", - "4\tWelcome to streaming - once again"); + checkDataWritten2(partLoc, 1, 20, 2, validationQuery, true, "1\tHello streaming", + "2\tWelcome to streaming", "3\tHello streaming - once again", + "4\tWelcome to streaming - once again"); - Assert.assertEquals(TransactionBatch.TxnState.COMMITTED - , txnBatch.getCurrentTransactionState()); - - txnBatch.close(); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED + , connection.getCurrentTransactionState()); connection.close(); } @Test public void testInterleavedTransactionBatchCommits() throws Exception { - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, - partitionVals); - DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt); - StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer) + .withHiveConf(conf) + .withTransactionBatchSize(10) + .connect(); // Acquire 1st Txn Batch - TransactionBatch txnBatch1 = connection.fetchTransactionBatch(10, writer); - txnBatch1.beginNextTransaction(); + connection.beginTransaction(); // Acquire 2nd Txn Batch - DelimitedInputWriter writer2 = new DelimitedInputWriter(fieldNames, ",", endPt); - TransactionBatch txnBatch2 = 
connection.fetchTransactionBatch(10, writer2); - txnBatch2.beginNextTransaction(); + StrictDelimitedInputWriter writer2 = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection2 = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withRecordWriter(writer2) + .withHiveConf(conf) + .withTransactionBatchSize(10) + .connect(); + connection2.beginTransaction(); // Interleaved writes to both batches - txnBatch1.write("1,Hello streaming".getBytes()); - txnBatch2.write("3,Hello streaming - once again".getBytes()); + connection.write("1,Hello streaming".getBytes()); + connection2.write("3,Hello streaming - once again".getBytes()); checkNothingWritten(partLoc); - txnBatch2.commit(); + connection2.commitTransaction(); String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg"; checkDataWritten2(partLoc, 11, 20, 1, validationQuery, true, "3\tHello streaming - once again"); - txnBatch1.commit(); + connection.commitTransaction(); /*now both batches have committed (but not closed) so we for each primary file we expect a side file to exist and indicate the true length of primary file*/ FileSystem fs = partLoc.getFileSystem(conf); AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, - msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName))); - for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) { - for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) { + msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName))); + for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) { + for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) { Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath()); Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile)); long lengthFileSize = fs.getFileStatus(lengthFile).getLen(); @@ -1244,20 +1530,20 @@ public class TestStreaming { } } checkDataWritten2(partLoc, 1, 20, 2, - validationQuery, false,"1\tHello streaming", "3\tHello streaming - once again"); + validationQuery, false, "1\tHello streaming", "3\tHello streaming - once again"); - txnBatch1.beginNextTransaction(); - txnBatch1.write("2,Welcome to streaming".getBytes()); + connection.beginTransaction(); + connection.write("2,Welcome to streaming".getBytes()); - txnBatch2.beginNextTransaction(); - txnBatch2.write("4,Welcome to streaming - once again".getBytes()); + connection2.beginTransaction(); + connection2.write("4,Welcome to streaming - once again".getBytes()); //here each batch has written data and committed (to bucket0 since table only has 1 bucket) //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0 //has now received more data(logically - it's buffered) but it is not yet committed. 
//lets check that side files exist, etc dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName))); - for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) { - for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) { + for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) { + for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) { Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath()); Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile)); long lengthFileSize = fs.getFileStatus(lengthFile).getLen(); @@ -1269,103 +1555,97 @@ public class TestStreaming { } } checkDataWritten2(partLoc, 1, 20, 2, - validationQuery, true,"1\tHello streaming", "3\tHello streaming - once again"); + validationQuery, true, "1\tHello streaming", "3\tHello streaming - once again"); - txnBatch1.commit(); + connection.commitTransaction(); checkDataWritten2(partLoc, 1, 20, 2, validationQuery, false, "1\tHello streaming", - "2\tWelcome to streaming", - "3\tHello streaming - once again"); + "2\tWelcome to streaming", + "3\tHello streaming - once again"); - txnBatch2.commit(); + connection2.commitTransaction(); checkDataWritten2(partLoc, 1, 20, 2, validationQuery, true, "1\tHello streaming", - "2\tWelcome to streaming", - "3\tHello streaming - once again", - "4\tWelcome to streaming - once again"); - - Assert.assertEquals(TransactionBatch.TxnState.COMMITTED - , txnBatch1.getCurrentTransactionState()); - Assert.assertEquals(TransactionBatch.TxnState.COMMITTED - , txnBatch2.getCurrentTransactionState()); + "2\tWelcome to streaming", + "3\tHello streaming - once again", + "4\tWelcome to streaming - once again"); - txnBatch1.close(); - txnBatch2.close(); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED + , connection.getCurrentTransactionState()); + Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED + , connection2.getCurrentTransactionState()); connection.close(); + connection2.close(); } private static class WriterThd extends Thread { private final StreamingConnection conn; - private final DelimitedInputWriter writer; private final String data; private Throwable error; - WriterThd(HiveEndPoint ep, String data) throws Exception { + WriterThd(String data) throws Exception { super("Writer_" + data); - writer = new DelimitedInputWriter(fieldNames, ",", ep); - conn = ep.newConnection(false, "UT_" + Thread.currentThread().getName()); + RecordWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withStaticPartitionValues(partitionVals) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + this.conn = connection; this.data = data; - setUncaughtExceptionHandler(new UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread thread, Throwable throwable) { - error = throwable; - LOG.error("Thread " + thread.getName() + " died: " + throwable.getMessage(), throwable); - } + setUncaughtExceptionHandler((thread, throwable) -> { + error = throwable; + LOG.error(connection.toTransactionString()); + LOG.error("Thread " + thread.getName() + " died: " + throwable.getMessage(), throwable); }); } @Override public void run() { - TransactionBatch txnBatch = null; try { - txnBatch = conn.fetchTransactionBatch(10, writer); - while (txnBatch.remainingTransactions() > 0) { - 
txnBatch.beginNextTransaction(); - txnBatch.write(data.getBytes()); - txnBatch.write(data.getBytes()); - txnBatch.commit(); + for (int i = 0; i < 10; i++) { + conn.beginTransaction(); + conn.write(data.getBytes()); + conn.write(data.getBytes()); + conn.commitTransaction(); } // while } catch (Exception e) { throw new RuntimeException(e); } finally { - if (txnBatch != null) { + if (conn != null) { try { - txnBatch.close(); + conn.close(); } catch (Exception e) { LOG.error("txnBatch.close() failed: " + e.getMessage(), e); - conn.close(); } } - try { - conn.close(); - } catch (Exception e) { - LOG.error("conn.close() failed: " + e.getMessage(), e); - } - } } } @Test public void testConcurrentTransactionBatchCommits() throws Exception { - final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); List<WriterThd> writers = new ArrayList<WriterThd>(3); - writers.add(new WriterThd(ep, "1,Matrix")); - writers.add(new WriterThd(ep, "2,Gandhi")); - writers.add(new WriterThd(ep, "3,Silence")); + writers.add(new WriterThd("1,Matrix")); + writers.add(new WriterThd("2,Gandhi")); + writers.add(new WriterThd("3,Silence")); - for(WriterThd w : writers) { + for (WriterThd w : writers) { w.start(); } - for(WriterThd w : writers) { + for (WriterThd w : writers) { w.join(); } - for(WriterThd w : writers) { - if(w.error != null) { + for (WriterThd w : writers) { + if (w.error != null) { Assert.assertFalse("Writer thread" + w.getName() + " died: " + w.error.getMessage() + " See log file for stack trace", true); } @@ -1376,11 +1656,11 @@ public class TestStreaming { private ArrayList<SampleRec> dumpBucket(Path orcFile) throws IOException { org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.getLocal(new Configuration()); Reader reader = OrcFile.createReader(orcFile, - OrcFile.readerOptions(conf).filesystem(fs)); + OrcFile.readerOptions(conf).filesystem(fs)); RecordReader rows = reader.rows(); StructObjectInspector inspector = (StructObjectInspector) reader - .getObjectInspector(); + .getObjectInspector(); System.out.format("Found Bucket File : %s \n", orcFile.getName()); ArrayList<SampleRec> result = new ArrayList<SampleRec>(); @@ -1402,7 +1682,7 @@ public class TestStreaming { WritableLongObjectInspector f1ins = (WritableLongObjectInspector) fields.get(1).getFieldObjectInspector(); WritableIntObjectInspector f2ins = (WritableIntObjectInspector) fields.get(2).getFieldObjectInspector(); WritableLongObjectInspector f3ins = (WritableLongObjectInspector) fields.get(3).getFieldObjectInspector(); - WritableLongObjectInspector f4ins = (WritableLongObjectInspector) fields.get(4).getFieldObjectInspector(); + WritableLongObjectInspector f4ins = (WritableLongObjectInspector) fields.get(4).getFieldObjectInspector(); StructObjectInspector f5ins = (StructObjectInspector) fields.get(5).getFieldObjectInspector(); int f0 = f0ins.get(inspector.getStructFieldData(row, fields.get(0))); @@ -1412,7 +1692,7 @@ public class TestStreaming { long f4 = f4ins.get(inspector.getStructFieldData(row, fields.get(4))); SampleRec f5 = deserializeInner(inspector.getStructFieldData(row, fields.get(5)), f5ins); - return new Object[] {f0, f1, f2, f3, f4, f5}; + return new Object[]{f0, f1, f2, f3, f4, f5}; } // Assumes row schema => string,int,string @@ -1437,49 +1717,67 @@ public class TestStreaming { // 1) Create two bucketed tables String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db"; - dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths + dbLocation = 
dbLocation.replaceAll("\\\\", "/"); // for windows paths String[] colNames = "key1,key2,data".split(","); String[] colTypes = "string,int,string".split(","); String[] bucketNames = "key1,key2".split(","); int bucketCount = 4; createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames - , null, dbLocation, bucketCount); + , null, dbLocation, bucketCount); String dbLocation2 = dbFolder.newFolder(dbName4).getCanonicalPath() + ".db"; - dbLocation2 = dbLocation2.replaceAll("\\\\","/"); // for windows paths + dbLocation2 = dbLocation2.replaceAll("\\\\", "/"); // for windows paths String[] colNames2 = "key3,key4,data2".split(","); String[] colTypes2 = "string,int,string".split(","); String[] bucketNames2 = "key3,key4".split(","); createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2 - , null, dbLocation2, bucketCount); + , null, dbLocation2, bucketCount); // 2) Insert data into both tables - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, db
<TRUNCATED>
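
The hunks above migrate the tests from the removed HiveEndPoint/TransactionBatch/DelimitedInputWriter API to the builder-based HiveStreamingConnection with StrictDelimitedInputWriter. A minimal standalone sketch of that write path follows; it uses only calls that appear in the patch, but the database/table names, partition values, agent string, and the assumption that HiveConf already points at an ACID-enabled metastore are illustrative rather than taken from the patch.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StrictDelimitedInputWriter;

public class DelimitedStreamingExample {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();                               // assumes metastore/txn settings are already configured
    List<String> partitionVals = Arrays.asList("Asia", "India");  // illustrative static partition values

    // Rows are "id,msg" strings, matching the delimiter the writer is built with.
    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
        .withFieldDelimiter(',')
        .build();

    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
        .withDatabase("testing")                                  // illustrative database name
        .withTable("alerts")                                      // illustrative transactional, bucketed table
        .withStaticPartitionValues(partitionVals)
        .withAgentInfo("example-agent")
        .withRecordWriter(writer)
        .withHiveConf(conf)
        .withTransactionBatchSize(10)                             // pre-allocates up to 10 transactions per batch
        .connect();

    try {
      connection.beginTransaction();
      connection.write("1,Hello streaming".getBytes());
      connection.write("2,Welcome to streaming".getBytes());
      connection.commitTransaction();
    } finally {
      connection.close();                                         // leaves the connection in TxnState.INACTIVE
    }
  }
}

remainingTransactions() reports how many transactions of the current batch are still unused, which is what the testRemainingTransactions hunk loops on.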
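
The same connection builder accepts the other record writers touched in this patch. A sketch of the JSON and regex variants under the same illustrative assumptions (the table names and layout are not taken from the patch):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StrictJsonWriter;
import org.apache.hive.streaming.StrictRegexWriter;

public class WriterVariantsExample {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();                        // assumes a reachable, ACID-enabled metastore

    // JSON records: field names in the document map onto the table columns.
    StrictJsonWriter jsonWriter = StrictJsonWriter.newBuilder().build();
    HiveStreamingConnection json = HiveStreamingConnection.newBuilder()
        .withDatabase("testing")                           // illustrative partitioned table
        .withTable("alerts")
        .withRecordWriter(jsonWriter)
        .withHiveConf(conf)
        .connect();
    json.beginTransaction();
    json.write("{\"id\" : 1, \"msg\": \"Hello streaming\"}".getBytes());
    json.commitTransaction();
    json.close();

    // Regex records: each capture group supplies one column value.
    StrictRegexWriter regexWriter = StrictRegexWriter.newBuilder()
        .withRegex("([^:]*):(.*)")
        .build();
    HiveStreamingConnection regex = HiveStreamingConnection.newBuilder()
        .withDatabase("testing2")                          // illustrative unpartitioned table
        .withTable("alerts")
        .withRecordWriter(regexWriter)
        .withHiveConf(conf)
        .connect();
    regex.beginTransaction();
    regex.write("1:Hello streaming".getBytes());
    regex.commitTransaction();
    regex.close();
  }
}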
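
The abort-and-commit tests assert the connection-level transaction states that replace TransactionBatch.TxnState. A compact sketch of that lifecycle, assuming a connection built as in the first sketch above:

import org.apache.hive.streaming.HiveStreamingConnection;

public class TxnLifecycleExample {
  // `connection` is assumed to be built exactly as in the delimited-writer sketch.
  static void abortThenCommit(HiveStreamingConnection connection) throws Exception {
    connection.beginTransaction();                         // state: OPEN
    connection.write("1,Hello streaming".getBytes());
    connection.abortTransaction();                         // state: ABORTED; the row never becomes visible

    connection.beginTransaction();                         // next transaction in the same batch
    connection.write("2,Welcome to streaming".getBytes());
    connection.commitTransaction();                        // state: COMMITTED; the row is now readable

    connection.close();                                    // state: INACTIVE
  }
}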
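
WriterThd is rewritten so that every thread builds its own writer and connection instead of sharing a HiveEndPoint. A sketch of that per-thread pattern; the thread names, row contents, and fixed count of ten transactions mirror the test, while the table and partition values remain illustrative:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StrictDelimitedInputWriter;

public class ConcurrentWritersExample {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();                        // assumes ACID/metastore settings are in place
    List<Thread> threads = Arrays.asList(
        writerThread(conf, "1,Matrix"),
        writerThread(conf, "2,Gandhi"),
        writerThread(conf, "3,Silence"));
    for (Thread t : threads) {
      t.start();
    }
    for (Thread t : threads) {
      t.join();
    }
  }

  // Each thread owns its own writer and connection; connections are never shared across threads.
  static Thread writerThread(HiveConf conf, String row) {
    return new Thread(() -> {
      try {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
            .withFieldDelimiter(',')
            .build();
        HiveStreamingConnection conn = HiveStreamingConnection.newBuilder()
            .withDatabase("testing")                       // illustrative names and partition values
            .withTable("alerts")
            .withStaticPartitionValues(Arrays.asList("Asia", "India"))
            .withRecordWriter(writer)
            .withHiveConf(conf)
            .connect();
        try {
          for (int i = 0; i < 10; i++) {
            conn.beginTransaction();
            conn.write(row.getBytes());
            conn.commitTransaction();
          }
        } finally {
          conn.close();
        }
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }, "Writer_" + row);
  }
}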