http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
new file mode 100644
index 0000000..6a65783
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
@@ -0,0 +1,878 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.streaming.ConnectionStats;
+import org.apache.hive.streaming.HiveRecordWriter;
+import org.apache.hive.streaming.PartitionInfo;
+import org.apache.hive.streaming.RecordWriter;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.StreamingException;
+import org.apache.hive.streaming.StubConnectionError;
+import org.apache.hive.streaming.StubSerializationError;
+import org.apache.hive.streaming.StubStreamingIOFailure;
+import org.apache.hive.streaming.StubTransactionError;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveOptions;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
+import static org.apache.nifi.processors.hive.PutHive3Streaming.HIVE_STREAMING_RECORD_COUNT_ATTR;
+import static org.apache.nifi.processors.hive.PutHive3Streaming.KERBEROS_CREDENTIALS_SERVICE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for PutHive3Streaming processor.
+ */
+public class TestPutHive3Streaming {
+
+    private static final String TEST_CONF_PATH = "src/test/resources/core-site.xml";
+    private static final String TARGET_HIVE = "target/hive";
+
+    private TestRunner runner;
+    private MockPutHive3Streaming processor;
+
+    private HiveConfigurator hiveConfigurator;
+    private HiveConf hiveConf;
+    private UserGroupInformation ugi;
+    private Schema schema;
+
+    @Before
+    public void setUp() throws Exception {
+
+        final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/user.avsc"), StandardCharsets.UTF_8);
+        schema = new Schema.Parser().parse(avroSchema);
+
+        Configuration testConf = new Configuration();
+        testConf.addResource(new Path(TEST_CONF_PATH));
+
+        // needed for calls to UserGroupInformation.setConfiguration() to work when passing in
+        // config with Kerberos authentication enabled
+        System.setProperty("java.security.krb5.realm", "nifi.com");
+        System.setProperty("java.security.krb5.kdc", "nifi.kdc");
+
+        ugi = null;
+        processor = new MockPutHive3Streaming();
+        hiveConfigurator = mock(HiveConfigurator.class);
+        hiveConf = new HiveConf();
+        when(hiveConfigurator.getConfigurationFromFiles(anyString())).thenReturn(hiveConf);
+        processor.hiveConfigurator = hiveConfigurator;
+
+        // Delete any temp files from previous tests
+        try {
+            FileUtils.deleteDirectory(new File(TARGET_HIVE));
+        } catch (IOException ioe) {
+            // Do nothing, directory may not have existed
+        }
+    }
+
+    private void configure(final PutHive3Streaming processor, final int numUsers) throws InitializationException {
+        configure(processor, numUsers, -1);
+    }
+
+    private void configure(final PutHive3Streaming processor, final int numUsers, int failAfter) throws InitializationException {
+        configure(processor, numUsers, failAfter, null);
+    }
+
+    private void configure(final PutHive3Streaming processor, final int numUsers, final int failAfter,
+                           final BiFunction<Integer, MockRecordParser, Void> recordGenerator) throws InitializationException {
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+        MockRecordParser readerFactory = new MockRecordParser();
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        for (final RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+        }
+
+        if (recordGenerator == null) {
+            for (int i = 0; i < numUsers; i++) {
+                readerFactory.addRecord("name" + i, i, "blue" + i, i * 10.0);
+            }
+        } else {
+            recordGenerator.apply(numUsers, readerFactory);
+        }
+
+        readerFactory.failAfter(failAfter);
+
+        runner.addControllerService("mock-reader-factory", readerFactory);
+        runner.enableControllerService(readerFactory);
+
+        runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
+    }
+
+    private void configureComplex(final MockPutHive3Streaming processor, final int numUsers, final int failAfter,
+                                  final BiFunction<Integer, MockRecordParser, Void> recordGenerator) throws IOException, InitializationException {
+        final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/array_of_records.avsc"), StandardCharsets.UTF_8);
+        schema = new Schema.Parser().parse(avroSchema);
+        processor.setFields(Arrays.asList(new FieldSchema("records",
+                serdeConstants.LIST_TYPE_NAME + "<"
+                        + serdeConstants.MAP_TYPE_NAME + "<"
+                        + serdeConstants.STRING_TYPE_NAME + ","
+                        +  serdeConstants.STRING_TYPE_NAME + ">>", "")));
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+        MockRecordParser readerFactory = new MockRecordParser();
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        for (final RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+        }
+
+        if (recordGenerator == null) {
+            Object[] mapArray = new Object[numUsers];
+            for (int i = 0; i < numUsers; i++) {
+                final int x = i;
+                Map<String, Object> map = new HashMap<String, Object>() {{
+                    put("name", "name" + x);
+                    put("age", x * 5);
+                }};
+                mapArray[i] = map;
+            }
+            readerFactory.addRecord((Object)mapArray);
+        } else {
+            recordGenerator.apply(numUsers, readerFactory);
+        }
+
+        readerFactory.failAfter(failAfter);
+
+        runner.addControllerService("mock-reader-factory", readerFactory);
+        runner.enableControllerService(readerFactory);
+
+        runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
+    }
+
+    @Test
+    public void testSetup() throws Exception {
+        configure(processor, 0);
+        runner.assertNotValid();
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.assertNotValid();
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.assertValid();
+        runner.run();
+    }
+
+    @Test
+    public void testUgiGetsCleared() throws Exception {
+        configure(processor, 0);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        processor.ugi = mock(UserGroupInformation.class);
+        runner.run();
+        assertNull(processor.ugi);
+    }
+
+    @Test
+    public void testUgiGetsSetIfSecure() throws Exception {
+        configure(processor, 1);
+        hiveConf.set(SecurityUtil.HADOOP_SECURITY_AUTHENTICATION, SecurityUtil.KERBEROS);
+        KerberosCredentialsService kcs = new MockKerberosCredentialsService();
+        runner.addControllerService("kcs", kcs);
+        runner.setProperty(KERBEROS_CREDENTIALS_SERVICE, "kcs");
+        runner.enableControllerService(kcs);
+        ugi = mock(UserGroupInformation.class);
+        when(hiveConfigurator.authenticate(eq(hiveConf), anyString(), anyString())).thenReturn(ugi);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.enqueue(new byte[0]);
+        runner.run();
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testSetupWithKerberosAuthFailed() throws Exception {
+        configure(processor, 0);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, "src/test/resources/core-site-security.xml, src/test/resources/hive-site-security.xml");
+
+        hiveConf.set(SecurityUtil.HADOOP_SECURITY_AUTHENTICATION, SecurityUtil.KERBEROS);
+        KerberosCredentialsService kcs = new MockKerberosCredentialsService(null, null);
+        runner.addControllerService("kcs", kcs);
+        runner.setProperty(KERBEROS_CREDENTIALS_SERVICE, "kcs");
+        runner.enableControllerService(kcs);
+        runner.assertNotValid();
+        runner.run();
+    }
+
+    @Test
+    public void onTrigger() throws Exception {
+        configure(processor, 1);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+        assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
+    }
+
+    @Test
+    public void onTriggerComplex() throws Exception {
+        configureComplex(processor, 10, -1, null);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+        // Schema is an array of size 10, so only one record is output
+        assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
+    }
+
+    @Test
+    public void onTriggerBadInput() throws Exception {
+        configure(processor, 1, 0);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.enqueue("I am not an Avro record".getBytes());
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void onTriggerBadInputRollbackOnFailure() throws Exception {
+        configure(processor, 1, 0);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+
+        runner.setProperty(PutHive3Streaming.ROLLBACK_ON_FAILURE, "true");
+        runner.enqueue("I am not an Avro record".getBytes());
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0);
+        // Assert incoming FlowFile stays in input queue.
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+
+    @Test
+    public void onTriggerMultipleRecordsSingleTransaction() throws Exception {
+        configure(processor, 3);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        Map<String, Object> user1 = new HashMap<String, Object>() {
+            {
+                put("name", "Joe");
+                put("favorite_number", 146);
+            }
+        };
+        Map<String, Object> user2 = new HashMap<String, Object>() {
+            {
+                put("name", "Mary");
+                put("favorite_number", 42);
+            }
+        };
+        Map<String, Object> user3 = new HashMap<String, Object>() {
+            {
+                put("name", "Matt");
+                put("favorite_number", 3);
+            }
+        };
+        final List<Map<String, Object>> users = Arrays.asList(user1, user2, user3);
+        runner.enqueue(createAvroRecord(users));
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+        MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+        assertOutputAvroRecords(users, resultFlowFile);
+    }
+
+    @Test
+    public void onTriggerMultipleRecordsFailInMiddle() throws Exception {
+        configure(processor, 4);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        processor.setGenerateWriteFailure(true);
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1);
+        runner.assertTransferCount(PutHive3Streaming.REL_RETRY, 0);
+    }
+
+    @Test
+    public void onTriggerMultipleRecordsFailInMiddleRollbackOnFailure() throws Exception {
+        configure(processor, 3);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.setProperty(PutHive3Streaming.ROLLBACK_ON_FAILURE, "true");
+        processor.setGenerateWriteFailure(true);
+        runner.enqueue(new byte[0]);
+        try {
+            runner.run();
+            fail("ProcessException should be thrown because no Hive transaction has been committed yet.");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutHive3Streaming.REL_RETRY, 0);
+        runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0);
+        // Assert incoming FlowFile stays in input queue.
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    private void assertOutputAvroRecords(List<Map<String, Object>> expectedRecords, MockFlowFile resultFlowFile) throws IOException {
+        assertEquals(String.valueOf(expectedRecords.size()), resultFlowFile.getAttribute(PutHive3Streaming.HIVE_STREAMING_RECORD_COUNT_ATTR));
+
+        final DataFileStream<GenericRecord> reader = new DataFileStream<>(
+                new ByteArrayInputStream(resultFlowFile.toByteArray()),
+                new GenericDatumReader<>());
+
+        Schema schema = reader.getSchema();
+
+        // Verify that the schema is preserved
+        assertEquals(schema, new Schema.Parser().parse(new File("src/test/resources/user.avsc")));
+
+        GenericRecord record = null;
+        for (Map<String, Object> expectedRecord : expectedRecords) {
+            assertTrue(reader.hasNext());
+            record = reader.next(record);
+            final String name = record.get("name").toString();
+            final Integer favorite_number = (Integer) record.get("favorite_number");
+            assertNotNull(name);
+            assertNotNull(favorite_number);
+            assertNull(record.get("favorite_color"));
+            assertNull(record.get("scale"));
+
+            assertEquals(expectedRecord.get("name"), name);
+            assertEquals(expectedRecord.get("favorite_number"), favorite_number);
+        }
+        assertFalse(reader.hasNext());
+    }
+
+    @Test
+    public void onTriggerWithConnectFailure() throws Exception {
+        configure(processor, 1);
+        processor.setGenerateConnectFailure(true);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.enqueue(new byte[0]);
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+        runner.assertTransferCount(PutHive3Streaming.REL_RETRY, 0);
+        runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0);
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0);
+        // Assert incoming FlowFile stays in input queue.
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
+    public void onTriggerWithConnectFailureRollbackOnFailure() throws Exception {
+        configure(processor, 1);
+        processor.setGenerateConnectFailure(true);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.setProperty(PutHive3Streaming.ROLLBACK_ON_FAILURE, "true");
+        runner.enqueue(new byte[0]);
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertTransferCount(PutHive3Streaming.REL_RETRY, 0);
+        runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0);
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0);
+        // Assert incoming FlowFile stays in input queue.
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
+    public void onTriggerWithWriteFailure() throws Exception {
+        configure(processor, 2);
+        processor.setGenerateWriteFailure(true);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_FAILURE).get(0);
+        assertEquals("0", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
+    }
+
+    @Test
+    public void onTriggerWithWriteFailureRollbackOnFailure() throws Exception {
+        configure(processor, 2);
+        processor.setGenerateWriteFailure(true);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.setProperty(PutHive3Streaming.ROLLBACK_ON_FAILURE, "true");
+        Map<String, Object> user1 = new HashMap<String, Object>() {
+            {
+                put("name", "Joe");
+                put("favorite_number", 146);
+            }
+        };
+        Map<String, Object> user2 = new HashMap<String, Object>() {
+            {
+                put("name", "Mary");
+                put("favorite_number", 42);
+            }
+        };
+        runner.enqueue(createAvroRecord(Arrays.asList(user1, user2)));
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0);
+        // Assert incoming FlowFile stays in input queue.
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
+    public void onTriggerWithSerializationError() throws Exception {
+        configure(processor, 1);
+        processor.setGenerateSerializationError(true);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        Map<String, Object> user1 = new HashMap<String, Object>() {
+            {
+                put("name", "Joe");
+                put("favorite_number", 146);
+            }
+        };
+        runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void onTriggerWithSerializationErrorRollbackOnFailure() throws Exception {
+        configure(processor, 1);
+        processor.setGenerateSerializationError(true);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.setProperty(PutHive3Streaming.ROLLBACK_ON_FAILURE, "true");
+        Map<String, Object> user1 = new HashMap<String, Object>() {
+            {
+                put("name", "Joe");
+                put("favorite_number", 146);
+            }
+        };
+        runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0);
+        // Assert incoming FlowFile stays in input queue.
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
+    public void onTriggerWithCommitFailure() throws Exception {
+        configure(processor, 1);
+        processor.setGenerateCommitFailure(true);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.setProperty(PutHive3Streaming.ROLLBACK_ON_FAILURE, "false");
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0);
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutHive3Streaming.REL_RETRY, 1);
+    }
+
+    @Test
+    public void onTriggerWithCommitFailureRollbackOnFailure() throws Exception {
+        configure(processor, 1);
+        processor.setGenerateCommitFailure(true);
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.setProperty(PutHive3Streaming.ROLLBACK_ON_FAILURE, "true");
+        runner.enqueue(new byte[0]);
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0);
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutHive3Streaming.REL_RETRY, 0);
+        // Assert incoming FlowFile stays in input queue.
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
+    public void cleanup() {
+        processor.cleanup();
+    }
+
+    private byte[] createAvroRecord(List<Map<String, Object>> records) throws IOException {
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+
+        List<GenericRecord> users = new LinkedList<>();
+        for (Map<String, Object> record : records) {
+            final GenericRecord user = new GenericData.Record(schema);
+            user.put("name", record.get("name"));
+            user.put("favorite_number", record.get("favorite_number"));
+            user.put("favorite_color", record.get("favorite_color"));
+            users.add(user);
+        }
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+            dataFileWriter.create(schema, out);
+            for (final GenericRecord user : users) {
+                dataFileWriter.append(user);
+            }
+        }
+        return out.toByteArray();
+
+    }
+
+    private class MockPutHive3Streaming extends PutHive3Streaming {
+
+        private boolean generateConnectFailure = false;
+        private boolean generateWriteFailure = false;
+        private boolean generateSerializationError = false;
+        private boolean generateCommitFailure = false;
+        private List<FieldSchema> schema = Arrays.asList(
+                new FieldSchema("name", serdeConstants.STRING_TYPE_NAME, ""),
+                new FieldSchema("favorite_number", serdeConstants.INT_TYPE_NAME, ""),
+                new FieldSchema("favorite_color", serdeConstants.STRING_TYPE_NAME, ""),
+                new FieldSchema("scale", serdeConstants.DOUBLE_TYPE_NAME, "")
+        );
+
+        @Override
+        StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
+
+            if (generateConnectFailure) {
+                throw new StubConnectionError("Unit Test - Connection Error");
+            }
+
+            HiveRecordWriter hiveRecordWriter = new HiveRecordWriter(reader, getLogger());
+            MockHiveStreamingConnection hiveConnection = new MockHiveStreamingConnection(options, reader, hiveRecordWriter, schema);
+            hiveConnection.setGenerateWriteFailure(generateWriteFailure);
+            hiveConnection.setGenerateSerializationError(generateSerializationError);
+            hiveConnection.setGenerateCommitFailure(generateCommitFailure);
+            return hiveConnection;
+        }
+
+        void setGenerateConnectFailure(boolean generateConnectFailure) {
+            this.generateConnectFailure = generateConnectFailure;
+        }
+
+        void setGenerateWriteFailure(boolean generateWriteFailure) {
+            this.generateWriteFailure = generateWriteFailure;
+        }
+
+        void setGenerateSerializationError(boolean generateSerializationError) {
+            this.generateSerializationError = generateSerializationError;
+        }
+
+        void setGenerateCommitFailure(boolean generateCommitFailure) {
+            this.generateCommitFailure = generateCommitFailure;
+        }
+
+        void setFields(List<FieldSchema> schema) {
+            this.schema = schema;
+        }
+    }
+
+    private class MockHiveStreamingConnection implements StreamingConnection {
+
+        private boolean generateWriteFailure = false;
+        private boolean generateSerializationError = false;
+        private boolean generateCommitFailure = false;
+        private int writeAttemptCount = 0;
+        private ConnectionStats connectionStats;
+        private HiveOptions options;
+        private RecordWriter writer;
+        private HiveConf hiveConf;
+        private Table table;
+        private String metastoreURI;
+
+        MockHiveStreamingConnection(HiveOptions options, RecordReader reader, RecordWriter recordWriter, List<FieldSchema> schema) {
+            this.options = options;
+            metastoreURI = options.getMetaStoreURI();
+            this.writer = recordWriter;
+            this.hiveConf = this.options.getHiveConf();
+            connectionStats = new ConnectionStats();
+            this.table = new Table(Table.getEmptyTable(options.getDatabaseName(), options.getTableName()));
+            this.table.setFields(schema);
+            StorageDescriptor sd = this.table.getSd();
+            sd.setOutputFormat(OrcOutputFormat.class.getName());
+            sd.setLocation(TARGET_HIVE);
+        }
+
+        @Override
+        public HiveConf getHiveConf() {
+            return hiveConf;
+        }
+
+        @Override
+        public void beginTransaction() throws StreamingException {
+            writer.init(this, 0, 100);
+        }
+
+        @Override
+        public synchronized void write(byte[] record) throws StreamingException {
+            throw new UnsupportedOperationException(this.getClass().getName() + " does not support writing of records via bytes, only via an InputStream");
+        }
+
+        @Override
+        public void write(InputStream inputStream) throws StreamingException {
+            try {
+                if (generateWriteFailure) {
+                    throw new StubStreamingIOFailure("Unit Test - Streaming IO Failure");
+                }
+                if (generateSerializationError) {
+                    throw new StubSerializationError("Unit Test - Serialization error", new Exception());
+                }
+                this.writer.write(writeAttemptCount, inputStream);
+            } finally {
+                writeAttemptCount++;
+            }
+        }
+
+        @Override
+        public void commitTransaction() throws StreamingException {
+            if (generateCommitFailure) {
+                throw new StubTransactionError("Unit Test - Commit Failure");
+            }
+            connectionStats.incrementCommittedTransactions();
+        }
+
+        @Override
+        public void abortTransaction() throws StreamingException {
+            connectionStats.incrementAbortedTransactions();
+        }
+
+        @Override
+        public void close() {
+            // closing the connection shouldn't throw an exception
+        }
+
+        @Override
+        public ConnectionStats getConnectionStats() {
+            return connectionStats;
+        }
+
+        public void setGenerateWriteFailure(boolean generateWriteFailure) {
+            this.generateWriteFailure = generateWriteFailure;
+        }
+
+        public void setGenerateSerializationError(boolean generateSerializationError) {
+            this.generateSerializationError = generateSerializationError;
+        }
+
+        public void setGenerateCommitFailure(boolean generateCommitFailure) {
+            this.generateCommitFailure = generateCommitFailure;
+        }
+
+        @Override
+        public String getMetastoreUri() {
+            return metastoreURI;
+        }
+
+        @Override
+        public Table getTable() {
+            return table;
+        }
+
+        @Override
+        public List<String> getStaticPartitionValues() {
+            return null;
+        }
+
+        @Override
+        public boolean isPartitionedTable() {
+            return false;
+        }
+
+        @Override
+        public boolean isDynamicPartitioning() {
+            return false;
+        }
+
+        @Override
+        public String getAgentInfo() {
+            return null;
+        }
+
+        @Override
+        public PartitionInfo createPartitionIfNotExists(List<String> list) throws StreamingException {
+            return null;
+        }
+    }
+
+    private static class MockKerberosCredentialsService implements KerberosCredentialsService, ControllerService {
+
+        private String keytab = "src/test/resources/fake.keytab";
+        private String principal = "t...@realm.com";
+
+        public MockKerberosCredentialsService() {
+        }
+
+        public MockKerberosCredentialsService(String keytab, String principal) {
+            this.keytab = keytab;
+            this.principal = principal;
+        }
+
+        @Override
+        public String getKeytab() {
+            return keytab;
+        }
+
+        @Override
+        public String getPrincipal() {
+            return principal;
+        }
+
+        @Override
+        public void initialize(ControllerServiceInitializationContext context) throws InitializationException {
+
+        }
+
+        @Override
+        public Collection<ValidationResult> validate(ValidationContext context) {
+            return Collections.EMPTY_LIST;
+        }
+
+        @Override
+        public PropertyDescriptor getPropertyDescriptor(String name) {
+            return null;
+        }
+
+        @Override
+        public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+
+        }
+
+        @Override
+        public List<PropertyDescriptor> getPropertyDescriptors() {
+            return null;
+        }
+
+        @Override
+        public String getIdentifier() {
+            return "kcs";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
new file mode 100644
index 0000000..50e83ac
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.dbcp.hive.Hive3DBCPService;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.hive.HiveJdbcCommon;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.nifi.processors.hive.SelectHive3QL.HIVEQL_OUTPUT_FORMAT;
+import static org.apache.nifi.util.hive.HiveJdbcCommon.AVRO;
+import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV;
+import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV_MIME_TYPE;
+import static org.apache.nifi.util.hive.HiveJdbcCommon.MIME_TYPE_AVRO_BINARY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestSelectHive3QL {
+
+    private static final Logger LOGGER;
+    private final static String MAX_ROWS_KEY = "maxRows";
+
+    static {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.hive.SelectHive3QL", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.hive.TestSelectHive3QL", "debug");
+        LOGGER = LoggerFactory.getLogger(TestSelectHive3QL.class);
+    }
+
+    private final static String DB_LOCATION = "target/db";
+
+    private final static String QUERY_WITH_EL = "select "
+            + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+            + " from persons PER"
+            + " where PER.ID > ${person.id}";
+
+    private final static String QUERY_WITHOUT_EL = "select "
+            + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+            + " from persons PER"
+            + " where PER.ID > 10";
+
+
+    @BeforeClass
+    public static void setupClass() {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+    }
+
+    private TestRunner runner;
+
+    @Before
+    public void setup() throws InitializationException {
+        final DBCPService dbcp = new DBCPServiceSimpleImpl();
+        final Map<String, String> dbcpProperties = new HashMap<>();
+
+        runner = TestRunners.newTestRunner(SelectHive3QL.class);
+        runner.addControllerService("dbcp", dbcp, dbcpProperties);
+        runner.enableControllerService(dbcp);
+        runner.setProperty(SelectHive3QL.HIVE_DBCP_SERVICE, "dbcp");
+    }
+
+    @Test
+    public void testIncomingConnectionWithNoFlowFile() throws InitializationException {
+        runner.setIncomingConnection(true);
+        runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM persons");
+        runner.run();
+        runner.assertTransferCount(SelectHive3QL.REL_SUCCESS, 0);
+        runner.assertTransferCount(SelectHive3QL.REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+        runner.setIncomingConnection(false);
+        invokeOnTrigger(QUERY_WITHOUT_EL, false, "Avro");
+    }
+
+    @Test
+    public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        invokeOnTrigger(QUERY_WITH_EL, true, "Avro");
+    }
+
+
+    @Test
+    public void testWithNullIntColumn() throws SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((Hive3DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // Nothing to do, probably means the table didn't exist
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)");
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0).assertAttributeEquals(SelectHive3QL.RESULT_ROW_COUNT, "2");
+    }
+
+    @Test
+    public void testWithSqlException() throws SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((Hive3DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NO_ROWS");
+        } catch (final SQLException sqle) {
+            // Nothing to do, probably means the table didn't exist
+        }
+
+        stmt.execute("create table TEST_NO_ROWS (id integer)");
+
+        runner.setIncomingConnection(false);
+        // Try a valid SQL statement that will generate an error (val1 does not exist, e.g.)
+        runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT val1 FROM TEST_NO_ROWS");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testWithBadSQL() throws SQLException {
+        final String BAD_SQL = "create table TEST_NO_ROWS (id integer)";
+
+        // Test with incoming flow file (it should be routed to failure intact, i.e. same content and no parent)
+        runner.setIncomingConnection(true);
+        // Try a valid SQL statement that will generate an error (val1 does not exist, e.g.)
+        runner.enqueue(BAD_SQL);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(SelectHive3QL.REL_FAILURE).get(0);
+        flowFile.assertContentEquals(BAD_SQL);
+        flowFile.assertAttributeEquals("parentIds", null);
+        runner.clearTransferState();
+
+        // Test with no incoming flow file (an empty flow file is transferred)
+        runner.setIncomingConnection(false);
+        // Try a valid SQL statement that will generate an error (val1 does not exist, e.g.)
+        runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, BAD_SQL);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1);
+        flowFile = runner.getFlowFilesForRelationship(SelectHive3QL.REL_FAILURE).get(0);
+        flowFile.assertContentEquals("");
+    }
+
+    @Test
+    public void invokeOnTriggerWithCsv()
+            throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV);
+    }
+
+    @Test
+    public void invokeOnTriggerWithAvro()
+            throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        invokeOnTrigger(QUERY_WITHOUT_EL, false, AVRO);
+    }
+
+    public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat)
+            throws InitializationException, ClassNotFoundException, SQLException, IOException {
+
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((Hive3DBCPService) runner.getControllerService("dbcp")).getConnection();
+        final Statement stmt = con.createStatement();
+        try {
+            stmt.execute("drop table persons");
+        } catch (final SQLException sqle) {
+            // Nothing to do here, the table didn't exist
+        }
+
+        stmt.execute("create table persons (id integer, name varchar(100), code integer)");
+        Random rng = new Random(53496);
+        final int nrOfRows = 100;
+        stmt.executeUpdate("insert into persons values (1, 'Joe Smith', " + rng.nextInt(469947) + ")");
+        for (int i = 2; i < nrOfRows; i++) {
+            stmt.executeUpdate("insert into persons values (" + i + ", 'Someone Else', " + rng.nextInt(469947) + ")");
+        }
+        stmt.executeUpdate("insert into persons values (" + nrOfRows + ", 'Last Person', NULL)");
+
+        LOGGER.info("test data loaded");
+
+        runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, query);
+        runner.setProperty(HIVEQL_OUTPUT_FORMAT, outputFormat);
+
+        if (incomingFlowFile) {
+            // incoming FlowFile content is not used, but attributes are used
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put("person.id", "10");
+            runner.enqueue("Hello".getBytes(), attributes);
+        }
+
+        runner.setIncomingConnection(incomingFlowFile);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
+
+        final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS);
+        MockFlowFile flowFile = flowfiles.get(0);
+        final InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
+        long recordsFromStream = 0;
+        if (AVRO.equals(outputFormat)) {
+            assertEquals(MIME_TYPE_AVRO_BINARY, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
+            final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+            try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
+                GenericRecord record = null;
+                while (dataFileReader.hasNext()) {
+                    // Reuse record object by passing it to next(). This saves us from
+                    // allocating and garbage collecting many objects for files with
+                    // many items.
+                    record = dataFileReader.next(record);
+                    recordsFromStream++;
+                }
+            }
+        } else {
+            assertEquals(CSV_MIME_TYPE, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
+            BufferedReader br = new BufferedReader(new InputStreamReader(in));
+
+            String headerRow = br.readLine();
+            // Derby capitalizes column names
+            assertEquals("PERSONID,PERSONNAME,PERSONCODE", headerRow);
+
+            // Validate rows
+            String line;
+            while ((line = br.readLine()) != null) {
+                recordsFromStream++;
+                String[] values = line.split(",");
+                if (recordsFromStream < (nrOfRows - 10)) {
+                    assertEquals(3, values.length);
+                    assertTrue(values[1].startsWith("\""));
+                    assertTrue(values[1].endsWith("\""));
+                } else {
+                    assertEquals(2, values.length); // Middle value is null
+                }
+            }
+        }
+        assertEquals(nrOfRows - 10, recordsFromStream);
+        assertEquals(recordsFromStream, Integer.parseInt(flowFile.getAttribute(SelectHive3QL.RESULT_ROW_COUNT)));
+        flowFile.assertAttributeEquals(AbstractHive3QLProcessor.ATTR_INPUT_TABLES, "persons");
+    }
+
+    @Test
+    public void testMaxRowsPerFlowFileAvro() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        InputStream in;
+        MockFlowFile mff;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        int rowCount = 0;
+        //create larger row set
+        for (int batch = 0; batch < 100; batch++) {
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_QUERY_DB_TABLE");
+        runner.setProperty(SelectHive3QL.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}");
+        runner.setProperty(SelectHive3QL.HIVEQL_OUTPUT_FORMAT, HiveJdbcCommon.AVRO);
+        runner.setVariable(MAX_ROWS_KEY, "9");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 12);
+
+        //ensure all but the last file have 9 records each
+        for (int ff = 0; ff < 11; ff++) {
+            mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(ff);
+            in = new ByteArrayInputStream(mff.toByteArray());
+            assertEquals(9, getNumberOfRecordsFromStream(in));
+
+            mff.assertAttributeExists("fragment.identifier");
+            assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index"));
+            assertEquals("12", mff.getAttribute("fragment.count"));
+        }
+
+        //last file should have 1 record
+        mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(11);
+        in = new ByteArrayInputStream(mff.toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        mff.assertAttributeExists("fragment.identifier");
+        assertEquals(Integer.toString(11), mff.getAttribute("fragment.index"));
+        assertEquals("12", mff.getAttribute("fragment.count"));
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testParametrizedQuery() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        int rowCount = 0;
+        //create larger row set
+        for (int batch = 0; batch < 100; batch++) {
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(SelectHive3QL.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}");
+        runner.setProperty(SelectHive3QL.HIVEQL_OUTPUT_FORMAT, HiveJdbcCommon.AVRO);
+        runner.setVariable(MAX_ROWS_KEY, "9");
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("hiveql.args.1.value", "1");
+        attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+        runner.enqueue("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id = ?", 
attributes );
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testMaxRowsPerFlowFileCSV() throws ClassNotFoundException, 
SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        InputStream in;
+        MockFlowFile mff;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
name varchar(100), scale float, created_on timestamp, bignum bigint default 
0)");
+        int rowCount = 0;
+        //create larger row set
+        for (int batch = 0; batch < 100; batch++) {
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 
03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(SelectHive3QL.MAX_ROWS_PER_FLOW_FILE, "${" + 
MAX_ROWS_KEY + "}");
+        runner.setProperty(SelectHive3QL.HIVEQL_OUTPUT_FORMAT, 
HiveJdbcCommon.CSV);
+
+        runner.enqueue("SELECT * FROM TEST_QUERY_DB_TABLE", new 
HashMap<String, String>() {{
+            put(MAX_ROWS_KEY, "9");
+        }});
+
+        runner.run();
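+        // same 12-way split as the Avro test above, but each CSV fragment also carries a header line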
+        runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 12);
+
+        //ensure all but the last file have 9 records (10 lines = 9 records + header) each
+        for (int ff = 0; ff < 11; ff++) {
+            mff = 
runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(ff);
+            in = new ByteArrayInputStream(mff.toByteArray());
+            BufferedReader br = new BufferedReader(new InputStreamReader(in));
+            assertEquals(10, br.lines().count());
+
+            mff.assertAttributeExists("fragment.identifier");
+            assertEquals(Integer.toString(ff), 
mff.getAttribute("fragment.index"));
+            assertEquals("12", mff.getAttribute("fragment.count"));
+        }
+
+        //last file should have 1 record (2 lines = 1 record + header)
+        mff = 
runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(11);
+        in = new ByteArrayInputStream(mff.toByteArray());
+        BufferedReader br = new BufferedReader(new InputStreamReader(in));
+        assertEquals(2, br.lines().count());
+        mff.assertAttributeExists("fragment.identifier");
+        assertEquals(Integer.toString(11), mff.getAttribute("fragment.index"));
+        assertEquals("12", mff.getAttribute("fragment.count"));
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testMaxRowsPerFlowFileWithMaxFragments() throws 
ClassNotFoundException, SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        InputStream in;
+        MockFlowFile mff;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
name varchar(100), scale float, created_on timestamp, bignum bigint default 
0)");
+        int rowCount = 0;
+        //create larger row set
+        for (int batch = 0; batch < 100; batch++) {
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 
03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM 
TEST_QUERY_DB_TABLE");
+        runner.setProperty(SelectHive3QL.MAX_ROWS_PER_FLOW_FILE, "9");
+        Integer maxFragments = 3;
+        runner.setProperty(SelectHive3QL.MAX_FRAGMENTS, 
maxFragments.toString());
+
+        runner.run();
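+        // MAX_FRAGMENTS = 3 stops output after the first 3 flow files (9 rows each), even though 100 rows are available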
+        runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 
maxFragments);
+
+        for (int i = 0; i < maxFragments; i++) {
+            mff = 
runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(i);
+            in = new ByteArrayInputStream(mff.toByteArray());
+            assertEquals(9, getNumberOfRecordsFromStream(in));
+
+            mff.assertAttributeExists("fragment.identifier");
+            assertEquals(Integer.toString(i), 
mff.getAttribute("fragment.index"));
+            assertEquals(maxFragments.toString(), 
mff.getAttribute("fragment.count"));
+        }
+
+        runner.clearTransferState();
+    }
+
+    private long getNumberOfRecordsFromStream(InputStream in) throws 
IOException {
+        final DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
+        try (DataFileStream<GenericRecord> dataFileReader = new 
DataFileStream<>(in, datumReader)) {
+            GenericRecord record = null;
+            long recordsFromStream = 0;
+            while (dataFileReader.hasNext()) {
+                // Reuse record object by passing it to next(). This saves us from
+                // allocating and garbage collecting many objects for files with
+                // many items.
+                record = dataFileReader.next(record);
+                recordsFromStream += 1;
+            }
+
+            return recordsFromStream;
+        }
+    }
+
+    /**
+     * Simple implementation only for SelectHive3QL processor testing.
+     */
+    private class DBCPServiceSimpleImpl extends AbstractControllerService 
implements Hive3DBCPService {
+
+        @Override
+        public String getIdentifier() {
+            return "dbcp";
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
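+            // open an embedded Derby database at DB_LOCATION, creating it on first use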
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION 
+ ";create=true");
+            } catch (final Exception e) {
+                throw new ProcessException("getConnection failed: " + e);
+            }
+        }
+
+        @Override
+        public String getConnectionURL() {
+            return "jdbc:derby:" + DB_LOCATION + ";create=true";
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
new file mode 100644
index 0000000..e1af5a1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.orc;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.hadoop.exception.FailureException;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+public class PutORCTest {
+
+    private static final String DIRECTORY = "target";
+    private static final String TEST_CONF_PATH = 
"src/test/resources/core-site.xml";
+
+    private Schema schema;
+    private Configuration testConf;
+    private PutORC proc;
+    private TestRunner testRunner;
+
+    @BeforeClass
+    public static void setupLogging() {
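+        // log4j BasicConfigurator adds a simple console appender so test logging is visible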
+        BasicConfigurator.configure();
+    }
+
+    @Before
+    public void setup() throws IOException {
+        final String avroSchema = IOUtils.toString(new 
FileInputStream("src/test/resources/user.avsc"), StandardCharsets.UTF_8);
+        schema = new Schema.Parser().parse(avroSchema);
+
+        testConf = new Configuration();
+        testConf.addResource(new Path(TEST_CONF_PATH));
+
+        proc = new PutORC();
+    }
+
+    private void configure(final PutORC putORC, final int numUsers) throws 
InitializationException {
+        configure(putORC, numUsers, null);
+    }
+
+    private void configure(final PutORC putORC, final int numUsers, final 
BiFunction<Integer, MockRecordParser, Void> recordGenerator) throws 
InitializationException {
+        testRunner = TestRunners.newTestRunner(putORC);
+        testRunner.setProperty(PutORC.HADOOP_CONFIGURATION_RESOURCES, 
TEST_CONF_PATH);
+        testRunner.setProperty(PutORC.DIRECTORY, DIRECTORY);
+
+        MockRecordParser readerFactory = new MockRecordParser();
+
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        for (final RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField.getFieldName(), 
recordField.getDataType().getFieldType(), recordField.isNullable());
+        }
+
+        if (recordGenerator == null) {
+            for (int i = 0; i < numUsers; i++) {
+                readerFactory.addRecord("name" + i, i, "blue" + i, i * 10.0);
+            }
+        } else {
+            recordGenerator.apply(numUsers, readerFactory);
+        }
+
+        testRunner.addControllerService("mock-reader-factory", readerFactory);
+        testRunner.enableControllerService(readerFactory);
+
+        testRunner.setProperty(PutORC.RECORD_READER, "mock-reader-factory");
+    }
+
+    @Test
+    public void testWriteORCWithDefaults() throws IOException, 
InitializationException {
+        configure(proc, 100);
+
+        final String filename = "testORCWithDefaults-" + 
System.currentTimeMillis();
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.setProperty(PutORC.HIVE_TABLE_NAME, "myTable");
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1);
+
+        final Path orcFile = new Path(DIRECTORY + "/" + filename);
+
+        // verify the successful flow file has the expected attributes
+        final MockFlowFile mockFlowFile = 
testRunner.getFlowFilesForRelationship(PutORC.REL_SUCCESS).get(0);
+        
mockFlowFile.assertAttributeEquals(PutORC.ABSOLUTE_HDFS_PATH_ATTRIBUTE, 
orcFile.getParent().toString());
+        mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), 
filename);
+        mockFlowFile.assertAttributeEquals(PutORC.RECORD_COUNT_ATTR, "100");
+        mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE,
+                "CREATE EXTERNAL TABLE IF NOT EXISTS myTable (name STRING, 
favorite_number INT, favorite_color STRING, scale DOUBLE) STORED AS ORC");
+
+        // verify we generated a provenance event
+        final List<ProvenanceEventRecord> provEvents = 
testRunner.getProvenanceEvents();
+        assertEquals(1, provEvents.size());
+
+        // verify it was a SEND event with the correct URI
+        final ProvenanceEventRecord provEvent = provEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
+        // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+        Assert.assertTrue(provEvent.getTransitUri().endsWith(DIRECTORY + "/" + 
filename));
+
+        // verify the content of the ORC file by reading it back in
+        verifyORCUsers(orcFile, 100);
+
+        // verify we don't have the temp dot file after success
+        final File tempOrcFile = new File(DIRECTORY + "/." + filename);
+        Assert.assertFalse(tempOrcFile.exists());
+
+        // verify we DO have the CRC file after success
+        final File crcAvroORCFile = new File(DIRECTORY + "/." + filename + 
".crc");
+        Assert.assertTrue(crcAvroORCFile.exists());
+    }
+
+    @Test
+    public void testWriteORCWithAvroLogicalTypes() throws IOException, 
InitializationException {
+        final String avroSchema = IOUtils.toString(new 
FileInputStream("src/test/resources/user_logical_types.avsc"), 
StandardCharsets.UTF_8);
+        schema = new Schema.Parser().parse(avroSchema);
+        Calendar now = Calendar.getInstance();
+        LocalTime nowTime = LocalTime.now();
+        LocalDateTime nowDateTime = LocalDateTime.now();
+        LocalDate epoch = LocalDate.ofEpochDay(0);
+        LocalDate nowDate = LocalDate.now();
+
+        final int timeMillis = nowTime.get(ChronoField.MILLI_OF_DAY);
+        final Timestamp timestampMillis = Timestamp.valueOf(nowDateTime);
+        final Date dt = Date.valueOf(nowDate);
+        final double dec = 1234.56;
+
+        configure(proc, 10, (numUsers, readerFactory) -> {
+            for (int i = 0; i < numUsers; i++) {
+                readerFactory.addRecord(
+                        i,
+                        timeMillis,
+                        timestampMillis,
+                        dt,
+                        dec);
+            }
+            return null;
+        });
+
+        final String filename = "testORCWithDefaults-" + 
System.currentTimeMillis();
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.setProperty(PutORC.HIVE_TABLE_NAME, "myTable");
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1);
+
+        final Path orcFile = new Path(DIRECTORY + "/" + filename);
+
+        // verify the successful flow file has the expected attributes
+        final MockFlowFile mockFlowFile = 
testRunner.getFlowFilesForRelationship(PutORC.REL_SUCCESS).get(0);
+        
mockFlowFile.assertAttributeEquals(PutORC.ABSOLUTE_HDFS_PATH_ATTRIBUTE, 
orcFile.getParent().toString());
+        mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), 
filename);
+        mockFlowFile.assertAttributeEquals(PutORC.RECORD_COUNT_ATTR, "10");
+        // DDL will be created with field names normalized for Hive (e.g. lowercased) by default
+        mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE,
+                "CREATE EXTERNAL TABLE IF NOT EXISTS myTable (id INT, 
timemillis INT, timestampmillis TIMESTAMP, dt DATE, dec DOUBLE) STORED AS ORC");
+
+        // verify we generated a provenance event
+        final List<ProvenanceEventRecord> provEvents = 
testRunner.getProvenanceEvents();
+        assertEquals(1, provEvents.size());
+
+        // verify it was a SEND event with the correct URI
+        final ProvenanceEventRecord provEvent = provEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
+        // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+        Assert.assertTrue(provEvent.getTransitUri().endsWith(DIRECTORY + "/" + 
filename));
+
+        // verify the content of the ORC file by reading it back in
+        verifyORCUsers(orcFile, 10, (x, currUser) -> {
+                    assertEquals((int) currUser, ((IntWritable) 
x.get(0)).get());
+                    assertEquals(timeMillis, ((IntWritable) x.get(1)).get());
+                    assertEquals(timestampMillis, ((TimestampWritable) 
x.get(2)).getTimestamp());
+                    assertEquals(dt, ((DateWritable) x.get(3)).get());
+                    assertEquals(dec, ((DoubleWritable) x.get(4)).get(), 
Double.MIN_VALUE);
+                    return null;
+                }
+        );
+
+        // verify we don't have the temp dot file after success
+        final File tempOrcFile = new File(DIRECTORY + "/." + filename);
+        Assert.assertFalse(tempOrcFile.exists());
+
+        // verify we DO have the CRC file after success
+        final File crcAvroORCFile = new File(DIRECTORY + "/." + filename + 
".crc");
+        Assert.assertTrue(crcAvroORCFile.exists());
+    }
+
+    @Test
+    public void testValidSchemaWithELShouldBeSuccessful() throws 
InitializationException {
+        configure(proc, 10);
+
+        final String filename = "testValidSchemaWithELShouldBeSuccessful-" + 
System.currentTimeMillis();
+
+        // provide my.schema as a flow file attribute
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+        flowFileAttributes.put("my.schema", schema.toString());
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() 
throws InitializationException, IOException, MalformedRecordException, 
SchemaNotFoundException {
+        configure(proc, 10);
+
+        final org.apache.nifi.serialization.RecordReader recordReader = 
Mockito.mock(org.apache.nifi.serialization.RecordReader.class);
+        when(recordReader.nextRecord()).thenThrow(new 
MalformedRecordException("ERROR"));
+
+        final RecordReaderFactory readerFactory = 
Mockito.mock(RecordReaderFactory.class);
+        when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
+        when(readerFactory.createRecordReader(any(FlowFile.class), 
any(InputStream.class), any(ComponentLog.class))).thenReturn(recordReader);
+
+        testRunner.addControllerService("mock-reader-factory", readerFactory);
+        testRunner.enableControllerService(readerFactory);
+        testRunner.setProperty(PutORC.RECORD_READER, "mock-reader-factory");
+
+        final String filename = 
"testMalformedRecordExceptionShouldRouteToFailure-" + 
System.currentTimeMillis();
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testIOExceptionCreatingWriterShouldRouteToRetry() throws 
InitializationException {
+        final PutORC proc = new PutORC() {
+            @Override
+            public HDFSRecordWriter createHDFSRecordWriter(ProcessContext 
context, FlowFile flowFile, Configuration conf, Path path, RecordSchema schema)
+                    throws IOException {
+                throw new IOException("IOException");
+            }
+        };
+        configure(proc, 0);
+
+        final String filename = "testIOExceptionCreatingWriterShouldRouteToRetry-" + System.currentTimeMillis();
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_RETRY, 1);
+    }
+
+    @Test
+    public void testIOExceptionFromReaderShouldRouteToRetry() throws 
InitializationException, IOException, MalformedRecordException, 
SchemaNotFoundException {
+        configure(proc, 10);
+
+        final RecordSet recordSet = Mockito.mock(RecordSet.class);
+        when(recordSet.next()).thenThrow(new IOException("ERROR"));
+
+        final org.apache.nifi.serialization.RecordReader recordReader = 
Mockito.mock(org.apache.nifi.serialization.RecordReader.class);
+        when(recordReader.createRecordSet()).thenReturn(recordSet);
+        
when(recordReader.getSchema()).thenReturn(AvroTypeUtil.createSchema(schema));
+
+        final RecordReaderFactory readerFactory = 
Mockito.mock(RecordReaderFactory.class);
+        when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
+        when(readerFactory.createRecordReader(any(FlowFile.class), 
any(InputStream.class), any(ComponentLog.class))).thenReturn(recordReader);
+
+        testRunner.addControllerService("mock-reader-factory", readerFactory);
+        testRunner.enableControllerService(readerFactory);
+        testRunner.setProperty(PutORC.RECORD_READER, "mock-reader-factory");
+
+        final String filename = "testIOExceptionFromReaderShouldRouteToRetry-" + System.currentTimeMillis();
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_RETRY, 1);
+    }
+
+    @Test
+    public void testIOExceptionRenamingShouldRouteToRetry() throws 
InitializationException {
+        final PutORC proc = new PutORC() {
+            @Override
+            protected void rename(FileSystem fileSystem, Path srcFile, Path 
destFile)
+                    throws IOException, InterruptedException, FailureException 
{
+                throw new IOException("IOException renaming");
+            }
+        };
+
+        configure(proc, 10);
+
+        final String filename = "testIOExceptionRenamingShouldRouteToRetry-" + 
System.currentTimeMillis();
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_RETRY, 1);
+
+        // verify the temp dot file was cleaned up after the failed rename
+        final File tempAvroORCFile = new File(DIRECTORY + "/." + filename);
+        Assert.assertFalse(tempAvroORCFile.exists());
+    }
+
+    private void verifyORCUsers(final Path orcUsers, final int 
numExpectedUsers) throws IOException {
+        verifyORCUsers(orcUsers, numExpectedUsers, null);
+    }
+
+    private void verifyORCUsers(final Path orcUsers, final int 
numExpectedUsers, BiFunction<List<Object>, Integer, Void> assertFunction) 
throws IOException {
+        Reader reader = OrcFile.createReader(orcUsers, 
OrcFile.readerOptions(testConf));
+        RecordReader recordReader = reader.rows();
+
+        TypeInfo typeInfo =
+                
TypeInfoUtils.getTypeInfoFromTypeString("struct<name:string,favorite_number:int,favorite_color:string,scale:double>");
+        StructObjectInspector inspector = (StructObjectInspector)
+                OrcStruct.createObjectInspector(typeInfo);
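+        // inspector matching the user.avsc struct layout, used to read the fields back positionally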
+
+        int currUser = 0;
+        Object nextRecord = null;
+        while ((nextRecord = recordReader.next(nextRecord)) != null) {
+            Assert.assertNotNull(nextRecord);
+            Assert.assertTrue("Not an OrcStruct", nextRecord instanceof 
OrcStruct);
+            List<Object> x = inspector.getStructFieldsDataAsList(nextRecord);
+
+            if (assertFunction == null) {
+                assertEquals("name" + currUser, x.get(0).toString());
+                assertEquals(currUser, ((IntWritable) x.get(1)).get());
+                assertEquals("blue" + currUser, x.get(2).toString());
+                assertEquals(10.0 * currUser, ((DoubleWritable) 
x.get(3)).get(), Double.MIN_VALUE);
+            } else {
+                assertFunction.apply(x, currUser);
+            }
+            currUser++;
+        }
+
+        assertEquals(numExpectedUsers, currUser);
+    }
+
+}
\ No newline at end of file
