http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
index 5eabfe8..b312327 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
@@ -24,14 +24,24 @@ import org.apache.nifi.annotation.behavior.ReadsAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.dbcp.hive.HiveDBCPService;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.ErrorTypes;
+import org.apache.nifi.processor.util.pattern.ExceptionHandler;
+import org.apache.nifi.processor.util.pattern.ExceptionHandler.OnError;
+import org.apache.nifi.processor.util.pattern.PartialFunctions.FetchFlowFiles;
+import org.apache.nifi.processor.util.pattern.PartialFunctions.InitConnection;
+import org.apache.nifi.processor.util.pattern.Put;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processor.util.pattern.RoutingResult;
 
 import java.nio.charset.Charset;
 import java.sql.Connection;
@@ -108,6 +118,7 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
         _propertyDescriptors.add(BATCH_SIZE);
         _propertyDescriptors.add(CHARSET);
         _propertyDescriptors.add(STATEMENT_DELIMITER);
+        _propertyDescriptors.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
         propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
 
         Set<Relationship> _relationships = new HashSet<>();
@@ -117,6 +128,31 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
         relationships = Collections.unmodifiableSet(_relationships);
     }
 
+    private Put<FunctionContext, Connection> process;
+    private ExceptionHandler<FunctionContext> exceptionHandler;
+
+    @OnScheduled
+    public void constructProcess() {
+        exceptionHandler = new ExceptionHandler<>();
+        exceptionHandler.mapException(e -> {
+            if (e instanceof SQLNonTransientException) {
+                return ErrorTypes.InvalidInput;
+            } else if (e instanceof SQLException) {
+                return ErrorTypes.TemporalFailure;
+            } else {
+                return ErrorTypes.UnknownFailure;
+            }
+        });
+        
exceptionHandler.adjustError(RollbackOnFailure.createAdjustError(getLogger()));
+
+        process = new Put<>();
+        process.setLogger(getLogger());
+        process.initConnection(initConnection);
+        process.fetchFlowFiles(fetchFlowFiles);
+        process.putFlowFile(putFlowFile);
+        process.adjustRoute(RollbackOnFailure.createAdjustRoute(REL_FAILURE, 
REL_RETRY));
+    }
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return propertyDescriptors;
@@ -127,75 +163,95 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
         return relationships;
     }
 
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
-        final List<FlowFile> flowFiles = session.get(batchSize);
+    private class FunctionContext extends RollbackOnFailure {
+        final Charset charset;
+        final String statementDelimiter;
+        final long startNanos = System.nanoTime();
+
+        String connectionUrl;
+
 
-        if (flowFiles.isEmpty()) {
-            return;
+        private FunctionContext(boolean rollbackOnFailure, Charset charset, 
String statementDelimiter) {
+            super(rollbackOnFailure, false);
+            this.charset = charset;
+            this.statementDelimiter = statementDelimiter;
         }
+    }
 
-        final long startNanos = System.nanoTime();
-        final Charset charset = 
Charset.forName(context.getProperty(CHARSET).getValue());
+    private InitConnection<FunctionContext, Connection> initConnection = 
(context, session, fc) -> {
         final HiveDBCPService dbcpService = 
context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
-        final String statementDelimiter =   
context.getProperty(STATEMENT_DELIMITER).getValue();
+        final Connection connection = dbcpService.getConnection();
+        fc.connectionUrl = dbcpService.getConnectionURL();
+        return connection;
+    };
 
-        try (final Connection conn = dbcpService.getConnection()) {
+    private FetchFlowFiles<FunctionContext> fetchFlowFiles = (context, 
session, functionContext, result) -> {
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        return session.get(batchSize);
+    };
 
-            for (FlowFile flowFile : flowFiles) {
-                try {
-                    final String script = getHiveQL(session, flowFile, 
charset);
-                    String regex = "(?<!\\\\)" + 
Pattern.quote(statementDelimiter);
+    private Put.PutFlowFile<FunctionContext, Connection> putFlowFile = 
(context, session, fc, conn, flowFile, result) -> {
+        final String script = getHiveQL(session, flowFile, fc.charset);
+        String regex = "(?<!\\\\)" + Pattern.quote(fc.statementDelimiter);
 
-                    String[] hiveQLs = script.split(regex);
+        String[] hiveQLs = script.split(regex);
 
-                    int loc = 1;
-                    for (String hiveQL: hiveQLs) {
-                        getLogger().debug("HiveQL: {}", new Object[]{hiveQL});
+        exceptionHandler.execute(fc, flowFile, input -> {
+            int loc = 1;
+            for (String hiveQL: hiveQLs) {
+                getLogger().debug("HiveQL: {}", new Object[]{hiveQL});
 
-                        if (!StringUtils.isEmpty(hiveQL.trim())) {
-                            final PreparedStatement stmt = 
conn.prepareStatement(hiveQL.trim());
+                if (!StringUtils.isEmpty(hiveQL.trim())) {
+                    final PreparedStatement stmt = 
conn.prepareStatement(hiveQL.trim());
 
-                            // Get ParameterMetadata
-                            // Hive JDBC Doesn't support this yet:
-                            // ParameterMetaData pmd = 
stmt.getParameterMetaData();
-                            // int paramCount = pmd.getParameterCount();
+                    // Get ParameterMetadata
+                    // Hive JDBC Doesn't support this yet:
+                    // ParameterMetaData pmd = stmt.getParameterMetaData();
+                    // int paramCount = pmd.getParameterCount();
+                    int paramCount = StringUtils.countMatches(hiveQL, "?");
 
-                            int paramCount = StringUtils.countMatches(hiveQL, 
"?");
+                    if (paramCount > 0) {
+                        loc = setParameters(loc, stmt, paramCount, 
flowFile.getAttributes());
+                    }
 
-                            if (paramCount > 0) {
-                                loc = setParameters(loc, stmt, paramCount, 
flowFile.getAttributes());
-                            }
+                    // Execute the statement
+                    stmt.execute();
+                    fc.proceed();
+                }
+            }
 
-                            // Execute the statement
-                            stmt.execute();
-                        }
-                    }
-                    // Emit a Provenance SEND event
-                    final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            // Emit a Provenance SEND event
+            final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fc.startNanos);
 
-                    session.getProvenanceReporter().send(flowFile, 
dbcpService.getConnectionURL(), transmissionMillis, true);
-                    session.transfer(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().send(flowFile, fc.connectionUrl, 
transmissionMillis, true);
+            result.routeTo(flowFile, REL_SUCCESS);
 
-                } catch (final SQLException e) {
+        }, onFlowFileError(context, session, result));
 
-                    if (e instanceof SQLNonTransientException) {
-                        getLogger().error("Failed to update Hive for {} due to 
{}; routing to failure", new Object[]{flowFile, e});
-                        session.transfer(flowFile, REL_FAILURE);
-                    } else {
-                        getLogger().error("Failed to update Hive for {} due to 
{}; it is possible that retrying the operation will succeed, so routing to 
retry", new Object[]{flowFile, e});
-                        flowFile = session.penalize(flowFile);
-                        session.transfer(flowFile, REL_RETRY);
-                    }
+    };
 
-                }
+    private OnError<FunctionContext, FlowFile> onFlowFileError(final 
ProcessContext context, final ProcessSession session, final RoutingResult 
result) {
+        OnError<FunctionContext, FlowFile> onFlowFileError = 
ExceptionHandler.createOnError(context, session, result, REL_FAILURE, 
REL_RETRY);
+        onFlowFileError = onFlowFileError.andThen((c, i, r, e) -> {
+            switch (r.destination()) {
+                case Failure:
+                    getLogger().error("Failed to update Hive for {} due to {}; 
routing to failure", new Object[] {i, e}, e);
+                    break;
+                case Retry:
+                    getLogger().error("Failed to update Hive for {} due to {}; 
it is possible that retrying the operation will succeed, so routing to retry",
+                            new Object[] {i, e}, e);
+                    break;
             }
-        } catch (final SQLException sqle) {
-            // There was a problem getting the connection, yield and retry the 
flowfiles
-            getLogger().error("Failed to get Hive connection due to {}; it is 
possible that retrying the operation will succeed, so routing to retry", new 
Object[]{sqle});
-            session.transfer(flowFiles, REL_RETRY);
-            context.yield();
-        }
+        });
+        return RollbackOnFailure.createOnError(onFlowFileError);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory 
sessionFactory) throws ProcessException {
+        final Boolean rollbackOnFailure = 
context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
+        final Charset charset = 
Charset.forName(context.getProperty(CHARSET).getValue());
+        final String statementDelimiter = 
context.getProperty(STATEMENT_DELIMITER).getValue();
+        final FunctionContext functionContext = new 
FunctionContext(rollbackOnFailure, charset, statementDelimiter);
+        RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, 
getLogger(), session -> process.onTrigger(context, session, functionContext));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index 1494595..2754f9c 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.processors.hive;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.file.DataFileStream;
@@ -46,34 +45,38 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.hadoop.SecurityUtil;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.DiscontinuedException;
+import org.apache.nifi.processor.util.pattern.ErrorTypes;
+import org.apache.nifi.processor.util.pattern.ExceptionHandler;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processor.util.pattern.RoutingResult;
 import org.apache.nifi.util.hive.AuthenticationFailedException;
 import org.apache.nifi.util.hive.HiveConfigurator;
 import org.apache.nifi.util.hive.HiveOptions;
 import org.apache.nifi.util.hive.HiveUtils;
 import org.apache.nifi.util.hive.HiveWriter;
-import org.json.JSONException;
-import org.json.JSONObject;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -81,6 +84,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.regex.Pattern;
 
 /**
@@ -96,7 +101,7 @@ import java.util.regex.Pattern;
                 + "and 'failure' relationships, and contains the number of 
records from the incoming flow file written successfully and unsuccessfully, 
respectively.")
 })
 @RequiresInstanceClassLoading
-public class PutHiveStreaming extends AbstractProcessor {
+public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
 
     // Attributes
     public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = 
"hivestreaming.record.count";
@@ -231,21 +236,27 @@ public class PutHiveStreaming extends AbstractProcessor {
             .defaultValue("10000")
             .build();
 
+    public static final PropertyDescriptor ROLLBACK_ON_FAILURE = 
RollbackOnFailure.createRollbackOnFailureProperty(
+            "NOTE: When an error occurred after a Hive streaming transaction 
which is derived from the same input FlowFile is already committed," +
+                    " (i.e. a FlowFile contains more records than 'Records per 
Transaction' and a failure occurred at the 2nd transaction or later)" +
+                    " then the succeeded records will be transferred to 
'success' relationship while the original input FlowFile stays in incoming 
queue." +
+                    " Duplicated records can be created for the succeeded ones 
when the same FlowFile is processed again.");
+
     // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
-            .description("A FlowFile containing the JSON contents of a record 
is routed to this relationship after the record has been successfully 
transmitted to Hive.")
+            .description("A FlowFile containing Avro records routed to this 
relationship after the record has been successfully transmitted to Hive.")
             .build();
 
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
-            .description("A FlowFile containing the JSON contents of a record 
is routed to this relationship if the record could not be transmitted to Hive.")
+            .description("A FlowFile containing Avro records routed to this 
relationship if the record could not be transmitted to Hive.")
             .build();
 
     public static final Relationship REL_RETRY = new Relationship.Builder()
             .name("retry")
             .description("The incoming FlowFile is routed to this relationship 
if its records cannot be transmitted to Hive. Note that "
-                    + "some records may have been processed successfully, they 
will be routed (as JSON flow files) to the success relationship. "
+                    + "some records may have been processed successfully, they 
will be routed (as Avro flow files) to the success relationship. "
                     + "The combination of the retry, success, and failure 
relationships indicate how many records succeeded and/or failed. This "
                     + "can be used to provide a retry capability since full 
rollback is not possible.")
             .build();
@@ -283,6 +294,7 @@ public class PutHiveStreaming extends AbstractProcessor {
         props.add(HEARTBEAT_INTERVAL);
         props.add(TXNS_PER_BATCH);
         props.add(RECORDS_PER_TXN);
+        props.add(ROLLBACK_ON_FAILURE);
 
         kerberosConfigFile = context.getKerberosConfigurationFile();
         kerberosProperties = new KerberosProperties(kerberosConfigFile);
@@ -364,8 +376,213 @@ public class PutHiveStreaming extends AbstractProcessor {
         setupHeartBeatTimer();
     }
 
+    private static class FunctionContext extends RollbackOnFailure {
+
+        private FlowFile inputFlowFile;
+        private AtomicReference<FlowFile> successFlowFile;
+        private AtomicReference<FlowFile> failureFlowFile;
+        private final DataFileWriter<GenericRecord> successAvroWriter = new 
DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
+        private final DataFileWriter<GenericRecord> failureAvroWriter = new 
DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
+
+        private final AtomicInteger recordCount = new AtomicInteger(0);
+        private final AtomicInteger successfulRecordCount = new 
AtomicInteger(0);
+        private final AtomicInteger failedRecordCount = new AtomicInteger(0);
+
+        private volatile ExecutorService appendRecordThreadPool;
+        private volatile AtomicBoolean closed = new AtomicBoolean(false);
+        private final BlockingQueue<List<HiveStreamingRecord>> 
successRecordQueue = new ArrayBlockingQueue<>(100);
+        private final BlockingQueue<List<HiveStreamingRecord>> 
failureRecordQueue = new ArrayBlockingQueue<>(100);
+
+        private final ComponentLog logger;
+
+        /**
+         * It's possible that multiple Hive streaming transactions are 
committed within a single onTrigger.
+         * PutHiveStreaming onTrigger is not 'transactional' in a sense of 
RollbackOnFailure.
+         * Once a Hive streaming transaction is committed, processor session 
will not be rolled back.
+         * @param rollbackOnFailure whether process session should be rolled 
back if failed
+         */
+        private FunctionContext(boolean rollbackOnFailure, ComponentLog 
logger) {
+            super(rollbackOnFailure, false);
+            this.logger = logger;
+        }
+
+        private void setFlowFiles(FlowFile inputFlowFile, FlowFile 
successFlowFile, FlowFile failureFlowFile) {
+            this.inputFlowFile = inputFlowFile;
+            this.successFlowFile = new AtomicReference<>(successFlowFile);
+            this.failureFlowFile = new AtomicReference<>(failureFlowFile);
+        }
+
+        private void initAvroWriter(ProcessSession session, String codec, 
DataFileStream<GenericRecord> reader,
+                                    DataFileWriter<GenericRecord> writer, 
AtomicReference<FlowFile> flowFileRef,
+                                    BlockingQueue<List<HiveStreamingRecord>> 
queue, Function<Integer, Boolean> isCompleted) {
+
+            writer.setCodec(CodecFactory.fromString(codec));
+            // Transfer metadata (this is a subset of the incoming file)
+            for (String metaKey : reader.getMetaKeys()) {
+                if (!RESERVED_METADATA.contains(metaKey)) {
+                    writer.setMeta(metaKey, reader.getMeta(metaKey));
+                }
+            }
+
+            appendRecordThreadPool.submit(() -> {
+                flowFileRef.set(session.append(flowFileRef.get(), (out) -> {
+                    // Create writer so that records can be appended.
+                    writer.create(reader.getSchema(), out);
+
+                    try {
+                        int writtenCount = 0;
+                        while (true) {
+
+                            if (closed.get() && 
isCompleted.apply(writtenCount)) {
+                                break;
+                            }
+
+                            final List<HiveStreamingRecord> hRecords = 
queue.poll(100, TimeUnit.MILLISECONDS);
+                            if (hRecords != null) {
+                                try {
+                                    for (HiveStreamingRecord hRecord : 
hRecords) {
+                                        writer.append(hRecord.getRecord());
+                                        writtenCount++;
+                                    }
+                                } catch (IOException ioe) {
+                                    // The records were put to Hive Streaming 
successfully, but there was an error while writing the
+                                    // Avro records to the flow file. Log as 
an error and move on.
+                                    logger.error("Error writing Avro records 
(which were sent successfully to Hive Streaming) to the flow file, " + ioe, 
ioe);
+                                }
+                            }
+                        }
+                        writer.flush();
+                    } catch (InterruptedException e) {
+                        logger.warn("Append record thread is interrupted, " + 
e, e);
+                    }
+
+                }));
+            });
+        }
+
+        private void initAvroWriters(ProcessSession session, String codec, 
DataFileStream<GenericRecord> reader) {
+            appendRecordThreadPool = Executors.newFixedThreadPool(2);
+            initAvroWriter(session, codec, reader, successAvroWriter, 
successFlowFile, successRecordQueue, w -> w == successfulRecordCount.get());
+            initAvroWriter(session, codec, reader, failureAvroWriter, 
failureFlowFile, failureRecordQueue, w -> w == failedRecordCount.get());
+
+            // No new task.
+            appendRecordThreadPool.shutdown();
+        }
+
+        private void appendRecordsToSuccess(List<HiveStreamingRecord> records) 
{
+            appendRecordsToFlowFile(records, successRecordQueue);
+            successfulRecordCount.addAndGet(records.size());
+        }
+
+        private void appendRecordsToFailure(List<HiveStreamingRecord> records) 
{
+            appendRecordsToFlowFile(records, failureRecordQueue);
+            failedRecordCount.addAndGet(records.size());
+        }
+
+        private void appendRecordsToFlowFile(List<HiveStreamingRecord> 
records, BlockingQueue<List<HiveStreamingRecord>> queue) {
+            if (!queue.add(records)) {
+                throw new ProcessException(String.format("Failed to append %d 
records due to insufficient internal queue capacity.", records.size()));
+            }
+        }
+
+        private void transferFlowFiles(ProcessSession session, RoutingResult 
result, String transitUri) {
+
+            closeAvroWriters();
+
+            if (successfulRecordCount.get() > 0) {
+                // Transfer the flow file with successful records
+                successFlowFile.set(
+                        session.putAttribute(successFlowFile.get(), 
HIVE_STREAMING_RECORD_COUNT_ATTR, 
Integer.toString(successfulRecordCount.get())));
+                session.getProvenanceReporter().send(successFlowFile.get(), 
transitUri);
+                result.routeTo(successFlowFile.get(), REL_SUCCESS);
+            } else {
+                session.remove(successFlowFile.get());
+            }
+
+            if (failedRecordCount.get() > 0) {
+                // There were some failed records, so transfer that flow file 
to failure
+                failureFlowFile.set(
+                        session.putAttribute(failureFlowFile.get(), 
HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(failedRecordCount.get())));
+                result.routeTo(failureFlowFile.get(), REL_FAILURE);
+            } else {
+                session.remove(failureFlowFile.get());
+            }
+
+            result.getRoutedFlowFiles().forEach((relationship, flowFiles) -> {
+                session.transfer(flowFiles, relationship);
+            });
+        }
+
+        private void closeAvroWriters() {
+            closed.set(true);
+            if (appendRecordThreadPool != null) {
+                // Having null thread pool means the input FlowFile was not 
processed at all, due to illegal format.
+                try {
+                    if (!appendRecordThreadPool.awaitTermination(10, 
TimeUnit.SECONDS)) {
+                        logger.warn("Waiting for Avro records being appended 
into output FlowFiles has been timeout.");
+                    }
+                } catch (InterruptedException e) {
+                    logger.warn("Waiting for Avro records being appended into 
output FlowFiles has been interrupted.");
+                }
+            }
+        }
+    }
+
+    private static class ShouldRetryException extends RuntimeException {
+        private ShouldRetryException(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    private ExceptionHandler.OnError<FunctionContext, 
List<HiveStreamingRecord>> onHiveRecordsError(ProcessContext context, 
ProcessSession session) {
+        return RollbackOnFailure.createOnError((fc, input, res, e) -> {
+
+            if (res.penalty() == ErrorTypes.Penalty.Yield) {
+                context.yield();
+            }
+
+            switch (res.destination()) {
+                case Failure:
+                    // Add the failed record to the failure flow file
+                    getLogger().error(String.format("Error writing %s to Hive 
Streaming transaction due to %s", input, e), e);
+                    fc.appendRecordsToFailure(input);
+                    break;
+
+                case Retry:
+                    // If we can't connect to the endpoint, exit the loop and 
let the outer exception handler route the original flow file to retry
+                    abortAndCloseWriters();
+                    throw new ShouldRetryException("Hive Streaming 
connect/write error, flow file will be penalized and routed to retry. " + e, e);
+
+                case Self:
+                    abortAndCloseWriters();
+                    break;
+
+                default:
+                    abortAndCloseWriters();
+                    if (e instanceof ProcessException) {
+                        throw (ProcessException) e;
+                    } else {
+                        throw new ProcessException(String.format("Error 
writing %s to Hive Streaming transaction due to %s", input, e), e);
+                    }
+            }
+        });
+    }
+
+    private ExceptionHandler.OnError<FunctionContext, HiveStreamingRecord> 
onHiveRecordError(ProcessContext context, ProcessSession session) {
+        return (fc, input, res, e) -> onHiveRecordsError(context, 
session).apply(fc, Collections.singletonList(input), res, e);
+    }
+
+    private ExceptionHandler.OnError<FunctionContext, GenericRecord> 
onRecordError(ProcessContext context, ProcessSession session) {
+        return (fc, input, res, e) -> onHiveRecordError(context, 
session).apply(fc, new HiveStreamingRecord(null, input), res, e);
+    }
+
     @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+    public void onTrigger(ProcessContext context, ProcessSessionFactory 
sessionFactory) throws ProcessException {
+        final FunctionContext functionContext = new 
FunctionContext(context.getProperty(ROLLBACK_ON_FAILURE).asBoolean(), 
getLogger());
+        RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, 
getLogger(), session -> onTrigger(context, session, functionContext));
+    }
+
+    private void onTrigger(ProcessContext context, ProcessSession session, 
FunctionContext functionContext) throws ProcessException {
         FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
@@ -390,22 +607,58 @@ public class PutHiveStreaming extends AbstractProcessor {
             }
         }
 
-        final AtomicInteger recordCount = new AtomicInteger(0);
-        final AtomicInteger successfulRecordCount = new AtomicInteger(0);
-        List<HiveStreamingRecord> successfulRecords = new LinkedList<>();
+        final AtomicReference<List<HiveStreamingRecord>> successfulRecords = 
new AtomicReference<>();
+        successfulRecords.set(new ArrayList<>());
         final FlowFile inputFlowFile = flowFile;
-        final AtomicBoolean processingFailure = new AtomicBoolean(false);
+
+        final RoutingResult result = new RoutingResult();
+        final ExceptionHandler<FunctionContext> exceptionHandler = new 
ExceptionHandler<>();
+        exceptionHandler.mapException(s -> {
+            try {
+                if (s == null) {
+                    return ErrorTypes.PersistentFailure;
+                }
+                throw s;
+
+            } catch (IllegalArgumentException
+                    | HiveWriter.WriteFailure
+                    | SerializationError inputError) {
+
+                return ErrorTypes.InvalidInput;
+
+            } catch (HiveWriter.CommitFailure
+                    | HiveWriter.TxnBatchFailure
+                    | HiveWriter.TxnFailure writerTxError) {
+
+                return ErrorTypes.TemporalInputFailure;
+
+            } catch (ConnectionError
+                    | HiveWriter.ConnectFailure connectionError) {
+                // Can't connect to Hive endpoint.
+                log.error("Error connecting to Hive endpoint: table {} at {}",
+                        new Object[]{options.getTableName(), 
options.getMetaStoreURI()});
+
+                return ErrorTypes.TemporalFailure;
+
+            } catch (IOException
+                    | InterruptedException tempError) {
+                return ErrorTypes.TemporalFailure;
+
+            } catch (Exception t) {
+                return ErrorTypes.UnknownFailure;
+            }
+        });
+        final BiFunction<FunctionContext, ErrorTypes, ErrorTypes.Result> 
adjustError = RollbackOnFailure.createAdjustError(getLogger());
+        exceptionHandler.adjustError(adjustError);
 
         // Create output flow files and their Avro writers
-        AtomicReference<FlowFile> successFlowFile = new 
AtomicReference<>(session.create(inputFlowFile));
-        final DataFileWriter<GenericRecord> successAvroWriter = new 
DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
-        AtomicReference<FlowFile> failureFlowFile = new 
AtomicReference<>(session.create(inputFlowFile));
-        final DataFileWriter<GenericRecord> failureAvroWriter = new 
DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
+        functionContext.setFlowFiles(inputFlowFile, 
session.create(inputFlowFile), session.create(inputFlowFile));
 
         try {
             session.read(inputFlowFile, in -> {
 
                 try (final DataFileStream<GenericRecord> reader = new 
DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+
                     GenericRecord currRecord = null;
 
                     // Copy codec and schema information to all writers
@@ -413,239 +666,133 @@ public class PutHiveStreaming extends AbstractProcessor 
{
                             ? DataFileConstants.NULL_CODEC
                             : reader.getMetaString(DataFileConstants.CODEC);
 
-                    Arrays.asList(successAvroWriter, failureAvroWriter)
-                            .forEach((writer) -> {
-                                
writer.setCodec(CodecFactory.fromString(codec));
-                                // Transfer metadata (this is a subset of the 
incoming file)
-                                for (String metaKey : reader.getMetaKeys()) {
-                                    if (!RESERVED_METADATA.contains(metaKey)) {
-                                        writer.setMeta(metaKey, 
reader.getMeta(metaKey));
-                                    }
-                                }
-                            });
+                    functionContext.initAvroWriters(session, codec, reader);
+
+                    Runnable flushSuccessfulRecords = () -> {
+                        // Now send the records to the successful FlowFile and 
update the success count
+                        
functionContext.appendRecordsToSuccess(successfulRecords.get());
+                        // Clear the list of successful records, we'll use it 
at the end when we flush whatever records are left
+                        successfulRecords.set(new ArrayList<>());
+                    };
 
                     while (reader.hasNext()) {
-                        currRecord = reader.next(currRecord);
-                        recordCount.incrementAndGet();
+                        // We can NOT reuse currRecord here, because 
currRecord is accumulated in successful records.
+                        // If we use the same GenericRecord instance, every 
record ends up having the same contents.
+                        // To avoid this, we need to create a brand new 
GenericRecord instance here each time.
+                        currRecord = reader.next();
+                        functionContext.recordCount.incrementAndGet();
 
                         // Extract the partition values (they must be put 
separately into the Hive Streaming API)
                         List<String> partitionValues = new ArrayList<>();
 
-                        try {
+                        if (!exceptionHandler.execute(functionContext, 
currRecord, input -> {
                             for (String partition : partitionColumnList) {
-                                Object partitionValue = 
currRecord.get(partition);
+                                Object partitionValue = input.get(partition);
                                 if (partitionValue == null) {
-                                    throw new IOException("Partition column '" 
+ partition + "' not found in Avro record");
+                                    throw new 
IllegalArgumentException("Partition column '" + partition + "' not found in 
Avro record");
                                 }
                                 partitionValues.add(partitionValue.toString());
                             }
-                        } catch (IOException ioe) {
-                            // Add the failed record to the failure flow file
-                            log.error("Error writing record to Hive Streaming 
transaction", ioe);
-                            appendRecordsToFlowFile(session, 
Collections.singletonList(new HiveStreamingRecord(null, currRecord)),
-                                    failureFlowFile, failureAvroWriter, 
reader);
+                        }, onRecordError(context, session))) {
                             continue;
                         }
 
-                        List<Schema.Field> fields = 
currRecord.getSchema().getFields();
-                        if (fields != null) {
-                            JSONObject obj = new JSONObject();
-                            try {
-                                for (Schema.Field field : fields) {
-                                    String fieldName = field.name();
-                                    // Skip fields that are partition columns, 
we extracted those values above to create an EndPoint
-                                    if 
(!partitionColumnList.contains(fieldName)) {
-                                        Object value = 
currRecord.get(fieldName);
-                                        try {
-                                            obj.put(fieldName, value);
-                                        } catch (JSONException je) {
-                                            throw new IOException(je);
-                                        }
-                                    }
-                                }
-                            } catch (IOException ioe) {
-                                // This really shouldn't happen since we are 
iterating over the schema fields, but just in case,
-                                // add the failed record to the failure flow 
file.
-                                log.error("Error writing record to Hive 
Streaming transaction", ioe);
-                                appendRecordsToFlowFile(session, 
Collections.singletonList(new HiveStreamingRecord(null, currRecord)),
-                                        failureFlowFile, failureAvroWriter, 
reader);
-                                continue;
-                            }
-                            final HiveStreamingRecord record = new 
HiveStreamingRecord(partitionValues, currRecord);
-                            HiveEndPoint endPoint = null;
-                            HiveWriter hiveWriter = null;
-                            try {
-                                endPoint = 
makeHiveEndPoint(record.getPartitionValues(), options);
-                                hiveWriter = getOrCreateWriter(endPoint);
-                            } catch (ConnectionError
-                                    | HiveWriter.ConnectFailure
-                                    | InterruptedException connectionError) {
-                                // Can't connect to Hive endpoint.
-                                log.error("Error connecting to Hive endpoint: 
table {} at {}",
-                                        new Object[]{options.getTableName(), 
options.getMetaStoreURI()});
-                                // If we can't connect to the endpoint, exit 
the loop and let the outer exception handler route the original flow file to 
retry
-                                abortAndCloseWriters();
-                                throw new ProcessException(connectionError);
-                            }
-                            try {
-                                try {
-                                    
hiveWriter.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8));
-                                    successfulRecords.add(record);
-                                } catch (InterruptedException | 
HiveWriter.WriteFailure wf) {
-                                    // Add the failed record to the failure 
flow file
-                                    log.error("Error writing record to Hive 
Streaming transaction", wf);
-                                    appendRecordsToFlowFile(session, 
Collections.singletonList(record), failureFlowFile, failureAvroWriter, reader);
-                                }
+                        final HiveStreamingRecord record = new 
HiveStreamingRecord(partitionValues, currRecord);
+                        final AtomicReference<HiveWriter> hiveWriterRef = new 
AtomicReference<>();
 
-                                // If we've reached the 
records-per-transaction limit, flush the Hive Writer and update the Avro Writer 
for successful records
-                                if (hiveWriter.getTotalRecords() >= 
recordsPerTxn) {
-                                    hiveWriter.flush(true);
-                                    // Now send the records to the success 
relationship and update the success count
-                                    try {
-                                        appendRecordsToFlowFile(session, 
successfulRecords, successFlowFile, successAvroWriter, reader);
-                                        
successfulRecordCount.accumulateAndGet(successfulRecords.size(), (current, 
incr) -> current + incr);
-
-                                        // Clear the list of successful 
records, we'll use it at the end when we flush whatever records are left
-                                        successfulRecords.clear();
-
-                                    } catch (IOException ioe) {
-                                        // The records were put to Hive 
Streaming successfully, but there was an error while writing the
-                                        // Avro records to the flow file. Log 
as an error and move on.
-                                        getLogger().error("Error writing Avro 
records (which were sent successfully to Hive Streaming) to the flow file", 
ioe);
-                                    }
-                                }
+                        // Write record to Hive streaming
+                        if (!exceptionHandler.execute(functionContext, record, 
input -> {
 
-                            } catch (InterruptedException
-                                    | HiveWriter.CommitFailure
-                                    | HiveWriter.TxnBatchFailure
-                                    | HiveWriter.TxnFailure
-                                    | SerializationError writeException) {
-
-                                log.error("Error writing record to Hive 
Streaming transaction", writeException);
-                                // Add the failed record to the failure flow 
file
-                                appendRecordsToFlowFile(session, 
Collections.singletonList(record), failureFlowFile, failureAvroWriter, reader);
-
-                                if (!(writeException instanceof 
SerializationError)) {
-                                    try {
-                                        hiveWriter.abort();
-                                    } catch (Exception e) {
-                                        // Can't even abort properly, throw a 
process exception
-                                        throw new ProcessException(e);
-                                    }
+                            final HiveEndPoint endPoint = 
makeHiveEndPoint(record.getPartitionValues(), options);
+                            final HiveWriter hiveWriter = 
getOrCreateWriter(endPoint);
+                            hiveWriterRef.set(hiveWriter);
+
+                            
hiveWriter.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8));
+                            successfulRecords.get().add(record);
+
+                        }, onHiveRecordError(context, session))) {
+                            continue;
+                        }
+
+                        // If we've reached the records-per-transaction limit, 
flush the Hive Writer and update the Avro Writer for successful records
+                        final HiveWriter hiveWriter = hiveWriterRef.get();
+                        if (hiveWriter.getTotalRecords() >= recordsPerTxn) {
+                            exceptionHandler.execute(functionContext, 
successfulRecords.get(), input -> {
+
+                                hiveWriter.flush(true);
+                                // Proceed function context. Process session 
can't be rollback anymore.
+                                functionContext.proceed();
+
+                                // Now send the records to the success 
relationship and update the success count
+                                flushSuccessfulRecords.run();
+
+                            }, onHiveRecordsError(context, 
session).andThen((fc, input, res, commitException) -> {
+                                // Reset hiveWriter for succeeding records.
+                                switch (res.destination()) {
+                                    case Retry:
+                                    case Failure:
+                                        try {
+                                            // Abort current tx and move to 
next.
+                                            hiveWriter.abort();
+                                        } catch (Exception e) {
+                                            // Can't even abort properly, 
throw a process exception
+                                            throw new ProcessException(e);
+                                        }
                                 }
-                            }
+                            }));
                         }
                     }
-                    try {
+
+                    exceptionHandler.execute(functionContext, 
successfulRecords.get(), input -> {
                         // Finish any transactions
                         flushAllWriters(true);
                         closeAllWriters();
 
                         // Now send any remaining records to the success 
relationship and update the count
-                        appendRecordsToFlowFile(session, successfulRecords, 
successFlowFile, successAvroWriter, reader);
-                        
successfulRecordCount.accumulateAndGet(successfulRecords.size(), (current, 
incr) -> current + incr);
-                        successfulRecords.clear();
-
-                    } catch (HiveWriter.CommitFailure
-                            | HiveWriter.TxnBatchFailure
-                            | HiveWriter.TxnFailure
-                            | InterruptedException e) {
-
-                        // If any records are in the successfulRecords list 
but ended up here, then they actually weren't transferred successfully, so
-                        // route them to failure instead
-                        appendRecordsToFlowFile(session, successfulRecords, 
failureFlowFile, failureAvroWriter, reader);
-                    }
+                        flushSuccessfulRecords.run();
+
+                        // Append successfulRecords on failure.
+                    }, onHiveRecordsError(context, session));
+
                 } catch (IOException ioe) {
                     // The Avro file is invalid (or may not be an Avro file at 
all), send it to failure
-                    log.error("The incoming flow file can not be read as an 
Avro file, routing to failure", ioe);
-                    processingFailure.set(true);
-                }
-            });
-
-            if (recordCount.get() > 0) {
-                if (successfulRecordCount.get() > 0) {
-                    // Transfer the flow file with successful records
-                    successFlowFile.set(
-                            session.putAttribute(successFlowFile.get(), 
HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(recordCount.get())));
-                    
session.getProvenanceReporter().send(successFlowFile.get(), 
options.getMetaStoreURI());
-                    session.transfer(successFlowFile.get(), REL_SUCCESS);
-                } else {
-                    session.remove(successFlowFile.get());
-                }
+                    final ErrorTypes.Result adjusted = 
adjustError.apply(functionContext, ErrorTypes.InvalidInput);
+                    final String msg = "The incoming flow file can not be read 
as an Avro file";
+                    switch (adjusted.destination()) {
+                        case Failure:
+                            log.error(msg, ioe);
+                            result.routeTo(inputFlowFile, REL_FAILURE);
+                            break;
+                        case ProcessException:
+                            throw new ProcessException(msg, ioe);
 
-                if (recordCount.get() != successfulRecordCount.get()) {
-                    // There were some failed records, so transfer that flow 
file to failure
-                    failureFlowFile.set(
-                            session.putAttribute(failureFlowFile.get(), 
HIVE_STREAMING_RECORD_COUNT_ATTR,
-                                    Integer.toString(recordCount.get() - 
successfulRecordCount.get())));
-                    session.transfer(failureFlowFile.get(), REL_FAILURE);
-                } else {
-                    session.remove(failureFlowFile.get());
+                    }
                 }
-            } else {
-                // No records were processed, so remove the output flow files
-                session.remove(successFlowFile.get());
-                session.remove(failureFlowFile.get());
-            }
-            successFlowFile.set(null);
-            failureFlowFile.set(null);
+            });
 
             // If we got here, we've processed the outgoing flow files 
correctly, so remove the incoming one if necessary
-            if (processingFailure.get()) {
-                session.transfer(inputFlowFile, REL_FAILURE);
-            } else {
-                session.remove(flowFile);
+            if (result.getRoutedFlowFiles().values().stream().noneMatch(routed 
-> routed.contains(inputFlowFile))) {
+                session.remove(inputFlowFile);
             }
 
-        } catch (ProcessException pe) {
-            abortAndCloseWriters();
-            Throwable t = pe.getCause();
-            if (t != null) {
-                if (t instanceof ConnectionError
-                        || t instanceof HiveWriter.ConnectFailure
-                        || t instanceof HiveWriter.CommitFailure
-                        || t instanceof HiveWriter.TxnBatchFailure
-                        || t instanceof HiveWriter.TxnFailure
-                        || t instanceof InterruptedException) {
-                    log.error("Hive Streaming connect/write error, flow file 
will be penalized and routed to retry", t);
-                    flowFile = session.penalize(flowFile);
-                    session.transfer(flowFile, REL_RETRY);
-                    // Remove the ones we created
-                    if (successFlowFile.get() != null) {
-                        session.remove(successFlowFile.get());
-                    }
-                    if (failureFlowFile.get() != null) {
-                        session.remove(failureFlowFile.get());
-                    }
-                } else {
-                    throw pe;
-                }
-            } else {
-                throw pe;
-            }
+        } catch (DiscontinuedException e) {
+            // The input FlowFile processing is discontinued. Keep it in the 
input queue.
+            getLogger().warn("Discontinued processing for {} due to {}", new 
Object[]{flowFile, e}, e);
+            result.routeTo(flowFile, Relationship.SELF);
+
+        } catch (ShouldRetryException e) {
+            // This exception is already a result of adjusting an error, so 
simply transfer the FlowFile to retry.
+            getLogger().error(e.getMessage(), e);
+            flowFile = session.penalize(flowFile);
+            result.routeTo(flowFile, REL_RETRY);
+
         } finally {
+            functionContext.transferFlowFiles(session, result, 
options.getMetaStoreURI());
             // Restore original class loader, might not be necessary but is 
good practice since the processor task changed it
             Thread.currentThread().setContextClassLoader(originalClassloader);
         }
     }
 
-    private void appendRecordsToFlowFile(ProcessSession session,
-                                         List<HiveStreamingRecord> records,
-                                         AtomicReference<FlowFile> 
appendFlowFile,
-                                         DataFileWriter<GenericRecord> 
avroWriter,
-                                         DataFileStream<GenericRecord> reader) 
throws IOException {
-
-        appendFlowFile.set(session.append(appendFlowFile.get(), (out) -> {
-
-            try (DataFileWriter<GenericRecord> writer = 
avroWriter.create(reader.getSchema(), out)) {
-                for (HiveStreamingRecord sRecord : records) {
-                    writer.append(sRecord.getRecord());
-                }
-                writer.flush();
-            }
-        }));
-    }
-
     @OnStopped
     public void cleanup() {
         ComponentLog log = getLogger();

http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
index 342fada..e61fa9f 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
@@ -50,11 +50,13 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.PartialFunctions;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.hive.CsvOutputOptions;
 import org.apache.nifi.util.hive.HiveJdbcCommon;
@@ -209,7 +211,11 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
     }
 
     @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+    public void onTrigger(ProcessContext context, ProcessSessionFactory 
sessionFactory) throws ProcessException {
+        PartialFunctions.onTrigger(context, sessionFactory, getLogger(), 
session -> onTrigger(context, session));
+    }
+
+    private void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         final FlowFile fileToProcess = (context.hasIncomingConnection()? 
session.get():null);
         FlowFile flowfile = null;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
index c7498f9..5624f79 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
@@ -19,6 +19,7 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.dbcp.hive.HiveDBCPService;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -42,6 +43,7 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class TestPutHiveQL {
     private static final String createPersons = "CREATE TABLE PERSONS (id 
integer primary key, name varchar(100), code integer)";
@@ -128,6 +130,91 @@ public class TestPutHiveQL {
         runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 3);
     }
 
+    @Test
+    public void testFailInMiddleWithBadStatementRollbackOnFailure() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersonsAutoId);
+            }
+        }
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+        runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 
84)".getBytes());
+        runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally 
wrong syntax
+        runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 
3)".getBytes());
+        runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 
44)".getBytes());
+        runner.run();
+
+        // The 1st one should be routed to success, others should stay in 
queue.
+        assertEquals(3, runner.getQueueSize().getObjectCount());
+        runner.assertTransferCount(PutHiveQL.REL_FAILURE, 0);
+        runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testFailAtBeginning() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersonsAutoId);
+            }
+        }
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+        runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally 
wrong syntax
+        runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 
3)".getBytes());
+        runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 
44)".getBytes());
+        runner.run();
+
+        runner.assertTransferCount(PutHiveQL.REL_FAILURE, 1);
+        runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 2);
+    }
+
+    @Test
+    public void testFailAtBeginningRollbackOnFailure() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersonsAutoId);
+            }
+        }
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+        runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally 
wrong syntax
+        runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 
3)".getBytes());
+        runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 
44)".getBytes());
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        assertEquals(3, runner.getQueueSize().getObjectCount());
+        runner.assertTransferCount(PutHiveQL.REL_FAILURE, 0);
+        runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 0);
+    }
 
     @Test
     public void testFailInMiddleWithBadParameterType() throws 
