http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
index 61b5304..f642607 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
@@ -34,7 +34,7 @@ import org.apache.hive.hcatalog.streaming.StreamingException;
 import org.apache.hive.hcatalog.streaming.TransactionBatch;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.hadoop.SecurityUtil;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -46,6 +46,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
@@ -57,11 +58,13 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 import static 
org.apache.nifi.processors.hive.PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR;
+import static org.apache.nifi.processors.hive.PutHiveStreaming.REL_SUCCESS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
@@ -213,11 +216,33 @@ public class TestPutHiveStreaming {
     }
 
     @Test
-    public void onTriggerMultipleRecords() throws Exception {
+    public void onTriggerBadInputRollbackOnFailure() throws Exception {
         runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
         runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
-        runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "2");
+        runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+        runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+        runner.setValidateExpressionUsage(false);
+        runner.enqueue("I am not an Avro record".getBytes());
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+        // Assert incoming FlowFile stays in input queue.
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+
+    @Test
+    public void onTriggerMultipleRecordsSingleTransaction() throws Exception {
+        runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
+        runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+        runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+        runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "100");
         runner.setValidateExpressionUsage(false);
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
@@ -237,13 +262,191 @@ public class TestPutHiveStreaming {
                 put("favorite_number", 3);
             }
         };
-        runner.enqueue(createAvroRecord(Arrays.asList(user1, user2, user3)));
+        final List<Map<String, Object>> users = Arrays.asList(user1, user2, 
user3);
+        runner.enqueue(createAvroRecord(users));
         runner.run();
 
         runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
         MockFlowFile resultFlowFile = 
runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0);
-        assertNotNull(resultFlowFile);
-        assertEquals("3", 
resultFlowFile.getAttribute(PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertOutputAvroRecords(users, resultFlowFile);
+    }
+
+    @Test
+    public void onTriggerMultipleRecordsMultipleTransaction() throws Exception 
{
+        runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
+        runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+        runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+        runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
+        runner.setValidateExpressionUsage(false);
+        Map<String, Object> user1 = new HashMap<String, Object>() {
+            {
+                put("name", "Joe");
+                put("favorite_number", 146);
+            }
+        };
+        Map<String, Object> user2 = new HashMap<String, Object>() {
+            {
+                put("name", "Mary");
+                put("favorite_number", 42);
+            }
+        };
+        Map<String, Object> user3 = new HashMap<String, Object>() {
+            {
+                put("name", "Matt");
+                put("favorite_number", 3);
+            }
+        };
+        final List<Map<String, Object>> users = Arrays.asList(user1, user2, 
user3);
+        runner.enqueue(createAvroRecord(users));
+        runner.run();
+
+        runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
+        MockFlowFile resultFlowFile = 
runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0);
+        assertOutputAvroRecords(users, resultFlowFile);
+    }
+
+    @Test
+    public void onTriggerMultipleRecordsFailInMiddle() throws Exception {
+        runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
+        runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+        runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+        runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
+        runner.setValidateExpressionUsage(false);
+        processor.setGenerateWriteFailure(true, 1);
+        Map<String, Object> user1 = new HashMap<String, Object>() {
+            {
+                put("name", "Joe");
+                put("favorite_number", 146);
+            }
+        };
+        Map<String, Object> user2 = new HashMap<String, Object>() {
+            {
+                put("name", "Mary");
+                put("favorite_number", 42);
+            }
+        };
+        Map<String, Object> user3 = new HashMap<String, Object>() {
+            {
+                put("name", "Matt");
+                put("favorite_number", 3);
+            }
+        };
+        Map<String, Object> user4 = new HashMap<String, Object>() {
+            {
+                put("name", "Mike");
+                put("favorite_number", 345);
+            }
+        };
+        final List<Map<String, Object>> users = Arrays.asList(user1, user2, 
user3, user4);
+        runner.enqueue(createAvroRecord(users));
+        runner.run();
+
+        runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
+        runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+
+        MockFlowFile resultFlowFile = 
runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0);
+        assertOutputAvroRecords(Arrays.asList(user1, user3, user4), 
resultFlowFile);
+
+        final MockFlowFile failedFlowFile = 
runner.getFlowFilesForRelationship(PutHiveStreaming.REL_FAILURE).get(0);
+        assertOutputAvroRecords(Arrays.asList(user2), failedFlowFile);
+    }
+
+    @Test
+    public void onTriggerMultipleRecordsFailInMiddleRollbackOnFailure() throws 
Exception {
+        runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
+        runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+        runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+        runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
+        runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+        runner.setValidateExpressionUsage(false);
+        processor.setGenerateWriteFailure(true, 1);
+        Map<String, Object> user1 = new HashMap<String, Object>() {
+            {
+                put("name", "Joe");
+                put("favorite_number", 146);
+            }
+        };
+        Map<String, Object> user2 = new HashMap<String, Object>() {
+            {
+                put("name", "Mary");
+                put("favorite_number", 42);
+            }
+        };
+        Map<String, Object> user3 = new HashMap<String, Object>() {
+            {
+                put("name", "Matt");
+                put("favorite_number", 3);
+            }
+        };
+        runner.enqueue(createAvroRecord(Arrays.asList(user1, user2, user3)));
+        try {
+            runner.run();
+            fail("ProcessException should be thrown, because any Hive 
Transaction is committed yet.");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+        runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+        // Assert incoming FlowFile stays in input queue.
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
+    public void 
onTriggerMultipleRecordsFailInMiddleRollbackOnFailureCommitted() throws 
Exception {
+        runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
+        runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+        runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+        runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
+        runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+        runner.setValidateExpressionUsage(false);
+        // The first two records are committed, then an issue will happen at 
the 3rd record.
+        processor.setGenerateWriteFailure(true, 2);
+        Map<String, Object> user1 = new HashMap<String, Object>() {
+            {
+                put("name", "Joe");
+                put("favorite_number", 146);
+            }
+        };
+        Map<String, Object> user2 = new HashMap<String, Object>() {
+            {
+                put("name", "Mary");
+                put("favorite_number", 42);
+            }
+        };
+        Map<String, Object> user3 = new HashMap<String, Object>() {
+            {
+                put("name", "Matt");
+                put("favorite_number", 3);
+            }
+        };
+        Map<String, Object> user4 = new HashMap<String, Object>() {
+            {
+                put("name", "Mike");
+                put("favorite_number", 345);
+            }
+        };
+        runner.enqueue(createAvroRecord(Arrays.asList(user1, user2, user3, 
user4)));
+        // ProcessException should NOT be thrown, because a Hive Transaction 
is already committed.
+        runner.run();
+
+        runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+        runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+
+        // Assert transferred FlowFile.
+        assertOutputAvroRecords(Arrays.asList(user1, user2), 
runner.getFlowFilesForRelationship(REL_SUCCESS).get(0));
+
+        // Assert incoming FlowFile stays in input queue.
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+
+    }
+
+    private void assertOutputAvroRecords(List<Map<String, Object>> 
expectedRecords, MockFlowFile resultFlowFile) throws IOException {
+        assertEquals(String.valueOf(expectedRecords.size()), 
resultFlowFile.getAttribute(PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR));
+
         final DataFileStream<GenericRecord> reader = new DataFileStream<>(
                 new ByteArrayInputStream(resultFlowFile.toByteArray()),
                 new GenericDatumReader<GenericRecord>());
@@ -253,17 +456,20 @@ public class TestPutHiveStreaming {
         // Verify that the schema is preserved
         assertTrue(schema.equals(new Schema.Parser().parse(new 
File("src/test/resources/user.avsc"))));
 
-        // Verify the records are intact. We can't guarantee order so check 
the total number and non-null fields
-        assertTrue(reader.hasNext());
-        GenericRecord record = reader.next(null);
-        assertNotNull(record.get("name"));
-        assertNotNull(record.get("favorite_number"));
-        assertNull(record.get("favorite_color"));
-        assertNull(record.get("scale"));
-        assertTrue(reader.hasNext());
-        record = reader.next(record);
-        assertTrue(reader.hasNext());
-        reader.next(record);
+        GenericRecord record = null;
+        for (Map<String, Object> expectedRecord : expectedRecords) {
+            assertTrue(reader.hasNext());
+            record = reader.next(record);
+            final String name = record.get("name").toString();
+            final Integer favorite_number = (Integer) 
record.get("favorite_number");
+            assertNotNull(name);
+            assertNotNull(favorite_number);
+            assertNull(record.get("favorite_color"));
+            assertNull(record.get("scale"));
+
+            assertEquals(expectedRecord.get("name"), name);
+            assertEquals(expectedRecord.get("favorite_number"), 
favorite_number);
+        }
         assertFalse(reader.hasNext());
     }
 
@@ -319,6 +525,39 @@ public class TestPutHiveStreaming {
     }
 
     @Test
+    public void onTriggerWithPartitionColumnsNotInRecordRollbackOnFailure() 
throws Exception {
+        runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
+        runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+        runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+        runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+        runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, 
"favorite_food");
+        runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "false");
+        runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+        runner.setValidateExpressionUsage(false);
+        Map<String, Object> user1 = new HashMap<String, Object>() {
+            {
+                put("name", "Joe");
+                put("favorite_number", 146);
+                put("favorite_color", "blue");
+            }
+        };
+
+        runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+        runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+        // Assert incoming FlowFile stays in input queue.
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
     public void onTriggerWithRetireWriters() throws Exception {
         runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
         runner.setProperty(PutHiveStreaming.DB_NAME, "default");
@@ -390,6 +629,36 @@ public class TestPutHiveStreaming {
     }
 
     @Test
+    public void onTriggerWithConnectFailureRollbackOnFailure() throws 
Exception {
+        processor.setGenerateConnectFailure(true);
+        runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
+        runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+        runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+        runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+        runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+        runner.setValidateExpressionUsage(false);
+        Map<String, Object> user1 = new HashMap<String, Object>() {
+            {
+                put("name", "Joe");
+                put("favorite_number", 146);
+            }
+        };
+        runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+        runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+        runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+        // Assert incoming FlowFile stays in input queue.
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
     public void onTriggerWithInterruptedException() throws Exception {
         processor.setGenerateInterruptedExceptionOnCreateWriter(true);
         runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
@@ -410,6 +679,32 @@ public class TestPutHiveStreaming {
     }
 
     @Test
+    public void onTriggerWithInterruptedExceptionRollbackOnFailure() throws 
Exception {
+        processor.setGenerateInterruptedExceptionOnCreateWriter(true);
+        runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
+        runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+        runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+        runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+        runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+        runner.setValidateExpressionUsage(false);
+        Map<String, Object> user1 = new HashMap<String, Object>() {
+            {
+                put("name", "Joe");
+                put("favorite_number", 146);
+            }
+        };
+        runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+    }
+
+    @Test
     public void onTriggerWithWriteFailure() throws Exception {
         processor.setGenerateWriteFailure(true);
         runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
@@ -437,6 +732,40 @@ public class TestPutHiveStreaming {
     }
 
     @Test
+    public void onTriggerWithWriteFailureRollbackOnFailure() throws Exception {
+        processor.setGenerateWriteFailure(true);
+        runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
+        runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+        runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+        runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+        runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+        runner.setValidateExpressionUsage(false);
+        Map<String, Object> user1 = new HashMap<String, Object>() {
+            {
+                put("name", "Joe");
+                put("favorite_number", 146);
+            }
+        };
+        Map<String, Object> user2 = new HashMap<String, Object>() {
+            {
+                put("name", "Mary");
+                put("favorite_number", 42);
+            }
+        };
+        runner.enqueue(createAvroRecord(Arrays.asList(user1, user2)));
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+        // Assert incoming FlowFile stays in input queue.
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
     public void onTriggerWithSerializationError() throws Exception {
         processor.setGenerateSerializationError(true);
         runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
@@ -458,6 +787,35 @@ public class TestPutHiveStreaming {
     }
 
     @Test
+    public void onTriggerWithSerializationErrorRollbackOnFailure() throws 
Exception {
+        processor.setGenerateSerializationError(true);
+        runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
+        runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+        runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+        runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+        runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+        runner.setValidateExpressionUsage(false);
+        Map<String, Object> user1 = new HashMap<String, Object>() {
+            {
+                put("name", "Joe");
+                put("favorite_number", 146);
+            }
+        };
+        runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+        // Assert incoming FlowFile stays in input queue.
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
     public void onTriggerWithCommitFailure() throws Exception {
         processor.setGenerateCommitFailure(true);
         runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
@@ -474,9 +832,39 @@ public class TestPutHiveStreaming {
         runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
         runner.run();
 
-        runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
+        runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+        runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 1);
+    }
+
+    @Test
+    public void onTriggerWithCommitFailureRollbackOnFailure() throws Exception 
{
+        processor.setGenerateCommitFailure(true);
+        runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
+        runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+        runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+        runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+        runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+        runner.setValidateExpressionUsage(false);
+        Map<String, Object> user1 = new HashMap<String, Object>() {
+            {
+                put("name", "Joe");
+                put("favorite_number", 146);
+            }
+        };
+        runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
         runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
         runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+        // Assert incoming FlowFile stays in input queue.
+        assertEquals(1, runner.getQueueSize().getObjectCount());
     }
 
     @Test
@@ -496,9 +884,39 @@ public class TestPutHiveStreaming {
         runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
         runner.run();
 
-        runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
+        runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+        runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 1);
+    }
+
+    @Test
+    public void onTriggerWithTransactionFailureRollbackOnFailure() throws 
Exception {
+        processor.setGenerateTransactionFailure(true);
+        runner.setProperty(PutHiveStreaming.METASTORE_URI, 
"thrift://localhost:9083");
+        runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+        runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+        runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+        runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+        runner.setValidateExpressionUsage(false);
+        Map<String, Object> user1 = new HashMap<String, Object>() {
+            {
+                put("name", "Joe");
+                put("favorite_number", 146);
+            }
+        };
+        runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
         runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
         runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+        // Assert incoming FlowFile stays in input queue.
+        assertEquals(1, runner.getQueueSize().getObjectCount());
     }
 
     @Test
@@ -563,10 +981,12 @@ public class TestPutHiveStreaming {
         private boolean generateConnectFailure = false;
         private boolean generateInterruptedExceptionOnCreateWriter = false;
         private boolean generateWriteFailure = false;
+        private Integer generateWriteFailureRecordIndex;
         private boolean generateSerializationError = false;
         private boolean generateCommitFailure = false;
         private boolean generateTransactionFailure = false;
         private boolean generateExceptionOnFlushAndClose = false;
+        private HiveEndPoint hiveEndPoint = mock(HiveEndPoint.class);
 
         @Override
         public KerberosProperties getKerberosProperties() {
@@ -579,7 +999,6 @@ public class TestPutHiveStreaming {
 
         @Override
         public HiveEndPoint makeHiveEndPoint(List<String> partitionValues, 
HiveOptions hiveOptions) {
-            HiveEndPoint hiveEndPoint = mock(HiveEndPoint.class);
             return hiveEndPoint;
         }
 
@@ -593,7 +1012,7 @@ public class TestPutHiveStreaming {
                 throw new InterruptedException();
             }
             MockHiveWriter hiveWriter = new MockHiveWriter(endPoint, 
options.getTxnsPerBatch(), options.getAutoCreatePartitions(), 
options.getCallTimeOut(), callTimeoutPool, ugi, hiveConfig);
-            hiveWriter.setGenerateWriteFailure(generateWriteFailure);
+            hiveWriter.setGenerateWriteFailure(generateWriteFailure, 
generateWriteFailureRecordIndex);
             
hiveWriter.setGenerateSerializationError(generateSerializationError);
             hiveWriter.setGenerateCommitFailure(generateCommitFailure);
             
hiveWriter.setGenerateTransactionFailure(generateTransactionFailure);
@@ -613,6 +1032,11 @@ public class TestPutHiveStreaming {
             this.generateWriteFailure = generateWriteFailure;
         }
 
+        public void setGenerateWriteFailure(boolean generateWriteFailure, int 
generateWriteFailureRecordIndex) {
+            this.generateWriteFailure = generateWriteFailure;
+            this.generateWriteFailureRecordIndex = 
generateWriteFailureRecordIndex;
+        }
+
         public void setGenerateSerializationError(boolean 
generateSerializationError) {
             this.generateSerializationError = generateSerializationError;
         }
@@ -634,10 +1058,13 @@ public class TestPutHiveStreaming {
     private class MockHiveWriter extends HiveWriter {
 
         private boolean generateWriteFailure = false;
+        private Integer generateWriteFailureRecordIndex;
         private boolean generateSerializationError = false;
         private boolean generateCommitFailure = false;
         private boolean generateTransactionFailure = false;
         private boolean generateExceptionOnFlushAndClose = false;
+        private int writeAttemptCount = 0;
+        private int totalRecords = 0;
 
         private HiveEndPoint endPoint;
 
@@ -651,16 +1078,23 @@ public class TestPutHiveStreaming {
 
         @Override
         public synchronized void write(byte[] record) throws WriteFailure, 
SerializationError, InterruptedException {
-            if (generateWriteFailure) {
-                throw new HiveWriter.WriteFailure(endPoint, 1L, new 
Exception());
-            }
-            if (generateSerializationError) {
-                throw new SerializationError("Test Serialization Error", new 
Exception());
+            try {
+                if (generateWriteFailure
+                        && (generateWriteFailureRecordIndex == null || 
writeAttemptCount == generateWriteFailureRecordIndex)) {
+                    throw new WriteFailure(endPoint, 1L, new Exception());
+                }
+                if (generateSerializationError) {
+                    throw new SerializationError("Test Serialization Error", 
new Exception());
+                }
+                totalRecords++;
+            } finally {
+                writeAttemptCount++;
             }
         }
 
-        public void setGenerateWriteFailure(boolean generateWriteFailure) {
+        public void setGenerateWriteFailure(boolean generateWriteFailure, 
Integer generateWriteFailureRecordIndex) {
             this.generateWriteFailure = generateWriteFailure;
+            this.generateWriteFailureRecordIndex = 
generateWriteFailureRecordIndex;
         }
 
         public void setGenerateSerializationError(boolean 
generateSerializationError) {
@@ -754,6 +1188,11 @@ public class TestPutHiveStreaming {
         protected void nextTxn(boolean rollToNext) throws StreamingException, 
InterruptedException, TxnBatchFailure {
             // Empty
         }
+
+        @Override
+        public int getTotalRecords() {
+            return totalRecords;
+        }
     }
 
 }

Reply via email to