http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java index a18b91e..35815fd 100644 --- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java +++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hive.bolt; @@ -21,7 +15,23 @@ package org.apache.storm.hive.bolt; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hive.hcatalog.streaming.HiveEndPoint; import org.apache.storm.Config; +import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper; +import org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper; +import org.apache.storm.hive.common.HiveOptions; import org.apache.storm.hive.common.HiveWriter; import org.apache.storm.task.GeneralTopologyContext; import org.apache.storm.task.OutputCollector; @@ -31,41 +41,21 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.TupleImpl; import org.apache.storm.tuple.Values; import org.apache.storm.utils.MockTupleHelpers; - -import org.apache.storm.hive.common.HiveOptions; -import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper; -import org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; - +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -import static org.mockito.Mockito.*; - -import org.junit.Assert; - import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; -import java.util.HashMap; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.HashSet; -import java.text.SimpleDateFormat; - - -import org.apache.hive.hcatalog.streaming.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; public class TestHiveBolt { final static String dbName = "testdb"; @@ -74,87 +64,23 @@ public class TestHiveBolt { final static String tblName1 = "test_table1"; final static String PART1_NAME = "city"; final static String PART2_NAME = "state"; - final static String[] partNames = {PART1_NAME, PART2_NAME}; - final String partitionVals = "sunnyvale,ca"; + final static String[] partNames = { PART1_NAME, PART2_NAME }; private static final String COL1 = "id"; private static final String COL2 = "msg"; - final String[] colNames = {COL1, COL2}; - final String[] colNames1 = {COL2, COL1}; - private String[] colTypes = {serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME}; - private final HiveConf conf; + private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class); + final String partitionVals = "sunnyvale,ca"; + final String[] colNames = { COL1, COL2 }; + final String[] colNames1 = { COL2, COL1 }; final String metaStoreURI; + private final HiveConf conf; + private String[] colTypes = { serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME }; private Config config = new Config(); private TestingHiveBolt bolt; - private ObjectMapper objectMapper = new ObjectMapper();; - + ; + private ObjectMapper objectMapper = new ObjectMapper(); @Mock private OutputCollector collector; - private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class); - - private static class TestingHiveBolt extends HiveBolt { - - protected Map<List<String>, List<byte[]>> partitionValuesToWrittenRecords = new HashMap<>(); - - public TestingHiveBolt(HiveOptions options) { - super(options); - } - - @Override - HiveWriter getOrCreateWriter(final HiveEndPoint endPoint) - throws HiveWriter.ConnectFailure, InterruptedException { - HiveWriter writer = allWriters.get(endPoint); - if (writer == null) { - // always provide mocked HiveWriter - writer = Mockito.mock(HiveWriter.class); - try { - Mockito.doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - Object[] arguments = invocation.getArguments(); - List<String> partitionVals = endPoint.partitionVals; - List<byte[]> writtenRecords = partitionValuesToWrittenRecords.get(partitionVals); - if (writtenRecords == null) { - writtenRecords = new ArrayList<>(); - partitionValuesToWrittenRecords.put(partitionVals, writtenRecords); - } - writtenRecords.add((byte[]) arguments[0]); - return null; - } - }).when(writer).write(any(byte[].class)); - } catch (Exception exc) { - throw new RuntimeException(exc); - } - } - return writer; - } - - public Map<List<String>, List<byte[]>> getPartitionValuesToWrittenRecords() { - return partitionValuesToWrittenRecords; - } - - public List<byte[]> getRecordWritten(List<String> partitionValues) { - return partitionValuesToWrittenRecords.get(partitionValues); - } - } - - private static class FlushFailureHiveBolt extends TestingHiveBolt { - - public FlushFailureHiveBolt(HiveOptions options) { - super(options); - } - - @Override - void flushAllWriters(boolean rollToNext) throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, - InterruptedException { - if (rollToNext) { - throw new InterruptedException(); - } else { - super.flushAllWriters(false); - } - } - } - public TestHiveBolt() throws Exception { //metaStoreURI = "jdbc:derby:;databaseName="+System.getProperty("java.io.tmpdir") +"metastore_db;create=true"; metaStoreURI = null; @@ -172,13 +98,13 @@ public class TestHiveBolt { @Test public void testWithByteArrayIdandMessage() - throws Exception { + throws Exception { DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() - .withColumnFields(new Fields(colNames)) - .withPartitionFields(new Fields(partNames)); + .withColumnFields(new Fields(colNames)) + .withPartitionFields(new Fields(partNames)); HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper) - .withTxnsPerBatch(2) - .withBatchSize(2); + .withTxnsPerBatch(2) + .withBatchSize(2); bolt = new TestingHiveBolt(hiveOptions); bolt.prepare(config, null, collector); @@ -206,14 +132,13 @@ public class TestHiveBolt { bolt.cleanup(); } - @Test public void testWithoutPartitions() - throws Exception { + throws Exception { DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() - .withColumnFields(new Fields(colNames)); + .withColumnFields(new Fields(colNames)); HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName1, tblName1, mapper) - .withTxnsPerBatch(2).withBatchSize(2).withAutoCreatePartitions(false); + .withTxnsPerBatch(2).withBatchSize(2).withAutoCreatePartitions(false); bolt = new TestingHiveBolt(hiveOptions); bolt.prepare(config, null, collector); @@ -245,15 +170,15 @@ public class TestHiveBolt { @Test public void testWithTimeformat() - throws Exception { + throws Exception { String timeFormat = "yyyy/MM/dd"; DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() - .withColumnFields(new Fields(colNames)) - .withTimeAsPartitionField(timeFormat); + .withColumnFields(new Fields(colNames)) + .withTimeAsPartitionField(timeFormat); HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName1, tblName1, mapper) - .withTxnsPerBatch(2) - .withBatchSize(1) - .withMaxOpenConnections(1); + .withTxnsPerBatch(2) + .withBatchSize(1) + .withMaxOpenConnections(1); bolt = new TestingHiveBolt(hiveOptions); bolt.prepare(config, null, collector); @@ -292,13 +217,13 @@ public class TestHiveBolt { @Test public void testData() - throws Exception { + throws Exception { DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() - .withColumnFields(new Fields(colNames)) - .withPartitionFields(new Fields(partNames)); + .withColumnFields(new Fields(colNames)) + .withPartitionFields(new Fields(partNames)); HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper) - .withTxnsPerBatch(2) - .withBatchSize(1); + .withTxnsPerBatch(2) + .withBatchSize(1); bolt = new TestingHiveBolt(hiveOptions); bolt.prepare(config, null, new OutputCollector(collector)); @@ -327,15 +252,15 @@ public class TestHiveBolt { @Test public void testJsonWriter() - throws Exception { + throws Exception { // json record doesn't need columns to be in the same order // as table in hive. JsonRecordHiveMapper mapper = new JsonRecordHiveMapper() - .withColumnFields(new Fields(colNames1)) - .withPartitionFields(new Fields(partNames)); + .withColumnFields(new Fields(colNames1)) + .withPartitionFields(new Fields(partNames)); HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper) - .withTxnsPerBatch(2) - .withBatchSize(1); + .withTxnsPerBatch(2) + .withBatchSize(1); bolt = new TestingHiveBolt(hiveOptions); bolt.prepare(config, null, collector); @@ -373,11 +298,11 @@ public class TestHiveBolt { @Test public void testNoAcksUntilFlushed() { JsonRecordHiveMapper mapper = new JsonRecordHiveMapper() - .withColumnFields(new Fields(colNames1)) - .withPartitionFields(new Fields(partNames)); + .withColumnFields(new Fields(colNames1)) + .withPartitionFields(new Fields(partNames)); HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper) - .withTxnsPerBatch(2) - .withBatchSize(2); + .withTxnsPerBatch(2) + .withBatchSize(2); bolt = new TestingHiveBolt(hiveOptions); bolt.prepare(config, null, new OutputCollector(collector)); @@ -397,11 +322,11 @@ public class TestHiveBolt { @Test public void testNoAcksIfFlushFails() throws Exception { JsonRecordHiveMapper mapper = new JsonRecordHiveMapper() - .withColumnFields(new Fields(colNames1)) - .withPartitionFields(new Fields(partNames)); + .withColumnFields(new Fields(colNames1)) + .withPartitionFields(new Fields(partNames)); HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper) - .withTxnsPerBatch(2) - .withBatchSize(2); + .withTxnsPerBatch(2) + .withBatchSize(2); HiveBolt failingBolt = new FlushFailureHiveBolt(hiveOptions); @@ -422,11 +347,11 @@ public class TestHiveBolt { @Test public void testTickTuple() { JsonRecordHiveMapper mapper = new JsonRecordHiveMapper() - .withColumnFields(new Fields(colNames1)) - .withPartitionFields(new Fields(partNames)); + .withColumnFields(new Fields(colNames1)) + .withPartitionFields(new Fields(partNames)); HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper) - .withTxnsPerBatch(2) - .withBatchSize(2); + .withTxnsPerBatch(2) + .withBatchSize(2); bolt = new TestingHiveBolt(hiveOptions); bolt.prepare(config, null, new OutputCollector(collector)); @@ -452,11 +377,11 @@ public class TestHiveBolt { @Test public void testNoTickEmptyBatches() throws Exception { JsonRecordHiveMapper mapper = new JsonRecordHiveMapper() - .withColumnFields(new Fields(colNames1)) - .withPartitionFields(new Fields(partNames)); + .withColumnFields(new Fields(colNames1)) + .withPartitionFields(new Fields(partNames)); HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper) - .withTxnsPerBatch(2) - .withBatchSize(2); + .withTxnsPerBatch(2) + .withBatchSize(2); bolt = new TestingHiveBolt(hiveOptions); bolt.prepare(config, null, new OutputCollector(collector)); @@ -471,13 +396,13 @@ public class TestHiveBolt { @Test public void testMultiPartitionTuples() - throws Exception { + throws Exception { DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() - .withColumnFields(new Fields(colNames)) - .withPartitionFields(new Fields(partNames)); + .withColumnFields(new Fields(colNames)) + .withPartitionFields(new Fields(partNames)); HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper) - .withTxnsPerBatch(10) - .withBatchSize(10); + .withTxnsPerBatch(10) + .withBatchSize(10); bolt = new TestingHiveBolt(hiveOptions); bolt.prepare(config, null, new OutputCollector(collector)); @@ -517,7 +442,7 @@ public class TestHiveBolt { private Tuple generateTestTuple(Object id, Object msg, Object city, Object state) { TopologyBuilder builder = new TopologyBuilder(); GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), - new Config(), new HashMap(), new HashMap(), new HashMap(), "") { + new Config(), new HashMap(), new HashMap(), new HashMap(), "") { @Override public Fields getComponentOutputFields(String componentId, String streamId) { return new Fields("id", "msg", "city", "state"); @@ -535,4 +460,67 @@ public class TestHiveBolt { return builder.toString().getBytes(); } + private static class TestingHiveBolt extends HiveBolt { + + protected Map<List<String>, List<byte[]>> partitionValuesToWrittenRecords = new HashMap<>(); + + public TestingHiveBolt(HiveOptions options) { + super(options); + } + + @Override + HiveWriter getOrCreateWriter(final HiveEndPoint endPoint) + throws HiveWriter.ConnectFailure, InterruptedException { + HiveWriter writer = allWriters.get(endPoint); + if (writer == null) { + // always provide mocked HiveWriter + writer = Mockito.mock(HiveWriter.class); + try { + Mockito.doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Object[] arguments = invocation.getArguments(); + List<String> partitionVals = endPoint.partitionVals; + List<byte[]> writtenRecords = partitionValuesToWrittenRecords.get(partitionVals); + if (writtenRecords == null) { + writtenRecords = new ArrayList<>(); + partitionValuesToWrittenRecords.put(partitionVals, writtenRecords); + } + writtenRecords.add((byte[]) arguments[0]); + return null; + } + }).when(writer).write(any(byte[].class)); + } catch (Exception exc) { + throw new RuntimeException(exc); + } + } + return writer; + } + + public Map<List<String>, List<byte[]>> getPartitionValuesToWrittenRecords() { + return partitionValuesToWrittenRecords; + } + + public List<byte[]> getRecordWritten(List<String> partitionValues) { + return partitionValuesToWrittenRecords.get(partitionValues); + } + } + + private static class FlushFailureHiveBolt extends TestingHiveBolt { + + public FlushFailureHiveBolt(HiveOptions options) { + super(options); + } + + @Override + void flushAllWriters(boolean rollToNext) throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, + InterruptedException { + if (rollToNext) { + throw new InterruptedException(); + } else { + super.flushAllWriters(false); + } + } + } + }
http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java index 450342b..03d3383 100644 --- a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java +++ b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java @@ -1,140 +1,76 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hive.common; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import junit.framework.Assert; -import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.CommandNeedRetryException; -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.hcatalog.streaming.HiveEndPoint; import org.apache.hive.hcatalog.streaming.RecordWriter; +import org.apache.hive.hcatalog.streaming.SerializationError; import org.apache.hive.hcatalog.streaming.StreamingConnection; import org.apache.hive.hcatalog.streaming.StreamingException; -import org.apache.hive.hcatalog.streaming.SerializationError; import org.apache.hive.hcatalog.streaming.TransactionBatch; +import org.apache.storm.Config; +import org.apache.storm.hive.bolt.HiveSetupUtil; import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper; import org.apache.storm.hive.bolt.mapper.HiveMapper; -import org.apache.storm.hive.bolt.HiveSetupUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import org.apache.storm.Config; import org.apache.storm.task.GeneralTopologyContext; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.TupleImpl; import org.apache.storm.tuple.Values; -import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.mockito.Mockito; -import java.io.IOException; -import java.util.Arrays; -import java.util.ArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.HashMap; - public class TestHiveWriter { - final static String dbName = "testdb"; - final static String tblName = "test_table2"; - public static final String PART1_NAME = "city"; public static final String PART2_NAME = "state"; public static final String[] partNames = { PART1_NAME, PART2_NAME }; - final String[] partitionVals = {"sunnyvale","ca"}; - final String[] colNames = {"id","msg"}; + final static String dbName = "testdb"; + final static String tblName = "test_table2"; + final String[] partitionVals = { "sunnyvale", "ca" }; + final String[] colNames = { "id", "msg" }; private final int port; private final String metaStoreURI; private final HiveConf conf; - private ExecutorService callTimeoutPool; - int timeout = 10000; // msec - UserGroupInformation ugi = null; - @Rule public TemporaryFolder dbFolder = new TemporaryFolder(); - - private static class TestingHiveWriter extends HiveWriter { - - private StreamingConnection mockedStreamingConn; - private TransactionBatch mockedTxBatch; - - public TestingHiveWriter(HiveEndPoint endPoint, int txnsPerBatch, boolean autoCreatePartitions, long callTimeout, ExecutorService callTimeoutPool, HiveMapper mapper, UserGroupInformation ugi, boolean tokenAuthEnabled) throws InterruptedException, ConnectFailure { - super(endPoint, txnsPerBatch, autoCreatePartitions, callTimeout, callTimeoutPool, mapper, ugi, tokenAuthEnabled); - } - - @Override - synchronized StreamingConnection newConnection(UserGroupInformation ugi, boolean tokenAuthEnabled) throws InterruptedException, ConnectFailure { - if (mockedStreamingConn == null) { - mockedStreamingConn = Mockito.mock(StreamingConnection.class); - mockedTxBatch = Mockito.mock(TransactionBatch.class); - - try { - Mockito.when(mockedStreamingConn.fetchTransactionBatch(Mockito.anyInt(), Mockito.any(RecordWriter.class))) - .thenReturn(mockedTxBatch); - } catch (StreamingException e) { - throw new RuntimeException(e); - } - } - - return mockedStreamingConn; - } - - public TransactionBatch getMockedTxBatch() { - return mockedTxBatch; - } - } - - private static class MockedDelemiteredRecordHiveMapper extends DelimitedRecordHiveMapper { - private final RecordWriter mockedRecordWriter; - - public MockedDelemiteredRecordHiveMapper() { - this.mockedRecordWriter = Mockito.mock(RecordWriter.class); - } - - @Override - public RecordWriter createRecordWriter(HiveEndPoint endPoint) throws StreamingException, IOException, ClassNotFoundException { - return mockedRecordWriter; - } - - public RecordWriter getMockedRecordWriter() { - return mockedRecordWriter; - } - } + int timeout = 10000; // msec + UserGroupInformation ugi = null; + private ExecutorService callTimeoutPool; public TestHiveWriter() throws Exception { port = 9083; metaStoreURI = null; int callTimeoutPoolSize = 1; callTimeoutPool = Executors.newFixedThreadPool(callTimeoutPoolSize, - new ThreadFactoryBuilder().setNameFormat("hiveWriterTest").build()); + new ThreadFactoryBuilder().setNameFormat("hiveWriterTest").build()); // 1) Start metastore conf = HiveSetupUtil.getHiveConf(); TxnDbUtil.setConfValues(conf); - if(metaStoreURI!=null) { + if (metaStoreURI != null) { conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI); } } @@ -142,23 +78,23 @@ public class TestHiveWriter { @Test public void testInstantiate() throws Exception { DelimitedRecordHiveMapper mapper = new MockedDelemiteredRecordHiveMapper() - .withColumnFields(new Fields(colNames)) - .withPartitionFields(new Fields(partNames)); + .withColumnFields(new Fields(colNames)) + .withPartitionFields(new Fields(partNames)); HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals)); TestingHiveWriter writer = new TestingHiveWriter(endPoint, 10, true, timeout - ,callTimeoutPool, mapper, ugi, false); + , callTimeoutPool, mapper, ugi, false); writer.close(); } @Test public void testWriteBasic() throws Exception { DelimitedRecordHiveMapper mapper = new MockedDelemiteredRecordHiveMapper() - .withColumnFields(new Fields(colNames)) - .withPartitionFields(new Fields(partNames)); + .withColumnFields(new Fields(colNames)) + .withPartitionFields(new Fields(partNames)); HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals)); TestingHiveWriter writer = new TestingHiveWriter(endPoint, 10, true, timeout - , callTimeoutPool, mapper, ugi, false); - writeTuples(writer,mapper,3); + , callTimeoutPool, mapper, ugi, false); + writeTuples(writer, mapper, 3); writer.flush(false); writer.close(); Mockito.verify(writer.getMockedTxBatch(), Mockito.times(3)).write(Mockito.any(byte[].class)); @@ -167,15 +103,15 @@ public class TestHiveWriter { @Test public void testWriteMultiFlush() throws Exception { DelimitedRecordHiveMapper mapper = new MockedDelemiteredRecordHiveMapper() - .withColumnFields(new Fields(colNames)) - .withPartitionFields(new Fields(partNames)); + .withColumnFields(new Fields(colNames)) + .withPartitionFields(new Fields(partNames)); HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals)); TestingHiveWriter writer = new TestingHiveWriter(endPoint, 10, true, timeout - , callTimeoutPool, mapper, ugi, false); - Tuple tuple = generateTestTuple("1","abc"); + , callTimeoutPool, mapper, ugi, false); + Tuple tuple = generateTestTuple("1", "abc"); writer.write(mapper.mapRecord(tuple)); - tuple = generateTestTuple("2","def"); + tuple = generateTestTuple("2", "def"); writer.write(mapper.mapRecord(tuple)); Assert.assertEquals(writer.getTotalRecords(), 2); Mockito.verify(writer.getMockedTxBatch(), Mockito.times(2)).write(Mockito.any(byte[].class)); @@ -184,11 +120,11 @@ public class TestHiveWriter { Assert.assertEquals(writer.getTotalRecords(), 0); Mockito.verify(writer.getMockedTxBatch(), Mockito.atLeastOnce()).commit(); - tuple = generateTestTuple("3","ghi"); + tuple = generateTestTuple("3", "ghi"); writer.write(mapper.mapRecord(tuple)); writer.flush(true); - tuple = generateTestTuple("4","klm"); + tuple = generateTestTuple("4", "klm"); writer.write(mapper.mapRecord(tuple)); writer.flush(true); writer.close(); @@ -198,7 +134,7 @@ public class TestHiveWriter { private Tuple generateTestTuple(Object id, Object msg) { TopologyBuilder builder = new TopologyBuilder(); GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), - new Config(), new HashMap(), new HashMap(), new HashMap(), "") { + new Config(), new HashMap(), new HashMap(), new HashMap(), "") { @Override public Fields getComponentOutputFields(String componentId, String streamId) { return new Fields("id", "msg"); @@ -208,13 +144,64 @@ public class TestHiveWriter { } private void writeTuples(HiveWriter writer, HiveMapper mapper, int count) - throws HiveWriter.WriteFailure, InterruptedException, SerializationError { + throws HiveWriter.WriteFailure, InterruptedException, SerializationError { Integer id = 100; String msg = "test-123"; for (int i = 1; i <= count; i++) { - Tuple tuple = generateTestTuple(id,msg); + Tuple tuple = generateTestTuple(id, msg); writer.write(mapper.mapRecord(tuple)); } } + private static class TestingHiveWriter extends HiveWriter { + + private StreamingConnection mockedStreamingConn; + private TransactionBatch mockedTxBatch; + + public TestingHiveWriter(HiveEndPoint endPoint, int txnsPerBatch, boolean autoCreatePartitions, long callTimeout, + ExecutorService callTimeoutPool, HiveMapper mapper, UserGroupInformation ugi, + boolean tokenAuthEnabled) throws InterruptedException, ConnectFailure { + super(endPoint, txnsPerBatch, autoCreatePartitions, callTimeout, callTimeoutPool, mapper, ugi, tokenAuthEnabled); + } + + @Override + synchronized StreamingConnection newConnection(UserGroupInformation ugi, boolean tokenAuthEnabled) throws InterruptedException, + ConnectFailure { + if (mockedStreamingConn == null) { + mockedStreamingConn = Mockito.mock(StreamingConnection.class); + mockedTxBatch = Mockito.mock(TransactionBatch.class); + + try { + Mockito.when(mockedStreamingConn.fetchTransactionBatch(Mockito.anyInt(), Mockito.any(RecordWriter.class))) + .thenReturn(mockedTxBatch); + } catch (StreamingException e) { + throw new RuntimeException(e); + } + } + + return mockedStreamingConn; + } + + public TransactionBatch getMockedTxBatch() { + return mockedTxBatch; + } + } + + private static class MockedDelemiteredRecordHiveMapper extends DelimitedRecordHiveMapper { + private final RecordWriter mockedRecordWriter; + + public MockedDelemiteredRecordHiveMapper() { + this.mockedRecordWriter = Mockito.mock(RecordWriter.class); + } + + @Override + public RecordWriter createRecordWriter(HiveEndPoint endPoint) throws StreamingException, IOException, ClassNotFoundException { + return mockedRecordWriter; + } + + public RecordWriter getMockedRecordWriter() { + return mockedRecordWriter; + } + } + } \ No newline at end of file
