This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 4fb7717f9d6 HIVE-28351: TestStreaming to run on Tez (#6145)
4fb7717f9d6 is described below
commit 4fb7717f9d636de86247da52c1ef5f4dfffe4964
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Thu Oct 23 00:35:28 2025 +0200
HIVE-28351: TestStreaming to run on Tez (#6145)
---
.../org/apache/hive/streaming/TestStreaming.java | 761 +++++++++------------
1 file changed, 336 insertions(+), 425 deletions(-)
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 5ee780005a6..3177a171283 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -18,12 +18,9 @@
package org.apache.hive.streaming;
-import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT;
-
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -39,6 +36,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -57,6 +55,7 @@
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfForTest;
import org.apache.hadoop.hive.conf.Validator;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -71,23 +70,18 @@
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnInfo;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
-import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -100,29 +94,22 @@
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.impl.OrcAcidUtils;
import org.apache.orc.tools.FileDump;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Ignore("HIVE-24219")
public class TestStreaming {
private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
- public static class RawFileSystem extends RawLocalFileSystem {
+ private static final class RawFileSystem extends RawLocalFileSystem {
private static final URI NAME;
static {
@@ -147,7 +134,7 @@ public String getScheme() {
public FileStatus getFileStatus(Path path) throws IOException {
File file = pathToFile(path);
if (!file.exists()) {
- throw new FileNotFoundException("Can'table find " + path);
+ throw new FileNotFoundException("Can't find " + path);
}
// get close enough
short mod = 0;
@@ -177,16 +164,12 @@ public FileStatus getFileStatus(Path path) throws IOException {
// partitioned table
private final static String dbName = "testing";
private final static String tblName = "alerts";
- private final static String[] fieldNames = new String[]{COL1, COL2};
static List<String> partitionVals;
private static Path partLoc;
- private static Path partLoc2;
// unpartitioned table
private final static String dbName2 = "testing2";
private final static String tblName2 = "alerts";
- private final static String[] fieldNames2 = new String[]{COL1, COL2};
-
// for bucket join testing
private final static String dbName3 = "testing3";
@@ -195,38 +178,39 @@ public FileStatus getFileStatus(Path path) throws IOException {
private final static String tblName4 = "factTable";
List<String> partitionVals2;
-
private final String PART1_CONTINENT = "Asia";
private final String PART1_COUNTRY = "India";
@Rule
public TemporaryFolder dbFolder = new TemporaryFolder();
-
public TestStreaming() throws Exception {
- partitionVals = new ArrayList<String>(2);
+ partitionVals = new ArrayList<>(2);
partitionVals.add(PART1_CONTINENT);
partitionVals.add(PART1_COUNTRY);
- partitionVals2 = new ArrayList<String>(1);
+ partitionVals2 = new ArrayList<>(1);
partitionVals2.add(PART1_COUNTRY);
+ conf = new HiveConfForTest(this.getClass());
+ // Multiple tests require more than one bucket per write. Use a very small grouping size to create
+ // multiple mapper instances with FileSinkOperators. The number of buckets depends on the size of the
+ // data written and the grouping size. Most test cases expect 2 buckets.
+ conf.set("tez.grouping.max-size", "10");
+ conf.set("tez.grouping.min-size", "1");
- conf = new HiveConf(this.getClass());
conf.set("fs.raw.impl", RawFileSystem.class.getName());
conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
- TestTxnDbUtil.setConfValues(conf);
conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+
dbFolder.create();
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.WAREHOUSE, "raw://" + dbFolder.newFolder("warehouse"));
- //1) Start from a clean slate (metastore)
- TestTxnDbUtil.cleanDb(conf);
+ TestTxnDbUtil.setConfValues(conf);
TestTxnDbUtil.prepDb(conf);
- //2) obtain metastore clients
msClient = new HiveMetaStoreClient(conf);
}
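(For context on the tests that follow: each one drives the same streaming write path. A minimal sketch of that pattern — not part of the diff; the table name is a placeholder and `conf` is the HiveConfForTest built above:

    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
        .withFieldDelimiter(',')
        .build();
    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
        .withDatabase("testing")
        .withTable("alerts")
        .withAgentInfo("UT_" + Thread.currentThread().getName())
        .withRecordWriter(writer)
        .withHiveConf(conf)
        .connect();
    connection.beginTransaction();
    connection.write("1,Hello streaming".getBytes()); // delimited rows, written as bytes
    connection.commitTransaction();
    connection.close();

With the tiny tez.grouping.* values set above, even small writes fan out to multiple FileSinkOperator instances, which is what produces the two buckets most assertions expect.)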
@@ -235,6 +219,7 @@ public void setup() throws Exception {
SessionState.start(new CliSessionState(conf));
driver = DriverFactory.newDriver(conf);
driver.setMaxRows(200002);//make sure Driver returns all results
+
// drop and recreate the necessary databases and tables
dropDB(msClient, dbName);
@@ -248,34 +233,39 @@ public void setup() throws Exception {
dropDB(msClient, dbName2);
String loc2 = dbFolder.newFolder(dbName2 + ".db").toString();
- partLoc2 = createDbAndTable(driver, dbName2, tblName2, null, colNames, colTypes, bucketCols, null, loc2, 2);
+ createDbAndTable(driver, dbName2, tblName2, null, colNames, colTypes, bucketCols, null, loc2, 2);
+ dropDB(msClient, "testing5");
String loc3 = dbFolder.newFolder("testing5.db").toString();
createStoreSales("testing5", loc3);
- runDDL(driver, "drop table testBucketing3.streamedtable");
- runDDL(driver, "drop table testBucketing3.finaltable");
- runDDL(driver, "drop table testBucketing3.nobucket");
+ dropTables(driver,
+ "testBucketing3.streamedtable",
+ "testBucketing3.finaltable",
+ "testBucketing3.nobucket");
}
@After
- public void cleanup() {
- msClient.close();
- driver.close();
+ public void cleanup() throws Exception {
+ try {
+ msClient.close();
+ driver.close();
+ } finally {
+ TestTxnDbUtil.cleanDb(conf);
+ }
}
- private void createStoreSales(String dbName, String loc) throws Exception {
+ private void createStoreSales(String dbName, String loc) {
String dbUri = "raw://" + new Path(loc).toUri().toString();
String tableLoc = dbUri + Path.SEPARATOR + "store_sales";
- boolean success = runDDL(driver, "create database IF NOT EXISTS " + dbName + " location '" + dbUri + "'");
+ boolean success = runQuery(driver, "create database IF NOT EXISTS " + dbName + " location '" + dbUri + "'");
Assert.assertTrue(success);
- success = runDDL(driver, "use " + dbName);
+ success = runQuery(driver, "use " + dbName);
Assert.assertTrue(success);
- success = runDDL(driver, "drop table if exists store_sales");
- Assert.assertTrue(success);
- success = runDDL(driver, "create table store_sales\n" +
+ dropTables(driver, "store_sales");
+ success = runQuery(driver, "create table store_sales\n" +
"(\n" +
" ss_sold_date_sk int,\n" +
" ss_sold_time_sk int,\n" +
@@ -307,26 +297,24 @@ private void createStoreSales(String dbName, String loc) throws Exception {
" TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')");
Assert.assertTrue(success);
- success = runDDL(driver, "alter table store_sales add partition(dt='2015')");
+ success = runQuery(driver, "alter table store_sales add partition(dt='2015')");
Assert.assertTrue(success);
}
/**
* make sure it works with table where bucket col is not 1st col
- *
- * @throws Exception
*/
@Test
public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
- List<String> partitionVals = new ArrayList<String>();
- partitionVals.add("2015");
+ List<String> newPartVals = new ArrayList<>();
+ newPartVals.add("2015");
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase("testing5")
.withTable("store_sales")
- .withStaticPartitionValues(partitionVals)
+ .withStaticPartitionValues(newPartVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
@@ -347,7 +335,7 @@ public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
connection.commitTransaction();
connection.close();
- ArrayList<String> res = queryTable(driver, "select row__id.bucketid, * from testing5.store_sales");
+ List<String> res = queryTable(driver, "select row__id.bucketid, * from testing5.store_sales");
for (String re : res) {
System.out.println(re);
}
@@ -358,13 +346,13 @@ public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
*/
@Test
public void testNoBuckets() throws Exception {
- queryTable(driver, "drop table if exists default.streamingnobuckets");
- queryTable(driver, "create table default.streamingnobuckets (a string, b
string) stored as orc " +
+ dropTables(driver, "default.streamingnobuckets");
+ runQuery(driver, "create table default.streamingnobuckets (a string, b
string) stored as orc " +
"TBLPROPERTIES('transactional'='true')");
- queryTable(driver, "insert into default.streamingnobuckets
values('foo','bar')");
+ runQuery(driver, "insert into default.streamingnobuckets
values('foo','bar')");
List<String> rs = queryTable(driver, "select * from
default.streamingnobuckets");
Assert.assertEquals(1, rs.size());
- Assert.assertEquals("foo\tbar", rs.get(0));
+ Assert.assertEquals("foo\tbar", rs.getFirst());
StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
@@ -382,9 +370,9 @@ public void testNoBuckets() throws Exception {
connection.write("a3,b4".getBytes());
TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowLocksResponse resp = txnHandler.showLocks(new ShowLocksRequest());
- Assert.assertEquals(resp.getLocksSize(), 1);
- Assert.assertEquals("streamingnobuckets",
resp.getLocks().get(0).getTablename());
- Assert.assertEquals("default", resp.getLocks().get(0).getDbname());
+ Assert.assertEquals(1, resp.getLocksSize());
+ Assert.assertEquals("streamingnobuckets",
resp.getLocks().getFirst().getTablename());
+ Assert.assertEquals("default", resp.getLocks().getFirst().getDbname());
connection.commitTransaction();
connection.beginTransaction();
connection.write("a5,b6".getBytes());
@@ -406,27 +394,27 @@ public void testNoBuckets() throws Exception {
Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
- queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
- queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
+ runQuery(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
+ runQuery(driver, "delete from default.streamingnobuckets where a='a1'");
rs = queryTable(driver, "select a, b from default.streamingnobuckets order by a, b");
- int row = 0;
- Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
- Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++));
- Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++));
- Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++));
- queryTable(driver, "alter table default.streamingnobuckets compact 'major'");
+ Assert.assertEquals("at row=" + 0, "0\t0", rs.get(0));
+ Assert.assertEquals("at row=" + 1, "a3\tb4", rs.get(1));
+ Assert.assertEquals("at row=" + 2, "a5\tb6", rs.get(2));
+ Assert.assertEquals("at row=" + 3, "foo\tbar", rs.get(3));
+
+ runQuery(driver, "alter table default.streamingnobuckets compact 'major'");
runWorker(conf);
rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
- Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005_v0000024/bucket_00000"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005_v0000016/bucket_00000"));
Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
- Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005_v0000024/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005_v0000016/bucket_00000"));
Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
- Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005_v0000024/bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005_v0000016/bucket_00000"));
Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870913,\"rowid\":0}\t0\t0"));
- Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005_v0000024/bucket_00000"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005_v0000016/bucket_00000"));
}
@Test
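(The magic bucketid values asserted above are BucketCodec.V1-encoded: the codec version sits in the top bits, then the writer/bucket id, then the statement id. Decoded, 536870912 is bucket 0 / statement 0 and 536870913 is bucket 0 / statement 1; bucket 1 / statement 1 would encode to 536936449, which shows up in later tests. A small sketch using the BucketCodec this test already imports:

    int encoded = 536936449;                                    // 0x20010001
    BucketCodec codec = BucketCodec.determineVersion(encoded);
    int bucket = codec.decodeWriterId(encoded);                 // 1
    int statementId = codec.decodeStatementId(encoded);         // 1
)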
@@ -448,14 +436,14 @@ public void testGetDeltaPath() throws Exception {
@Test
public void testCommitWithKeyValue() throws Exception {
- queryTable(driver, "drop table if exists default.keyvalue");
- queryTable(driver, "create table default.keyvalue (a string, b string)
stored as orc " +
+ dropTables(driver, "default.keyvalue");
+ runQuery(driver, "create table default.keyvalue (a string, b string)
stored as orc " +
"TBLPROPERTIES('transactional'='true')");
- queryTable(driver, "insert into default.keyvalue values('foo','bar')");
- queryTable(driver, "ALTER TABLE default.keyvalue SET
TBLPROPERTIES('_metamykey' = 'myvalue')");
+ runQuery(driver, "insert into default.keyvalue values('foo','bar')");
+ runQuery(driver, "ALTER TABLE default.keyvalue SET
TBLPROPERTIES('_metamykey' = 'myvalue')");
List<String> rs = queryTable(driver, "select * from default.keyvalue");
Assert.assertEquals(1, rs.size());
- Assert.assertEquals("foo\tbar", rs.get(0));
+ Assert.assertEquals("foo\tbar", rs.getFirst());
StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
@@ -485,14 +473,14 @@ public void testCommitWithKeyValue() throws Exception {
@Test
public void testConnectionWithWriteId() throws Exception {
- queryTable(driver, "drop table if exists default.writeidconnection");
- queryTable(driver, "create table default.writeidconnection (a string, b
string) stored as orc " +
+ dropTables(driver, "default.writeidconnection");
+ runQuery(driver, "create table default.writeidconnection (a string, b
string) stored as orc " +
"TBLPROPERTIES('transactional'='true')");
- queryTable(driver, "insert into default.writeidconnection
values('a0','bar')");
+ runQuery(driver, "insert into default.writeidconnection
values('a0','bar')");
List<String> rs = queryTable(driver, "select * from
default.writeidconnection");
Assert.assertEquals(1, rs.size());
- Assert.assertEquals("a0\tbar", rs.get(0));
+ Assert.assertEquals("a0\tbar", rs.getFirst());
StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
@@ -556,8 +544,7 @@ public void testConnectionWithWriteId() throws Exception {
connectionOne.beginTransaction();
Assert.fail("second beginTransaction should have thrown a "
+ "StreamingException");
- } catch (StreamingException e) {
-
+ } catch (StreamingException ignored) {
}
connectionOne.close();
@@ -585,8 +572,8 @@ public void testConnectionWithWriteId() throws Exception {
@Test
public void testAllTypesDelimitedWriter() throws Exception {
- queryTable(driver, "drop table if exists default.alltypes");
- queryTable(driver,
+ dropTables(driver, "default.alltypes");
+ runQuery(driver,
"create table if not exists default.alltypes ( bo boolean, ti tinyint,
si smallint, i int, bi bigint, " +
"f float, d double, de decimal(10,3), ts timestamp, da date, s string,
c char(5), vc varchar(5), " +
"m map<string, string>, l array<int>, st struct<c1:int, c2:string> ) "
+
@@ -618,7 +605,7 @@ public void testAllTypesDelimitedWriter() throws Exception {
List<String> rs = queryTable(driver, "select ROW__ID, bo, ti, si, i, bi, f, d, de, ts, da, s, c, vc, m, l, st," +
" INPUT__FILE__NAME from default.alltypes order by ROW__ID");
Assert.assertEquals(2, rs.size());
- String gotRow1 = rs.get(0);
+ String gotRow1 = rs.getFirst();
String expectedPrefixRow1 = "{\"writeid\":1,\"bucketid\":536870912," +
"\"rowid\":0}\ttrue\t10\t100\t1000\t10000\t4.0\t20.0\t4.222\t1969-12-31
15:59:58.174\t1970-01-01\tstring" +
"\thello\thello\t{\"k1\":\"v1\"}\t[100,200]\t{\"c1\":10,\"c2\":\"foo\"}";
@@ -636,8 +623,8 @@ public void testAllTypesDelimitedWriter() throws Exception {
@Test
public void testAllTypesDelimitedWriterInputStream() throws Exception {
- queryTable(driver, "drop table if exists default.alltypes");
- queryTable(driver,
+ dropTables(driver, "default.alltypes");
+ runQuery(driver,
"create table if not exists default.alltypes ( bo boolean, ti tinyint,
si smallint, i int, bi bigint, " +
"f float, d double, de decimal(10,3), ts timestamp, da date, s string,
c char(5), vc varchar(5), " +
"m map<string, string>, l array<int>, st struct<c1:int, c2:string> ) "
+
@@ -672,7 +659,7 @@ public void testAllTypesDelimitedWriterInputStream() throws Exception {
List<String> rs = queryTable(driver, "select ROW__ID, bo, ti, si, i, bi, f, d, de, ts, da, s, c, vc, m, l, st," +
" INPUT__FILE__NAME from default.alltypes order by ROW__ID");
Assert.assertEquals(2, rs.size());
- String gotRow1 = rs.get(0);
+ String gotRow1 = rs.getFirst();
String expectedPrefixRow1 = "{\"writeid\":1,\"bucketid\":536870912," +
"\"rowid\":0}\ttrue\t10\t100\t1000\t10000\t4.0\t20.0\t4.222\t1969-12-31
15:59:58.174\t1970-01-01\tstring" +
"\thello\thello\t{\"k1\":\"v1\"}\t[100,200]\t{\"c1\":10,\"c2\":\"foo\"}";
@@ -690,13 +677,13 @@ public void testAllTypesDelimitedWriterInputStream() throws Exception {
@Test
public void testAutoRollTransactionBatch() throws Exception {
- queryTable(driver, "drop table if exists default.streamingnobuckets");
- queryTable(driver, "create table default.streamingnobuckets (a string, b
string) stored as orc " +
+ dropTables(driver, "default.streamingnobuckets");
+ runQuery(driver, "create table default.streamingnobuckets (a string, b
string) stored as orc " +
"TBLPROPERTIES('transactional'='true')");
- queryTable(driver, "insert into default.streamingnobuckets
values('foo','bar')");
+ runQuery(driver, "insert into default.streamingnobuckets
values('foo','bar')");
List<String> rs = queryTable(driver, "select * from
default.streamingnobuckets");
Assert.assertEquals(1, rs.size());
- Assert.assertEquals("foo\tbar", rs.get(0));
+ Assert.assertEquals("foo\tbar", rs.getFirst());
StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
@@ -751,36 +738,36 @@ public void testAutoRollTransactionBatch() throws Exception {
Assert.assertTrue(rs.get(8), rs.get(8).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":1}\ta15\tb16"));
Assert.assertTrue(rs.get(8), rs.get(8).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000"));
- queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
- queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
- queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a15'");
- queryTable(driver, "delete from default.streamingnobuckets where a='a9'");
+ runQuery(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
+ runQuery(driver, "delete from default.streamingnobuckets where a='a1'");
+ runQuery(driver, "update default.streamingnobuckets set a=0, b=0 where a='a15'");
+ runQuery(driver, "delete from default.streamingnobuckets where a='a9'");
rs = queryTable(driver, "select a, b from default.streamingnobuckets order by a, b");
- int row = 0;
- Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
- Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
- Assert.assertEquals("at row=" + row, "a11\tb12", rs.get(row++));
- Assert.assertEquals("at row=" + row, "a13\tb14", rs.get(row++));
- Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++));
- Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++));
- Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++));
-
- queryTable(driver, "alter table default.streamingnobuckets compact 'major'");
+
+ Assert.assertEquals("at row=" + 0, "0\t0", rs.get(0));
+ Assert.assertEquals("at row=" + 1, "0\t0", rs.get(1));
+ Assert.assertEquals("at row=" + 2, "a11\tb12", rs.get(2));
+ Assert.assertEquals("at row=" + 3, "a13\tb14", rs.get(3));
+ Assert.assertEquals("at row=" + 4, "a3\tb4", rs.get(4));
+ Assert.assertEquals("at row=" + 5, "a5\tb6", rs.get(5));
+ Assert.assertEquals("at row=" + 6, "foo\tbar", rs.get(6));
+
+ runQuery(driver, "alter table default.streamingnobuckets compact 'major'");
runWorker(conf);
rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
- Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000009_v0000028/bucket_00000"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000009_v0000020/bucket_00000"));
Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
- Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000009_v0000028/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000009_v0000020/bucket_00000"));
Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
- Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000009_v0000028/bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000009_v0000020/bucket_00000"));
Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\ta11\tb12"));
- Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000009_v0000028/bucket_00000"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000009_v0000020/bucket_00000"));
Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\ta13\tb14"));
- Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/base_0000009_v0000028/bucket_00000"));
+ Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/base_0000009_v0000020/bucket_00000"));
Assert.assertTrue(rs.get(5), rs.get(5).startsWith("{\"writeid\":6,\"bucketid\":536936449,\"rowid\":0}\t0\t0"));
- Assert.assertTrue(rs.get(5), rs.get(5).endsWith("streamingnobuckets/base_0000009_v0000028/bucket_00001"));
+ Assert.assertTrue(rs.get(5), rs.get(5).endsWith("streamingnobuckets/base_0000009_v0000020/bucket_00001"));
}
/**
@@ -805,19 +792,19 @@ public void testStreamBucketingMatchesRegularBucketing() throws Exception {
String tableLoc2 = "'" + dbUri + Path.SEPARATOR + "finaltable" + "'";
String tableLoc3 = "'" + dbUri + Path.SEPARATOR + "nobucket" + "'";
- // disabling vectorization as this test yields incorrect results with vectorization
- conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
try (IDriver driver = DriverFactory.newDriver(conf)) {
- runDDL(driver, "create database testBucketing3");
- runDDL(driver, "use testBucketing3");
- runDDL(driver, "create table streamedtable ( key1 string,key2 int,data string ) clustered by ( key1,key2 ) into "
- + bucketCount + " buckets stored as orc location " + tableLoc + " TBLPROPERTIES ('transactional'='true')");
- // In 'nobucket' table we capture bucketid from streamedtable to workaround a hive bug that prevents joins two identically bucketed tables
- runDDL(driver, "create table nobucket ( bucketid int, key1 string,key2 int,data string ) location " + tableLoc3);
- runDDL(driver,
- "create table finaltable ( bucketid int, key1 string,key2 int,data string ) clustered by ( key1,key2 ) into "
- + bucketCount + " buckets stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='true')");
-
+ runQuery(driver, "create database testBucketing3");
+ runQuery(driver, "use testBucketing3");
+ runQuery(driver, "create table streamedtable (key1 string, key2 int, data string)"
+ + " clustered by (key1, key2) into " + bucketCount + " buckets"
+ + " stored as orc location " + tableLoc + " TBLPROPERTIES ('transactional'='true')");
+ // In 'nobucket' table we capture bucketid from streamedtable to work around a hive bug
+ // that prevents joining two identically bucketed tables
+ runQuery(driver, "create table nobucket (bucketid int, key1 string, key2 int, data string)"
+ + " location " + tableLoc3);
+ runQuery(driver, "create table finaltable (bucketid int, key1 string, key2 int, data string)"
+ + " clustered by (key1, key2) into " + bucketCount + " buckets "
+ + " stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='true')");
String[] records = new String[]{
"PSFAHYLZVC,29,EPNMA",
@@ -862,25 +849,22 @@ public void testStreamBucketingMatchesRegularBucketing() throws Exception {
connection.commitTransaction();
connection.close();
- ArrayList<String> res1 = queryTable(driver, "select row__id.bucketid, * from streamedtable order by key2");
+ List<String> res1 = queryTable(driver, "select row__id.bucketid, * from streamedtable order by key2");
for (String re : res1) {
LOG.error(re);
}
driver.run("insert into nobucket select row__id.bucketid,* from
streamedtable");
- runDDL(driver, "insert into finaltable select * from nobucket");
- ArrayList<String> res2 = queryTable(driver,
+ runQuery(driver, "insert into finaltable select * from nobucket");
+ List<String> res2 = queryTable(driver,
"select row__id.bucketid,* from finaltable where
row__id.bucketid<>bucketid");
for (String s : res2) {
LOG.error(s);
}
Assert.assertTrue(res2.isEmpty());
- } finally {
- conf.unset(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname);
}
}
-
@Test
public void testTableValidation() throws Exception {
int bucketCount = 100;
@@ -892,13 +876,13 @@ public void testTableValidation() throws Exception {
String tableLoc = "'" + dbUri + Path.SEPARATOR + tbl1 + "'";
String tableLoc2 = "'" + dbUri + Path.SEPARATOR + tbl2 + "'";
- runDDL(driver, "create database testBucketing3");
- runDDL(driver, "use testBucketing3");
+ runQuery(driver, "create database testBucketing3");
+ runQuery(driver, "use testBucketing3");
- runDDL(driver, "create table " + tbl1 + " ( key1 string, data string )
clustered by ( key1 ) into "
+ runQuery(driver, "create table " + tbl1 + " ( key1 string, data string )
clustered by ( key1 ) into "
+ bucketCount + " buckets stored as orc location " + tableLoc + "
TBLPROPERTIES ('transactional'='false')");
- runDDL(driver, "create table " + tbl2 + " ( key1 string, data string )
clustered by ( key1 ) into "
+ runQuery(driver, "create table " + tbl2 + " ( key1 string, data string )
clustered by ( key1 ) into "
+ bucketCount + " buckets stored as orc location " + tableLoc2 + "
TBLPROPERTIES ('transactional'='false')");
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
@@ -913,7 +897,7 @@ public void testTableValidation() throws Exception {
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
- Assert.assertTrue("InvalidTable exception was not thrown", false);
+ Assert.fail("InvalidTable exception was not thrown");
} catch (InvalidTable e) {
// expecting this exception
} finally {
@@ -929,7 +913,7 @@ public void testTableValidation() throws Exception {
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
- Assert.assertTrue("InvalidTable exception was not thrown", false);
+ Assert.fail("InvalidTable exception was not thrown");
} catch (InvalidTable e) {
// expecting this exception
} finally {
@@ -939,71 +923,14 @@ public void testTableValidation() throws Exception {
}
}
- /**
- * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, boolean, String...)} -
- * there is little value in using InputFormat directly
- */
- @Deprecated
- private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
- String... records) throws Exception {
- ValidWriteIdList writeIds = getTransactionContext(conf);
- AcidDirectory dir = AcidUtils.getAcidState(null, partitionPath, conf, writeIds, null, false);
- Assert.assertEquals(0, dir.getObsolete().size());
- Assert.assertEquals(0, dir.getOriginalFiles().size());
- List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
- System.out.println("Files found: ");
- for (AcidUtils.ParsedDelta pd : current) {
- System.out.println(pd.getPath().toString());
- }
- Assert.assertEquals(numExpectedFiles, current.size());
-
- // find the absolute minimum transaction
- long min = Long.MAX_VALUE;
- long max = Long.MIN_VALUE;
- for (AcidUtils.ParsedDelta pd : current) {
- if (pd.getMaxWriteId() > max) {
- max = pd.getMaxWriteId();
- }
- if (pd.getMinWriteId() < min) {
- min = pd.getMinWriteId();
- }
- }
- // We are doing +1, as DDL operation will also advance the write Id now.
- Assert.assertEquals(minTxn + 1, min);
- Assert.assertEquals(maxTxn + 1, max);
-
- InputFormat inf = new OrcInputFormat();
- JobConf job = new JobConf();
- job.set("mapred.input.dir", partitionPath.toString());
- job.set(BUCKET_COUNT, Integer.toString(buckets));
- job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
- job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
- AcidUtils.setAcidOperationalProperties(job, true, null);
- job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
- job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.writeToString());
- job.set(ValidTxnList.VALID_TXNS_KEY, conf.get(ValidTxnList.VALID_TXNS_KEY));
- InputSplit[] splits = inf.getSplits(job, buckets);
- Assert.assertEquals(numExpectedFiles, splits.length);
- org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
- inf.getRecordReader(splits[0], job, Reporter.NULL);
-
- NullWritable key = rr.createKey();
- OrcStruct value = rr.createValue();
- for (String record : records) {
- Assert.assertEquals(true, rr.next(key, value));
- Assert.assertEquals(record, value.toString());
- }
- Assert.assertEquals(false, rr.next(key, value));
- }
-
/**
* @param validationQuery query to read from table to compare data against {@code records}
* @param records expected data. each row is CVS list of values
*/
- private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
- String validationQuery, boolean vectorize, String... records) throws Exception {
- AcidDirectory dir =
- AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false);
+ private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
+ String validationQuery, String... records) throws Exception {
+ ValidWriteIdList writeIds = getTransactionContext(conf);
+ AcidDirectory dir = AcidUtils.getAcidState(null, partitionPath, conf, writeIds, null, false);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -1024,13 +951,9 @@ private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int
min = pd.getMinWriteId();
}
}
- // We are doing +1, as DDL operation will also advance the write Id now.
+ // We are doing +1, as DDL operation will also advance the writeId now.
Assert.assertEquals(minTxn + 1, min);
Assert.assertEquals(maxTxn + 1, max);
- boolean isVectorizationEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
- if (vectorize) {
- conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
- }
String currStrategy = conf.getVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY);
for (String strategy : ((Validator.StringSet) HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator())
@@ -1044,7 +967,6 @@ private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int
}
}
conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, currStrategy);
- conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorizationEnabled);
}
private ValidWriteIdList getTransactionContext(Configuration conf) throws Exception {
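(For reference, the strategy loop in checkDataWritten above re-runs the validation query once per value accepted by the hive.exec.orc.split.strategy validator and restores the original setting afterwards, so streamed data must stay readable under every split strategy. A compressed sketch — assuming HYBRID/BI/ETL are the accepted values and records/validationQuery are the method parameters:

    for (String strategy : new String[] {"HYBRID", "BI", "ETL"}) {
      conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, strategy);
      List<String> rows = queryTable(driver, validationQuery);
      Assert.assertEquals(records.length, rows.size()); // row-content checks omitted in this sketch
    }
)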
@@ -1052,8 +974,9 @@ private ValidWriteIdList getTransactionContext(Configuration conf) throws Except
conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
List<TableValidWriteIds> v = msClient.getValidWriteIds(Collections
.singletonList(TableName.getDbTable(dbName, tblName)), validTxnList.writeToString());
- return TxnCommonUtils.createValidReaderWriteIdList(v.get(0));
+ return TxnCommonUtils.createValidReaderWriteIdList(v.getFirst());
}
+
private void checkNothingWritten(Path partitionPath) throws Exception {
AcidDirectory dir =
AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false);
@@ -1099,7 +1022,7 @@ public void testEndpointConnection() throws Exception {
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
- Assert.assertTrue("ConnectionError was not thrown", false);
+ Assert.fail("ConnectionError was not thrown");
connection.close();
} catch (ConnectionError e) {
// expecting this exception
@@ -1110,7 +1033,7 @@ public void testEndpointConnection() throws Exception {
@Test
public void testAddPartition() throws Exception {
- List<String> newPartVals = new ArrayList<String>(2);
+ List<String> newPartVals = new ArrayList<>(2);
newPartVals.add(PART1_CONTINENT);
newPartVals.add("Nepal");
@@ -1140,7 +1063,7 @@ public void testAddPartition() throws Exception {
@Test
public void testAddPartitionWithWriteId() throws Exception {
- List<String> newPartVals = new ArrayList<String>(2);
+ List<String> newPartVals = new ArrayList<>(2);
newPartVals.add("WriteId_continent");
newPartVals.add("WriteId_country");
@@ -1190,7 +1113,8 @@ public void testAddPartitionWithWriteId() throws Exception {
try {
msClient.getPartition(dbName, tblName, newPartVals);
Assert.fail("Partition shouldn't exist so a NoSuchObjectException should
have been raised");
- } catch (NoSuchObjectException e) {}
+ } catch (NoSuchObjectException ignored) {
+ }
transactionConnection.commitTransaction(partitions);
@@ -1201,8 +1125,8 @@ public void testAddPartitionWithWriteId() throws Exception {
@Test
public void testAddDynamicPartitionWithWriteId() throws Exception {
- queryTable(driver, "drop table if exists default.writeiddynamic");
- queryTable(driver, "create table default.writeiddynamic (a"
+ dropTables(driver, "default.writeiddynamic");
+ runQuery(driver, "create table default.writeiddynamic (a"
+ " string, b string) partitioned by (c string, d string)"
+ " stored as orc TBLPROPERTIES('transactional'='true')");
@@ -1265,11 +1189,12 @@ public void testAddDynamicPartitionWithWriteId() throws Exception {
msClient.getPartition("default", "writeiddynamic", partitionName);
Assert.fail(
"Partition shouldn't exist so a NoSuchObjectException should have
been raised");
- } catch (NoSuchObjectException e) {
+ } catch (NoSuchObjectException ignored) {
}
- partitionsOne.addAll(partitionsTwo);
- Set<String> allPartitions = partitionsOne;
+ Set<String> allPartitions = new HashSet<>(partitionsOne);
+ allPartitions.addAll(partitionsTwo);
+
transactionConnection.commitTransaction(allPartitions);
// Ensure partition is present
@@ -1351,8 +1276,6 @@ public void testTransactionBatchSizeValidation() throws Exception {
/**
* check that transactions that have not heartbeated and timedout get properly aborted
- *
- * @throws Exception
*/
@Test
public void testTimeOutReaper() throws Exception {
@@ -1366,7 +1289,7 @@ public void testTimeOutReaper() throws Exception {
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
-
+
HiveConf houseKeeperConf = new HiveConf(conf);
//ensure txn timesout
houseKeeperConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 100, TimeUnit.MILLISECONDS);
@@ -1426,15 +1349,15 @@ public void testHeartbeat() throws Exception {
request.setTablename(tblName2);
ShowLocksResponse response = msClient.showLocks(request);
Assert.assertEquals("Wrong number of locks: " + response, 1,
response.getLocks().size());
- ShowLocksResponseElement lock = response.getLocks().get(0);
+ ShowLocksResponseElement lock = response.getLocks().getFirst();
long acquiredAt = lock.getAcquiredat();
long heartbeatAt = lock.getLastheartbeat();
response = msClient.showLocks(request);
Assert.assertEquals("Wrong number of locks2: " + response, 1,
response.getLocks().size());
- lock = response.getLocks().get(0);
+ lock = response.getLocks().getFirst();
Assert.assertEquals("Acquired timestamp didn'table match", acquiredAt,
lock.getAcquiredat());
- Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() +
- ") == old heartbeat(" + heartbeatAt + ")", lock.getLastheartbeat() ==
heartbeatAt);
+ Assert.assertEquals("Expected new heartbeat (" + lock.getLastheartbeat()
+
+ ") == old heartbeat(" + heartbeatAt + ")", lock.getLastheartbeat(),
heartbeatAt);
for (int i = 0; i < transactionBatch * 3; i++) {
connection.beginTransaction();
if (i % 10 == 0) {
@@ -1513,7 +1436,9 @@ public void testTransactionBatchCommitDelimited() throws Exception {
connection.write("1,Hello streaming".getBytes());
connection.commitTransaction();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
+ checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+ "1\tHello streaming");
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, connection.getCurrentTransactionState());
@@ -1525,12 +1450,14 @@ public void testTransactionBatchCommitDelimited() throws Exception {
connection.write("2,Welcome to streaming".getBytes());
// data should not be visible
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+ "1\tHello streaming");
connection.commitTransaction();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
- "{2, Welcome to streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+ "1\tHello streaming",
+ "2\tWelcome to streaming");
connection.close();
@@ -1584,7 +1511,9 @@ public void testTransactionBatchCommitRegex() throws Exception {
connection.write("1,Hello streaming".getBytes());
connection.commitTransaction();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
+ checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+ "1\tHello streaming");
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, connection.getCurrentTransactionState());
@@ -1596,12 +1525,14 @@ public void testTransactionBatchCommitRegex() throws Exception {
connection.write("2,Welcome to streaming".getBytes());
// data should not be visible
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+ "1\tHello streaming");
connection.commitTransaction();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
- "{2, Welcome to streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+ "1\tHello streaming",
+ "2\tWelcome to streaming");
connection.close();
Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
@@ -1686,7 +1617,9 @@ public void testTransactionBatchCommitJson() throws Exception {
connection.write(rec1.getBytes());
connection.commitTransaction();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
+ checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+ "1\tHello streaming");
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, connection.getCurrentTransactionState());
@@ -1826,31 +1759,29 @@ public void testTransactionBatchAbort() throws Exception {
connection.close();
checkNothingWritten(partLoc);
-
}
@Test(expected = ClassCastException.class)
public void testFileSystemError() throws Exception {
// Bad file system object, ClassCastException should occur during record writer init
- conf.set("fs.raw.impl", Object.class.getName());
+ conf.set("fs.raw.impl", TestStreaming.class.getName());
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
- .withFieldDelimiter(',')
- .build();
+ .withFieldDelimiter(',')
+ .build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
- .withDatabase(dbName)
- .withTable(tblName)
- .withStaticPartitionValues(partitionVals)
- .withAgentInfo("UT_" + Thread.currentThread().getName())
- .withRecordWriter(writer)
- .withHiveConf(conf)
- .connect();
+ .withDatabase(dbName)
+ .withTable(tblName)
+ .withStaticPartitionValues(partitionVals)
+ .withAgentInfo("UT_" + Thread.currentThread().getName())
+ .withRecordWriter(writer)
+ .withHiveConf(conf)
+ .connect();
connection.beginTransaction();
}
-
@Test
public void testTransactionBatchAbortAndCommit() throws Exception {
String agentInfo = "UT_" + Thread.currentThread().getName();
@@ -1873,9 +1804,9 @@ public void testTransactionBatchAbortAndCommit() throws Exception {
connection.write("2,Welcome to streaming".getBytes());
ShowLocksResponse resp = msClient.showLocks(new ShowLocksRequest());
Assert.assertEquals("LockCount", 1, resp.getLocksSize());
- Assert.assertEquals("LockType", LockType.SHARED_READ,
resp.getLocks().get(0).getType());
- Assert.assertEquals("LockState", LockState.ACQUIRED,
resp.getLocks().get(0).getState());
- Assert.assertEquals("AgentInfo", agentInfo,
resp.getLocks().get(0).getAgentInfo());
+ Assert.assertEquals("LockType", LockType.SHARED_WRITE,
resp.getLocks().getFirst().getType());
+ Assert.assertEquals("LockState", LockState.ACQUIRED,
resp.getLocks().getFirst().getState());
+ Assert.assertEquals("AgentInfo", agentInfo,
resp.getLocks().getFirst().getAgentInfo());
connection.abortTransaction();
checkNothingWritten(partLoc);
@@ -1888,8 +1819,10 @@ public void testTransactionBatchAbortAndCommit() throws Exception {
connection.write("2,Welcome to streaming".getBytes());
connection.commitTransaction();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
- "{2, Welcome to streaming}");
+ String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
+ checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+ "1\tHello streaming",
+ "2\tWelcome to streaming");
connection.close();
}
@@ -1914,14 +1847,16 @@ public void testMultipleTransactionBatchCommits() throws Exception {
connection.write("1,Hello streaming".getBytes());
connection.commitTransaction();
String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
- checkDataWritten2(partLoc, 1, 10, 1, validationQuery, false, "1\tHello streaming");
+ checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+ "1\tHello streaming");
connection.beginTransaction();
connection.write("2,Welcome to streaming".getBytes());
connection.commitTransaction();
- checkDataWritten2(partLoc, 1, 10, 1, validationQuery, true, "1\tHello streaming",
- "2\tWelcome to streaming");
+ checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+ "1\tHello streaming",
+ "2\tWelcome to streaming");
connection.close();
@@ -1939,16 +1874,20 @@ public void testMultipleTransactionBatchCommits() throws Exception {
connection.write("3,Hello streaming - once again".getBytes());
connection.commitTransaction();
- checkDataWritten2(partLoc, 1, 20, 2, validationQuery, false, "1\tHello streaming",
- "2\tWelcome to streaming", "3\tHello streaming - once again");
+ checkDataWritten(partLoc, 1, 20, 2, validationQuery,
+ "1\tHello streaming",
+ "2\tWelcome to streaming",
+ "3\tHello streaming - once again");
connection.beginTransaction();
connection.write("4,Welcome to streaming - once again".getBytes());
connection.commitTransaction();
- checkDataWritten2(partLoc, 1, 20, 2, validationQuery, true, "1\tHello streaming",
- "2\tWelcome to streaming", "3\tHello streaming - once again",
- "4\tWelcome to streaming - once again");
+ checkDataWritten(partLoc, 1, 20, 2, validationQuery,
+ "1\tHello streaming",
+ "2\tWelcome to streaming",
+ "3\tHello streaming - once again",
+ "4\tWelcome to streaming - once again");
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, connection.getCurrentTransactionState());
@@ -1999,8 +1938,8 @@ public void testInterleavedTransactionBatchCommits() throws Exception {
connection2.commitTransaction();
String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
- checkDataWritten2(partLoc, 11, 20, 1,
- validationQuery, true, "3\tHello streaming - once again");
+ checkDataWritten(partLoc, 11, 20, 1, validationQuery,
+ "3\tHello streaming - once again");
connection.commitTransaction();
/*now both batches have committed (but not closed) so we for each primary file we expect a side
@@ -2016,11 +1955,12 @@ public void testInterleavedTransactionBatchCommits() throws Exception {
lengthFileSize, lengthFileSize > 0);
long logicalLength = AcidUtils.getLogicalLength(fs, stat);
long actualLength = stat.getLen();
- Assert.assertTrue("", logicalLength == actualLength);
+ Assert.assertEquals("", logicalLength, actualLength);
}
}
- checkDataWritten2(partLoc, 1, 20, 2,
- validationQuery, false, "1\tHello streaming", "3\tHello streaming - once again");
+ checkDataWritten(partLoc, 1, 20, 2, validationQuery,
+ "1\tHello streaming",
+ "3\tHello streaming - once again");
connection.beginTransaction();
connection.write("2,Welcome to streaming".getBytes());
@@ -2030,7 +1970,7 @@ public void testInterleavedTransactionBatchCommits() throws Exception {
//here each batch has written data and committed (to bucket0 since table only has 1 bucket)
//so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0
//has now received more data(logically - it's buffered) but it is not yet committed.
- //lets check that side files exist, etc
+ //let's check that side files exist, etc
dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false);
for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
@@ -2044,23 +1984,24 @@ public void testInterleavedTransactionBatchCommits() throws Exception {
Assert.assertTrue("", logicalLength <= actualLength);
}
}
- checkDataWritten2(partLoc, 1, 20, 2,
- validationQuery, true, "1\tHello streaming", "3\tHello streaming - once again");
+ checkDataWritten(partLoc, 1, 20, 2, validationQuery,
+ "1\tHello streaming",
+ "3\tHello streaming - once again");
connection.commitTransaction();
- checkDataWritten2(partLoc, 1, 20, 2,
- validationQuery, false, "1\tHello streaming",
- "2\tWelcome to streaming",
- "3\tHello streaming - once again");
+ checkDataWritten(partLoc, 1, 20, 2, validationQuery,
+ "1\tHello streaming",
+ "2\tWelcome to streaming",
+ "3\tHello streaming - once again");
connection2.commitTransaction();
- checkDataWritten2(partLoc, 1, 20, 2,
- validationQuery, true, "1\tHello streaming",
- "2\tWelcome to streaming",
- "3\tHello streaming - once again",
- "4\tWelcome to streaming - once again");
+ checkDataWritten(partLoc, 1, 20, 2, validationQuery,
+ "1\tHello streaming",
+ "2\tWelcome to streaming",
+ "3\tHello streaming - once again",
+ "4\tWelcome to streaming - once again");
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED, connection.getCurrentTransactionState());
@@ -2094,7 +2035,7 @@ private static class WriterThd extends Thread {
setUncaughtExceptionHandler((thread, throwable) -> {
error = throwable;
LOG.error(connection.toTransactionString());
- LOG.error("Thread " + thread.getName() + " died: " +
throwable.getMessage(), throwable);
+ LOG.error("Thread {} died: {}", thread.getName(),
throwable.getMessage(), throwable);
});
}
@@ -2114,7 +2055,7 @@ public void run() {
try {
conn.close();
} catch (Exception e) {
- LOG.error("txnBatch.close() failed: " + e.getMessage(), e);
+ LOG.error("txnBatch.close() failed: {}", e.getMessage(), e);
}
}
}
@@ -2123,7 +2064,7 @@ public void run() {
@Test
public void testConcurrentTransactionBatchCommits() throws Exception {
- List<WriterThd> writers = new ArrayList<WriterThd>(3);
+ List<WriterThd> writers = new ArrayList<>(3);
writers.add(new WriterThd("1,Matrix"));
writers.add(new WriterThd("2,Gandhi"));
writers.add(new WriterThd("3,Silence"));
@@ -2136,16 +2077,15 @@ public void testConcurrentTransactionBatchCommits() throws Exception {
}
for (WriterThd w : writers) {
if (w.error != null) {
- Assert.assertFalse("Writer thread" + w.getName() + " died: " +
w.error.getMessage() +
- " See log file for stack trace", true);
+ Assert.fail("Writer thread" + w.getName() + " died: " +
w.error.getMessage() +
+ " See log file for stack trace");
}
}
}
-
private ArrayList<SampleRec> dumpBucket(Path orcFile) throws IOException {
org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.getLocal(new Configuration());
- ArrayList<SampleRec> result = new ArrayList<SampleRec>();
+ ArrayList<SampleRec> result = new ArrayList<>();
try (Reader reader = OrcFile.createReader(orcFile, OrcFile.readerOptions(conf).filesystem(fs))) {
RecordReader rows = reader.rows();
StructObjectInspector inspector = (StructObjectInspector) reader
@@ -2222,7 +2162,6 @@ public void testBucketing() throws Exception {
createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2
, null, dbLocation2, bucketCount);
-
// 2) Insert data into both tables
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
@@ -2244,7 +2183,6 @@ public void testBucketing() throws Exception {
connection.commitTransaction();
connection.close();
-
StrictDelimitedInputWriter writer2 = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
@@ -2257,7 +2195,6 @@ public void testBucketing() throws Exception {
.withHiveConf(conf)
.connect();
-
connection2.beginTransaction();
connection2.write("name5,2,fact3".getBytes()); // bucket 0
@@ -2269,27 +2206,26 @@ public void testBucketing() throws Exception {
connection2.close();
// 3 Check data distribution in buckets
- HashMap<Integer, ArrayList<SampleRec>> actual1 = dumpAllBuckets(dbLocation, tblName3);
- HashMap<Integer, ArrayList<SampleRec>> actual2 = dumpAllBuckets(dbLocation2, tblName4);
+ Map<Integer, ArrayList<SampleRec>> actual1 = dumpAllBuckets(dbLocation, tblName3);
+ Map<Integer, ArrayList<SampleRec>> actual2 = dumpAllBuckets(dbLocation2, tblName4);
System.err.println("\n Table 1");
System.err.println(actual1);
System.err.println("\n Table 2");
System.err.println(actual2);
// assert bucket listing is as expected
- Assert.assertEquals("number of buckets does not match expectation",
actual1.values().size(), 3);
- Assert.assertTrue("bucket 0 shouldn't have been created", actual1.get(0)
== null);
- Assert.assertEquals("records in bucket does not match expectation",
actual1.get(1).size(), 1);
- Assert.assertEquals("records in bucket does not match expectation",
actual1.get(2).size(), 2);
- Assert.assertEquals("records in bucket does not match expectation",
actual1.get(3).size(), 1);
+ Assert.assertEquals("number of buckets does not match expectation", 3,
actual1.size());
+ Assert.assertNull("bucket 0 shouldn't have been created", actual1.get(0));
+ Assert.assertEquals("records in bucket does not match expectation", 1,
actual1.get(1).size());
+ Assert.assertEquals("records in bucket does not match expectation", 2,
actual1.get(2).size());
+ Assert.assertEquals("records in bucket does not match expectation", 1,
actual1.get(3).size());
}
private void runCmdOnDriver(String cmd) {
- boolean t = runDDL(driver, cmd);
+ boolean t = runQuery(driver, cmd);
Assert.assertTrue(cmd + " failed", t);
}
-
@Test
public void testFileDump() throws Exception {
String agentInfo = "UT_" + Thread.currentThread().getName();
@@ -2343,12 +2279,12 @@ public void testFileDump() throws Exception {
System.err.flush();
System.setErr(origErr);
- String errDump = new String(myErr.toByteArray());
- Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
+ String errDump = myErr.toString();
+ Assert.assertFalse(errDump.contains("file(s) are corrupted"));
// since this test runs on local file system which does not have an API to tell if files or
// open or not, we are testing for negative case even though the bucket files are still open
// for writes (transaction batch not closed yet)
- Assert.assertEquals(false, errDump.contains("is still open for writes."));
+ Assert.assertFalse(errDump.contains("is still open for writes."));
StrictDelimitedInputWriter writer2 = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
@@ -2382,10 +2318,10 @@ public void testFileDump() throws Exception {
System.err.flush();
System.setErr(origErr);
- errDump = new String(myErr.toByteArray());
- Assert.assertEquals(false, errDump.contains("Exception"));
- Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
- Assert.assertEquals(false, errDump.contains("is still open for writes."));
+ errDump = myErr.toString();
+ Assert.assertFalse(errDump.contains("Exception"));
+ Assert.assertFalse(errDump.contains("file(s) are corrupted"));
+ Assert.assertFalse(errDump.contains("is still open for writes."));
}
@Test
@@ -2442,23 +2378,23 @@ public void testFileDumpDeltaFilesWithStreamingOptimizations() throws Exception
System.out.flush();
System.setOut(origOut);
- String outDump = new String(myOut.toByteArray());
+ String outDump = myOut.toString();
// make sure delta files are written with no indexes and no dictionary
- Assert.assertEquals(true, outDump.contains("Compression: ZLIB"));
+ Assert.assertTrue(outDump.contains("Compression: ZLIB"));
// no stats/indexes
- Assert.assertEquals(true, outDump.contains("Column 0: count: 0 hasNull:
false"));
- Assert.assertEquals(true, outDump.contains("Column 1: count: 0 hasNull:
false bytesOnDisk: 15 sum: 0"));
- Assert.assertEquals(true, outDump.contains("Column 2: count: 0 hasNull:
false bytesOnDisk: 15 sum: 0"));
- Assert.assertEquals(true, outDump.contains("Column 3: count: 0 hasNull:
false bytesOnDisk: 19 sum: 0"));
- Assert.assertEquals(true, outDump.contains("Column 4: count: 0 hasNull:
false bytesOnDisk: 17 sum: 0"));
- Assert.assertEquals(true, outDump.contains("Column 5: count: 0 hasNull:
false bytesOnDisk: 15 sum: 0"));
- Assert.assertEquals(true, outDump.contains("Column 6: count: 0 hasNull:
false"));
- Assert.assertEquals(true, outDump.contains("Column 7: count: 0 hasNull:
false bytesOnDisk: 3929"));
- Assert.assertEquals(true, outDump.contains("Column 8: count: 0 hasNull:
false bytesOnDisk: 1484 sum: 0"));
- Assert.assertEquals(true, outDump.contains("Column 9: count: 0 hasNull:
false bytesOnDisk: 816"));
+ Assert.assertTrue(outDump.contains("Column 0: count: 0 hasNull: false"));
+ Assert.assertTrue(outDump.contains("Column 1: count: 0 hasNull: false
bytesOnDisk: 15 sum: 0"));
+ Assert.assertTrue(outDump.contains("Column 2: count: 0 hasNull: false
bytesOnDisk: 15 sum: 0"));
+ Assert.assertTrue(outDump.contains("Column 3: count: 0 hasNull: false
bytesOnDisk: 19 sum: 0"));
+ Assert.assertTrue(outDump.contains("Column 4: count: 0 hasNull: false
bytesOnDisk: 17 sum: 0"));
+ Assert.assertTrue(outDump.contains("Column 5: count: 0 hasNull: false
bytesOnDisk: 15 sum: 0"));
+ Assert.assertTrue(outDump.contains("Column 6: count: 0 hasNull: false"));
+ Assert.assertTrue(outDump.contains("Column 7: count: 0 hasNull: false
bytesOnDisk: 3929"));
+ Assert.assertTrue(outDump.contains("Column 8: count: 0 hasNull: false
bytesOnDisk: 1484 sum: 0"));
+ Assert.assertTrue(outDump.contains("Column 9: count: 0 hasNull: false
bytesOnDisk: 816"));
// no dictionary
- Assert.assertEquals(true, outDump.contains("Encoding column 7:
DIRECT_V2"));
- Assert.assertEquals(true, outDump.contains("Encoding column 9:
DIRECT_V2"));
+ Assert.assertTrue(outDump.contains("Encoding column 7: DIRECT_V2"));
+ Assert.assertTrue(outDump.contains("Encoding column 9: DIRECT_V2"));
}
@Test
@@ -2514,9 +2450,9 @@ public void testFileDumpDeltaFilesWithoutStreamingOptimizations() throws Excepti
System.out.flush();
System.setOut(origOut);
- String outDump = new String(myOut.toByteArray());
- Assert.assertEquals(true, outDump.contains("Compression: ZLIB"));
- Assert.assertEquals(true, outDump.contains("Encoding column 9:
DICTIONARY"));
+ String outDump = myOut.toString();
+ Assert.assertTrue(outDump.contains("Compression: ZLIB"));
+ Assert.assertTrue(outDump.contains("Encoding column 9: DICTIONARY"));
}
@Test
@@ -2579,10 +2515,10 @@ public void testFileDumpCorruptDataFiles() throws Exception {
System.err.flush();
System.setErr(origErr);
- String errDump = new String(myErr.toByteArray());
- Assert.assertEquals(false, errDump.contains("Exception"));
- Assert.assertEquals(true, errDump.contains("3 file(s) are corrupted"));
- Assert.assertEquals(false, errDump.contains("is still open for writes."));
+ String errDump = myErr.toString();
+ Assert.assertFalse(errDump.contains("Exception"));
+ Assert.assertTrue(errDump.contains("3 file(s) are corrupted"));
+ Assert.assertFalse(errDump.contains("is still open for writes."));
origErr = System.err;
myErr = new ByteArrayOutputStream();
@@ -2593,13 +2529,13 @@ public void testFileDumpCorruptDataFiles() throws Exception {
System.err.flush();
System.setErr(origErr);
- errDump = new String(myErr.toByteArray());
- Assert.assertEquals(true, errDump.contains("bucket_00001 recovered
successfully!"));
- Assert.assertEquals(true, errDump.contains("No readable footers found.
Creating empty orc file."));
- Assert.assertEquals(true, errDump.contains("bucket_00002 recovered
successfully!"));
- Assert.assertEquals(true, errDump.contains("bucket_00003 recovered
successfully!"));
- Assert.assertEquals(false, errDump.contains("Exception"));
- Assert.assertEquals(false, errDump.contains("is still open for writes."));
+ errDump = myErr.toString();
+ Assert.assertTrue(errDump.contains("bucket_00001 recovered
successfully!"));
+ Assert.assertTrue(errDump.contains("No readable footers found. Creating
empty orc file."));
+ Assert.assertTrue(errDump.contains("bucket_00002 recovered
successfully!"));
+ Assert.assertTrue(errDump.contains("bucket_00003 recovered
successfully!"));
+ Assert.assertFalse(errDump.contains("Exception"));
+ Assert.assertFalse(errDump.contains("is still open for writes."));
// test after recovery
origErr = System.err;
@@ -2611,15 +2547,15 @@ public void testFileDumpCorruptDataFiles() throws Exception {
System.err.flush();
System.setErr(origErr);
- errDump = new String(myErr.toByteArray());
- Assert.assertEquals(false, errDump.contains("Exception"));
- Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
- Assert.assertEquals(false, errDump.contains("is still open for writes."));
+ errDump = myErr.toString();
+ Assert.assertFalse(errDump.contains("Exception"));
+ Assert.assertFalse(errDump.contains("file(s) are corrupted"));
+ Assert.assertFalse(errDump.contains("is still open for writes."));
- // after recovery there shouldn'table be any *_flush_length files
+ // after recovery there shouldn't be any *_flush_length files
files = FileDump.getAllFilesInPath(path, conf);
for (String file : files) {
- Assert.assertEquals(false, file.contains("_flush_length"));
+ Assert.assertFalse(file.contains("_flush_length"));
}
connection.close();
@@ -2679,7 +2615,7 @@ public void testFileDumpCorruptSideFiles() throws Exception {
connection.write("name6,3,aHello streaming".getBytes());
connection.commitTransaction();
- Map<String, List<Long>> offsetMap = new HashMap<String, List<Long>>();
+ Map<String, List<Long>> offsetMap = new HashMap<>();
recordOffsets(conf, dbLocation, offsetMap);
connection.beginTransaction();
@@ -2726,14 +2662,14 @@ public void testFileDumpCorruptSideFiles() throws Exception {
System.err.flush();
System.setErr(origErr);
- String errDump = new String(myErr.toByteArray());
- Assert.assertEquals(true, errDump.contains("bucket_00000_flush_length
[length: 11"));
- Assert.assertEquals(true, errDump.contains("bucket_00001_flush_length
[length: 0"));
- Assert.assertEquals(true, errDump.contains("bucket_00002_flush_length
[length: 24"));
- Assert.assertEquals(true, errDump.contains("bucket_00003_flush_length
[length: 80"));
- Assert.assertEquals(false, errDump.contains("Exception"));
- Assert.assertEquals(true, errDump.contains("4 file(s) are corrupted"));
- Assert.assertEquals(false, errDump.contains("is still open for writes."));
+ String errDump = myErr.toString();
+ Assert.assertTrue(errDump.contains("bucket_00000_flush_length [length:
11"));
+ Assert.assertTrue(errDump.contains("bucket_00001_flush_length [length:
0"));
+ Assert.assertTrue(errDump.contains("bucket_00002_flush_length [length:
24"));
+ Assert.assertTrue(errDump.contains("bucket_00003_flush_length [length:
80"));
+ Assert.assertFalse(errDump.contains("Exception"));
+ Assert.assertTrue(errDump.contains("4 file(s) are corrupted"));
+ Assert.assertFalse(errDump.contains("is still open for writes."));
origErr = System.err;
myErr = new ByteArrayOutputStream();
@@ -2744,21 +2680,21 @@ public void testFileDumpCorruptSideFiles() throws Exception {
System.err.flush();
System.setErr(origErr);
- errDump = new String(myErr.toByteArray());
- Assert.assertEquals(true, errDump.contains("bucket_00000 recovered
successfully!"));
- Assert.assertEquals(true, errDump.contains("bucket_00001 recovered
successfully!"));
- Assert.assertEquals(true, errDump.contains("bucket_00002 recovered
successfully!"));
- Assert.assertEquals(true, errDump.contains("bucket_00003 recovered
successfully!"));
+ errDump = myErr.toString();
+ Assert.assertTrue(errDump.contains("bucket_00000 recovered
successfully!"));
+ Assert.assertTrue(errDump.contains("bucket_00001 recovered
successfully!"));
+ Assert.assertTrue(errDump.contains("bucket_00002 recovered
successfully!"));
+ Assert.assertTrue(errDump.contains("bucket_00003 recovered
successfully!"));
List<Long> offsets = offsetMap.get("bucket_00000");
- Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " +
offsets.toString()));
+ Assert.assertTrue(errDump.contains("Readable footerOffsets: " +
offsets.toString()));
offsets = offsetMap.get("bucket_00001");
- Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " +
offsets.toString()));
+ Assert.assertTrue(errDump.contains("Readable footerOffsets: " +
offsets.toString()));
offsets = offsetMap.get("bucket_00002");
- Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " +
offsets.toString()));
+ Assert.assertTrue(errDump.contains("Readable footerOffsets: " +
offsets.toString()));
offsets = offsetMap.get("bucket_00003");
- Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " +
offsets.toString()));
- Assert.assertEquals(false, errDump.contains("Exception"));
- Assert.assertEquals(false, errDump.contains("is still open for writes."));
+ Assert.assertTrue(errDump.contains("Readable footerOffsets: " +
offsets.toString()));
+ Assert.assertFalse(errDump.contains("Exception"));
+ Assert.assertFalse(errDump.contains("is still open for writes."));
// test after recovery
origErr = System.err;
@@ -2770,15 +2706,15 @@ public void testFileDumpCorruptSideFiles() throws Exception {
System.err.flush();
System.setErr(origErr);
- errDump = new String(myErr.toByteArray());
- Assert.assertEquals(false, errDump.contains("Exception"));
- Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
- Assert.assertEquals(false, errDump.contains("is still open for writes."));
+ errDump = myErr.toString();
+ Assert.assertFalse(errDump.contains("Exception"));
+ Assert.assertFalse(errDump.contains("file(s) are corrupted"));
+ Assert.assertFalse(errDump.contains("is still open for writes."));
- // after recovery there shouldn'table be any *_flush_length files
+ // after recovery there shouldn't be any *_flush_length files
files = FileDump.getAllFilesInPath(path, conf);
for (String file : files) {
- Assert.assertEquals(false, file.contains("_flush_length"));
+ Assert.assertFalse(file.contains("_flush_length"));
}
connection.close();
@@ -2792,7 +2728,7 @@ private void corruptSideFile(final String file, final HiveConf conf,
Path cPath = new Path(sideFilePath.getParent(), sideFilePath.getName() + ".corrupt");
FileSystem fs = sideFilePath.getFileSystem(conf);
List<Long> offsets = offsetMap.get(key);
- long lastOffset = offsets.get(offsets.size() - 1);
+ long lastOffset = offsets.getLast();
FSDataOutputStream fdos = fs.create(cPath, true);
// corrupt last entry
if (numEntries < 0) {
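offsets.getLast() above (and res.getLast() later in this patch) comes from the SequencedCollection interface, so the change assumes the test module now builds with Java 21 or newer. A small equivalence sketch:

import java.util.ArrayList;
import java.util.List;

public class GetLastDemo {
  public static void main(String[] args) {
    List<Long> offsets = new ArrayList<>(List.of(11L, 24L, 80L));
    long viaGetLast = offsets.getLast();              // Java 21+ shorthand
    long viaIndex = offsets.get(offsets.size() - 1);  // pre-21 equivalent
    System.out.println(viaGetLast == viaIndex);       // true
  }
}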
@@ -2838,45 +2774,21 @@ private void recordOffsets(final HiveConf conf, final String dbLocation,
long len = fileStatus.getLen();
if (file.contains("bucket_00000")) {
- if (offsetMap.containsKey("bucket_00000")) {
- List<Long> offsets = offsetMap.get("bucket_00000");
- offsets.add(len);
- offsetMap.put("bucket_00000", offsets);
- } else {
- List<Long> offsets = new ArrayList<Long>();
- offsets.add(len);
- offsetMap.put("bucket_00000", offsets);
- }
+ List<Long> offsets = offsetMap.computeIfAbsent("bucket_00000", k ->
new ArrayList<>());
+ offsets.add(len);
+ offsetMap.put("bucket_00000", offsets);
} else if (file.contains("bucket_00001")) {
- if (offsetMap.containsKey("bucket_00001")) {
- List<Long> offsets = offsetMap.get("bucket_00001");
- offsets.add(len);
- offsetMap.put("bucket_00001", offsets);
- } else {
- List<Long> offsets = new ArrayList<Long>();
- offsets.add(len);
- offsetMap.put("bucket_00001", offsets);
- }
+ List<Long> offsets = offsetMap.computeIfAbsent("bucket_00001", k ->
new ArrayList<>());
+ offsets.add(len);
+ offsetMap.put("bucket_00001", offsets);
} else if (file.contains("bucket_00002")) {
- if (offsetMap.containsKey("bucket_00002")) {
- List<Long> offsets = offsetMap.get("bucket_00002");
- offsets.add(len);
- offsetMap.put("bucket_00002", offsets);
- } else {
- List<Long> offsets = new ArrayList<Long>();
- offsets.add(len);
- offsetMap.put("bucket_00002", offsets);
- }
+ List<Long> offsets = offsetMap.computeIfAbsent("bucket_00002", k ->
new ArrayList<>());
+ offsets.add(len);
+ offsetMap.put("bucket_00002", offsets);
} else if (file.contains("bucket_00003")) {
- if (offsetMap.containsKey("bucket_00003")) {
- List<Long> offsets = offsetMap.get("bucket_00003");
- offsets.add(len);
- offsetMap.put("bucket_00003", offsets);
- } else {
- List<Long> offsets = new ArrayList<Long>();
- offsets.add(len);
- offsetMap.put("bucket_00003", offsets);
- }
+ List<Long> offsets = offsetMap.computeIfAbsent("bucket_00003", k ->
new ArrayList<>());
+ offsets.add(len);
+ offsetMap.put("bucket_00003", offsets);
}
}
}
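One observation on the hunk above: Map.computeIfAbsent already installs the freshly created list in the map, so the trailing offsetMap.put(...) retained in each branch is a redundant no-op (harmless, but it could be dropped). A self-contained sketch of the idiom, with an illustrative key:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComputeIfAbsentDemo {
  public static void main(String[] args) {
    Map<String, List<Long>> offsetMap = new HashMap<>();
    // The first call creates and stores the list; later calls reuse it.
    offsetMap.computeIfAbsent("bucket_00000", k -> new ArrayList<>()).add(11L);
    offsetMap.computeIfAbsent("bucket_00000", k -> new ArrayList<>()).add(24L);
    System.out.println(offsetMap.get("bucket_00000"));  // [11, 24]
  }
}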
@@ -2908,7 +2820,7 @@ public void testErrorHandling()
connection.close();
Exception expectedEx = null;
GetOpenTxnsInfoResponse r = msClient.showTxns();
- Assert.assertEquals("HWM didn'table match", 17,
r.getTxn_high_water_mark());
+ Assert.assertEquals("HWM didn't match", 9, r.getTxn_high_water_mark());
List<TxnInfo> ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(0)",
org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
@@ -2995,7 +2907,7 @@ public void testErrorHandling()
Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CA]"));
r = msClient.showTxns();
- Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
+ Assert.assertEquals("HWM didn't match", 11, r.getTxn_high_water_mark());
ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(0)",
org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
@@ -3031,7 +2943,7 @@ public void testErrorHandling()
expectedEx != null && expectedEx.getMessage().contains("Simulated fault
occurred"));
r = msClient.showTxns();
- Assert.assertEquals("HWM didn'table match", 21,
r.getTxn_high_water_mark());
+ Assert.assertEquals("HWM didn't match", 13, r.getTxn_high_water_mark());
ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(3)",
org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
@@ -3043,20 +2955,17 @@ public void testErrorHandling()
// assumes an unpartitioned table
// returns a map<bucketNum, list<record> >
- private HashMap<Integer, ArrayList<SampleRec>> dumpAllBuckets(String dbLocation, String tableName)
- throws IOException {
- HashMap<Integer, ArrayList<SampleRec>> result = new HashMap<Integer, ArrayList<SampleRec>>();
+ private Map<Integer, ArrayList<SampleRec>> dumpAllBuckets(String dbLocation, String tableName)
+ throws IOException {
+ Map<Integer, ArrayList<SampleRec>> result = new HashMap<>();
for (File deltaDir : new File(dbLocation + "/" + tableName).listFiles()) {
if (!deltaDir.getName().startsWith("delta")) {
continue;
}
- File[] bucketFiles = deltaDir.listFiles(new FileFilter() {
- @Override
- public boolean accept(File pathname) {
- String name = pathname.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
+ File[] bucketFiles = deltaDir.listFiles(pathname -> {
+ String name = pathname.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
});
for (File bucketFile : bucketFiles) {
if (bucketFile.toString().endsWith("length")) {
@@ -3074,7 +2983,7 @@ public boolean accept(File pathname) {
private Integer getBucketNumber(File bucketFile) {
String fname = bucketFile.getName();
int start = fname.indexOf('_');
- String number = fname.substring(start + 1, fname.length());
+ String number = fname.substring(start + 1);
return Integer.parseInt(number);
}
@@ -3085,12 +2994,10 @@ public static void dropDB(IMetaStoreClient client, String databaseName) {
client.dropTable(databaseName, table, true, true);
}
client.dropDatabase(databaseName);
- } catch (TException e) {
+ } catch (TException ignored) {
}
-
}
-
///////// -------- UTILS ------- /////////
// returns Path of the partition created (if any) else Path of table
private static Path createDbAndTable(IDriver driver, String databaseName,
@@ -3103,8 +3010,8 @@ private static Path createDbAndTable(IDriver driver, String databaseName,
String dbUri = "raw://" + new Path(dbLocation).toUri().toString();
String tableLoc = dbUri + Path.SEPARATOR + tableName;
- runDDL(driver, "create database IF NOT EXISTS " + databaseName + "
location '" + dbUri + "'");
- runDDL(driver, "use " + databaseName);
+ runQuery(driver, "create database IF NOT EXISTS " + databaseName + "
location '" + dbUri + "'");
+ runQuery(driver, "use " + databaseName);
String crtTbl = "create table " + tableName +
" ( " + getTableColumnsStr(colNames, colTypes) + " )" +
getPartitionStmtStr(partNames) +
@@ -3113,7 +3020,7 @@ private static Path createDbAndTable(IDriver driver, String databaseName,
" stored as orc " +
" location '" + tableLoc + "'" +
" TBLPROPERTIES ('transactional'='true') ";
- runDDL(driver, crtTbl);
+ runQuery(driver, crtTbl);
if (partNames != null && partNames.length != 0) {
return addPartition(driver, tableName, partVals, partNames);
}
@@ -3124,13 +3031,13 @@ private static Path addPartition(IDriver driver, String tableName, List<String>
throws Exception {
String partSpec = getPartsSpec(partNames, partVals);
String addPart = "alter table " + tableName + " add partition ( " +
partSpec + " )";
- runDDL(driver, addPart);
+ runQuery(driver, addPart);
return getPartitionPath(driver, tableName, partSpec);
}
private static Path getPartitionPath(IDriver driver, String tableName, String partSpec) throws Exception {
- ArrayList<String> res = queryTable(driver, "describe extended " + tableName + " PARTITION (" + partSpec + ")");
- String partInfo = res.get(res.size() - 1);
+ List<String> res = queryTable(driver, "describe extended " + tableName + " PARTITION (" + partSpec + ")");
+ String partInfo = res.getLast();
int start = partInfo.indexOf("location:") + "location:".length();
int end = partInfo.indexOf(",", start);
return new Path(partInfo.substring(start, end));
@@ -3201,27 +3108,33 @@ private static String getPartitionStmtStr(String[] partNames) {
return " partitioned by (" + getTablePartsStr(partNames) + " )";
}
- private static boolean runDDL(IDriver driver, String sql) {
+ private static boolean runQuery(IDriver driver, String sql) {
LOG.debug(sql);
- System.out.println(sql);
- //LOG.debug("Running Hive Query: "+ sql);
try {
driver.run(sql);
return true;
} catch (CommandProcessorException e) {
- LOG.error("Statement: " + sql + " failed: " + e);
+ LOG.error("Statement: {} failed: ", sql, e);
return false;
}
}
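The logging change above also switches to SLF4J's parameterized form; with one {} placeholder and two trailing arguments, the last argument is treated as a Throwable and logged with its full stack trace, which the old string concatenation did not do. A sketch, assuming an SLF4J logger like the test's LOG:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Slf4jParamDemo {
  private static final Logger LOG = LoggerFactory.getLogger(Slf4jParamDemo.class);

  public static void main(String[] args) {
    String sql = "drop table if exists t1";
    Exception e = new IllegalStateException("simulated failure");
    // "{}" is filled by sql; the trailing throwable gets a stack trace.
    LOG.error("Statement: {} failed: ", sql, e);
  }
}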
+ private static void dropTables(IDriver driver, String... tables) {
+ HiveConf queryConf = driver.getQueryState().getConf();
+ queryConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ for (String table : tables) {
+ runQuery(driver, "drop table if exists " + table);
+ }
+ queryConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+ }
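The new dropTables helper disables HIVE_SUPPORT_CONCURRENCY so the drops skip lock acquisition, then re-enables it. A slightly more defensive variant (an editorial sketch, not the committed code) would capture the prior value and restore it in a finally block, so a failing drop cannot leave the flag flipped:

private static void dropTables(IDriver driver, String... tables) {
  HiveConf queryConf = driver.getQueryState().getConf();
  boolean prev = queryConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
  queryConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
  try {
    for (String table : tables) {
      runQuery(driver, "drop table if exists " + table);
    }
  } finally {
    // Restore whatever was configured before, even on failure.
    queryConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, prev);
  }
}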
- private static ArrayList<String> queryTable(IDriver driver, String query) throws IOException {
+ private static List<String> queryTable(IDriver driver, String query) throws IOException {
try {
driver.run(query);
} catch (CommandProcessorException e) {
throw new RuntimeException(query + " failed: " + e);
}
- ArrayList<String> res = new ArrayList<String>();
+ List<String> res = new ArrayList<>();
driver.getResults(res);
return res;
}
@@ -3251,10 +3164,10 @@ public boolean equals(Object o) {
if (field2 != that.field2) {
return false;
}
- if (field1 != null ? !field1.equals(that.field1) : that.field1 != null) {
+ if (!Objects.equals(field1, that.field1)) {
return false;
}
- return !(field3 != null ? !field3.equals(that.field3) : that.field3 != null);
+ return Objects.equals(field3, that.field3);
}
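The equals() cleanup above relies on java.util.Objects.equals, which is null-safe and exactly equivalent to the ternaries it replaces. A small demonstration:

import java.util.Objects;

public class NullSafeEqualsDemo {
  public static void main(String[] args) {
    String a = null;
    String b = "Hello streaming";
    System.out.println(Objects.equals(a, b));        // false: one side null
    System.out.println(Objects.equals(null, null));  // true: both null
    System.out.println(Objects.equals("x", "x"));    // true: delegates to equals()
    // The pre-patch ternary computes the same result:
    System.out.println(a != null ? a.equals(b) : b == null);  // false
  }
}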
@@ -3325,8 +3238,6 @@ public Set<String> getPartitions() {
/**
* allows testing of "unexpected" errors
- *
- * @throws StreamingIOFailure
*/
private void produceFault() throws StreamingIOFailure {
if (shouldThrow) {
@@ -3345,7 +3256,7 @@ void disableErrors() {
@Override
public Path getDeltaFileLocation(List<String> partitionValues,
Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId,
- Table table) throws StreamingException {
+ Table table) {
return null;
}
}