InitializationException, ProcessException, SQLException, IOException {
@@ -189,7 +276,56 @@ public class TestPutHiveQL {
 
         final Map<String, String> badAttributes = new HashMap<>();
         badAttributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
-        badAttributes.put("hiveql.args.1.value", "9999");
+        badAttributes.put("hiveql.args.1.value", "101"); // Constraint 
violation, up to 100
+
+        final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 
?)".getBytes();
+        runner.enqueue(data, goodAttributes);
+        runner.enqueue(data, badAttributes);
+        runner.enqueue(data, goodAttributes);
+        runner.enqueue(data, goodAttributes);
+        runner.run();
+
+        runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 3);
+        runner.assertTransferCount(PutHiveQL.REL_FAILURE, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("Mark", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertTrue(rs.next());
+                assertTrue(rs.next());
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
+    public void testFailInMiddleWithBadNumberFormat() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersonsAutoId);
+            }
+        }
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+
+        final Map<String, String> goodAttributes = new HashMap<>();
+        goodAttributes.put("hiveql.args.1.type", 
String.valueOf(Types.INTEGER));
+        goodAttributes.put("hiveql.args.1.value", "84");
+
+        final Map<String, String> badAttributes = new HashMap<>();
+        badAttributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+        badAttributes.put("hiveql.args.1.value", "NOT_NUMBER");
 
         final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 
?)".getBytes();
         runner.enqueue(data, goodAttributes);
@@ -540,6 +676,44 @@ public class TestPutHiveQL {
         runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 1);
     }
 
+    @Test
+    public void testRetryableFailureRollbackOnFailure() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+        final DBCPService service = new SQLExceptionService(null);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
+                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.1.value", "1");
+
+        attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("hiveql.args.2.value", "Mark");
+
+        attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.3.value", "84");
+
+        attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER));
+        attributes.put("hiveql.args.4.value", "1");
+
+        runner.enqueue(sql.getBytes(), attributes);
+        try {
+            runner.run();
+            fail("Should throw ProcessException");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+        runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 0);
+    }
+
     /**
      * Simple implementation only for testing purposes
      */
@@ -607,7 +781,7 @@ public class TestPutHiveQL {
 
         @Override
         public String getConnectionURL() {
-            return service.getConnectionURL();
+            return service != null ? service.getConnectionURL() : null;
         }
     }
 }

Reply via email to