http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
index 61b5304..f642607 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
@@ -34,7 +34,7 @@ import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -46,6 +46,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
@@ -57,11 +58,13 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import static
org.apache.nifi.processors.hive.PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR;
+import static org.apache.nifi.processors.hive.PutHiveStreaming.REL_SUCCESS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
@@ -213,11 +216,33 @@ public class TestPutHiveStreaming {
}
@Test
- public void onTriggerMultipleRecords() throws Exception {
+ public void onTriggerBadInputRollbackOnFailure() throws Exception {
runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
- runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "2");
+ runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ runner.enqueue("I am not an Avro record".getBytes());
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+
+ @Test
+ public void onTriggerMultipleRecordsSingleTransaction() throws Exception {
+ runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
@@ -237,13 +262,191 @@ public class TestPutHiveStreaming {
put("favorite_number", 3);
}
};
- runner.enqueue(createAvroRecord(Arrays.asList(user1, user2, user3)));
+ final List<Map<String, Object>> users = Arrays.asList(user1, user2,
user3);
+ runner.enqueue(createAvroRecord(users));
runner.run();
runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
MockFlowFile resultFlowFile =
runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0);
- assertNotNull(resultFlowFile);
- assertEquals("3",
resultFlowFile.getAttribute(PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR));
+ assertOutputAvroRecords(users, resultFlowFile);
+ }
+
+ @Test
+ public void onTriggerMultipleRecordsMultipleTransaction() throws Exception
{
+ runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
+ runner.setValidateExpressionUsage(false);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ Map<String, Object> user2 = new HashMap<String, Object>() {
+ {
+ put("name", "Mary");
+ put("favorite_number", 42);
+ }
+ };
+ Map<String, Object> user3 = new HashMap<String, Object>() {
+ {
+ put("name", "Matt");
+ put("favorite_number", 3);
+ }
+ };
+ final List<Map<String, Object>> users = Arrays.asList(user1, user2,
user3);
+ runner.enqueue(createAvroRecord(users));
+ runner.run();
+
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
+ MockFlowFile resultFlowFile =
runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0);
+ assertOutputAvroRecords(users, resultFlowFile);
+ }
+
+ @Test
+ public void onTriggerMultipleRecordsFailInMiddle() throws Exception {
+ runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
+ runner.setValidateExpressionUsage(false);
+ processor.setGenerateWriteFailure(true, 1);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ Map<String, Object> user2 = new HashMap<String, Object>() {
+ {
+ put("name", "Mary");
+ put("favorite_number", 42);
+ }
+ };
+ Map<String, Object> user3 = new HashMap<String, Object>() {
+ {
+ put("name", "Matt");
+ put("favorite_number", 3);
+ }
+ };
+ Map<String, Object> user4 = new HashMap<String, Object>() {
+ {
+ put("name", "Mike");
+ put("favorite_number", 345);
+ }
+ };
+ final List<Map<String, Object>> users = Arrays.asList(user1, user2,
user3, user4);
+ runner.enqueue(createAvroRecord(users));
+ runner.run();
+
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
+ runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+
+ MockFlowFile resultFlowFile =
runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0);
+ assertOutputAvroRecords(Arrays.asList(user1, user3, user4),
resultFlowFile);
+
+ final MockFlowFile failedFlowFile =
runner.getFlowFilesForRelationship(PutHiveStreaming.REL_FAILURE).get(0);
+ assertOutputAvroRecords(Arrays.asList(user2), failedFlowFile);
+ }
+
+ @Test
+ public void onTriggerMultipleRecordsFailInMiddleRollbackOnFailure() throws
Exception {
+ runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ processor.setGenerateWriteFailure(true, 1);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ Map<String, Object> user2 = new HashMap<String, Object>() {
+ {
+ put("name", "Mary");
+ put("favorite_number", 42);
+ }
+ };
+ Map<String, Object> user3 = new HashMap<String, Object>() {
+ {
+ put("name", "Matt");
+ put("favorite_number", 3);
+ }
+ };
+ runner.enqueue(createAvroRecord(Arrays.asList(user1, user2, user3)));
+ try {
+ runner.run();
+ fail("ProcessException should be thrown, because any Hive
Transaction is committed yet.");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
+ public void
onTriggerMultipleRecordsFailInMiddleRollbackOnFailureCommitted() throws
Exception {
+ runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ // The first two records are committed, then an issue will happen at the 3rd record.
+ processor.setGenerateWriteFailure(true, 2);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ Map<String, Object> user2 = new HashMap<String, Object>() {
+ {
+ put("name", "Mary");
+ put("favorite_number", 42);
+ }
+ };
+ Map<String, Object> user3 = new HashMap<String, Object>() {
+ {
+ put("name", "Matt");
+ put("favorite_number", 3);
+ }
+ };
+ Map<String, Object> user4 = new HashMap<String, Object>() {
+ {
+ put("name", "Mike");
+ put("favorite_number", 345);
+ }
+ };
+ runner.enqueue(createAvroRecord(Arrays.asList(user1, user2, user3,
user4)));
+ // ProcessException should NOT be thrown, because a Hive Transaction is already committed.
+ runner.run();
+
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
+ runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+
+ // Assert transferred FlowFile.
+ assertOutputAvroRecords(Arrays.asList(user1, user2),
runner.getFlowFilesForRelationship(REL_SUCCESS).get(0));
+
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+
+ }
+
+ private void assertOutputAvroRecords(List<Map<String, Object>>
expectedRecords, MockFlowFile resultFlowFile) throws IOException {
+ assertEquals(String.valueOf(expectedRecords.size()),
resultFlowFile.getAttribute(PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR));
+
final DataFileStream<GenericRecord> reader = new DataFileStream<>(
new ByteArrayInputStream(resultFlowFile.toByteArray()),
new GenericDatumReader<GenericRecord>());
@@ -253,17 +456,20 @@ public class TestPutHiveStreaming {
// Verify that the schema is preserved
assertTrue(schema.equals(new Schema.Parser().parse(new
File("src/test/resources/user.avsc"))));
- // Verify the records are intact. We can't guarantee order so check the total number and non-null fields
- assertTrue(reader.hasNext());
- GenericRecord record = reader.next(null);
- assertNotNull(record.get("name"));
- assertNotNull(record.get("favorite_number"));
- assertNull(record.get("favorite_color"));
- assertNull(record.get("scale"));
- assertTrue(reader.hasNext());
- record = reader.next(record);
- assertTrue(reader.hasNext());
- reader.next(record);
+ GenericRecord record = null;
+ for (Map<String, Object> expectedRecord : expectedRecords) {
+ assertTrue(reader.hasNext());
+ record = reader.next(record);
+ final String name = record.get("name").toString();
+ final Integer favorite_number = (Integer)
record.get("favorite_number");
+ assertNotNull(name);
+ assertNotNull(favorite_number);
+ assertNull(record.get("favorite_color"));
+ assertNull(record.get("scale"));
+
+ assertEquals(expectedRecord.get("name"), name);
+ assertEquals(expectedRecord.get("favorite_number"),
favorite_number);
+ }
assertFalse(reader.hasNext());
}
@@ -319,6 +525,39 @@ public class TestPutHiveStreaming {
}
@Test
+ public void onTriggerWithPartitionColumnsNotInRecordRollbackOnFailure()
throws Exception {
+ runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+ runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS,
"favorite_food");
+ runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "false");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ put("favorite_color", "blue");
+ }
+ };
+
+ runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
public void onTriggerWithRetireWriters() throws Exception {
runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
@@ -390,6 +629,36 @@ public class TestPutHiveStreaming {
}
@Test
+ public void onTriggerWithConnectFailureRollbackOnFailure() throws
Exception {
+ processor.setGenerateConnectFailure(true);
+ runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
public void onTriggerWithInterruptedException() throws Exception {
processor.setGenerateInterruptedExceptionOnCreateWriter(true);
runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
@@ -410,6 +679,32 @@ public class TestPutHiveStreaming {
}
@Test
+ public void onTriggerWithInterruptedExceptionRollbackOnFailure() throws
Exception {
+ processor.setGenerateInterruptedExceptionOnCreateWriter(true);
+ runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+ }
+
+ @Test
public void onTriggerWithWriteFailure() throws Exception {
processor.setGenerateWriteFailure(true);
runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
@@ -437,6 +732,40 @@ public class TestPutHiveStreaming {
}
@Test
+ public void onTriggerWithWriteFailureRollbackOnFailure() throws Exception {
+ processor.setGenerateWriteFailure(true);
+ runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ Map<String, Object> user2 = new HashMap<String, Object>() {
+ {
+ put("name", "Mary");
+ put("favorite_number", 42);
+ }
+ };
+ runner.enqueue(createAvroRecord(Arrays.asList(user1, user2)));
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
public void onTriggerWithSerializationError() throws Exception {
processor.setGenerateSerializationError(true);
runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
@@ -458,6 +787,35 @@ public class TestPutHiveStreaming {
}
@Test
+ public void onTriggerWithSerializationErrorRollbackOnFailure() throws
Exception {
+ processor.setGenerateSerializationError(true);
+ runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
public void onTriggerWithCommitFailure() throws Exception {
processor.setGenerateCommitFailure(true);
runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
@@ -474,9 +832,39 @@ public class TestPutHiveStreaming {
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run();
- runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 1);
+ }
+
+ @Test
+ public void onTriggerWithCommitFailureRollbackOnFailure() throws Exception
{
+ processor.setGenerateCommitFailure(true);
+ runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
}
@Test
@@ -496,9 +884,39 @@ public class TestPutHiveStreaming {
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run();
- runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 1);
+ }
+
+ @Test
+ public void onTriggerWithTransactionFailureRollbackOnFailure() throws
Exception {
+ processor.setGenerateTransactionFailure(true);
+ runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
}
@Test
@@ -563,10 +981,12 @@ public class TestPutHiveStreaming {
private boolean generateConnectFailure = false;
private boolean generateInterruptedExceptionOnCreateWriter = false;
private boolean generateWriteFailure = false;
+ private Integer generateWriteFailureRecordIndex;
private boolean generateSerializationError = false;
private boolean generateCommitFailure = false;
private boolean generateTransactionFailure = false;
private boolean generateExceptionOnFlushAndClose = false;
+ private HiveEndPoint hiveEndPoint = mock(HiveEndPoint.class);
@Override
public KerberosProperties getKerberosProperties() {
@@ -579,7 +999,6 @@ public class TestPutHiveStreaming {
@Override
public HiveEndPoint makeHiveEndPoint(List<String> partitionValues,
HiveOptions hiveOptions) {
- HiveEndPoint hiveEndPoint = mock(HiveEndPoint.class);
return hiveEndPoint;
}
@@ -593,7 +1012,7 @@ public class TestPutHiveStreaming {
throw new InterruptedException();
}
MockHiveWriter hiveWriter = new MockHiveWriter(endPoint,
options.getTxnsPerBatch(), options.getAutoCreatePartitions(),
options.getCallTimeOut(), callTimeoutPool, ugi, hiveConfig);
- hiveWriter.setGenerateWriteFailure(generateWriteFailure);
+ hiveWriter.setGenerateWriteFailure(generateWriteFailure,
generateWriteFailureRecordIndex);
hiveWriter.setGenerateSerializationError(generateSerializationError);
hiveWriter.setGenerateCommitFailure(generateCommitFailure);
hiveWriter.setGenerateTransactionFailure(generateTransactionFailure);
@@ -613,6 +1032,11 @@ public class TestPutHiveStreaming {
this.generateWriteFailure = generateWriteFailure;
}
+ public void setGenerateWriteFailure(boolean generateWriteFailure, int
generateWriteFailureRecordIndex) {
+ this.generateWriteFailure = generateWriteFailure;
+ this.generateWriteFailureRecordIndex =
generateWriteFailureRecordIndex;
+ }
+
public void setGenerateSerializationError(boolean
generateSerializationError) {
this.generateSerializationError = generateSerializationError;
}
@@ -634,10 +1058,13 @@ public class TestPutHiveStreaming {
private class MockHiveWriter extends HiveWriter {
private boolean generateWriteFailure = false;
+ private Integer generateWriteFailureRecordIndex;
private boolean generateSerializationError = false;
private boolean generateCommitFailure = false;
private boolean generateTransactionFailure = false;
private boolean generateExceptionOnFlushAndClose = false;
+ private int writeAttemptCount = 0;
+ private int totalRecords = 0;
private HiveEndPoint endPoint;
@@ -651,16 +1078,23 @@ public class TestPutHiveStreaming {
@Override
public synchronized void write(byte[] record) throws WriteFailure,
SerializationError, InterruptedException {
- if (generateWriteFailure) {
- throw new HiveWriter.WriteFailure(endPoint, 1L, new
Exception());
- }
- if (generateSerializationError) {
- throw new SerializationError("Test Serialization Error", new
Exception());
+ try {
+ if (generateWriteFailure
+ && (generateWriteFailureRecordIndex == null ||
writeAttemptCount == generateWriteFailureRecordIndex)) {
+ throw new WriteFailure(endPoint, 1L, new Exception());
+ }
+ if (generateSerializationError) {
+ throw new SerializationError("Test Serialization Error",
new Exception());
+ }
+ totalRecords++;
+ } finally {
+ writeAttemptCount++;
}
}
- public void setGenerateWriteFailure(boolean generateWriteFailure) {
+ public void setGenerateWriteFailure(boolean generateWriteFailure,
Integer generateWriteFailureRecordIndex) {
this.generateWriteFailure = generateWriteFailure;
+ this.generateWriteFailureRecordIndex =
generateWriteFailureRecordIndex;
}
public void setGenerateSerializationError(boolean
generateSerializationError) {
@@ -754,6 +1188,11 @@ public class TestPutHiveStreaming {
protected void nextTxn(boolean rollToNext) throws StreamingException,
InterruptedException, TxnBatchFailure {
// Empty
}
+
+ @Override
+ public int getTotalRecords() {
+ return totalRecords;
+ }
}
}