Repository: nifi
Updated Branches:
  refs/heads/master a1bffbcc8 -> d9acdb54b


http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index 6d7c504..cb3b198 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -26,18 +26,28 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.FragmentAttributes;
-import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.ErrorTypes;
+import org.apache.nifi.processor.util.pattern.ExceptionHandler;
+import org.apache.nifi.processor.util.pattern.PartialFunctions;
+import org.apache.nifi.processor.util.pattern.PartialFunctions.FetchFlowFiles;
+import org.apache.nifi.processor.util.pattern.PartialFunctions.FlowFileGroup;
+import org.apache.nifi.processor.util.pattern.PutGroup;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processor.util.pattern.RoutingResult;
 import org.apache.nifi.stream.io.StreamUtils;
 
 import javax.xml.bind.DatatypeConverter;
@@ -52,6 +62,7 @@ import java.sql.Connection;
 import java.sql.Date;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLDataException;
 import java.sql.SQLException;
 import java.sql.SQLNonTransientException;
 import java.sql.Statement;
@@ -65,19 +76,19 @@ import java.time.format.DateTimeFormatter;
 import java.time.temporal.TemporalAccessor;
 import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError;
+
 @SupportsBatching
 @SeeAlso(ConvertJSONToSQL.class)
 @InputRequirement(Requirement.INPUT_REQUIRED)
@@ -109,7 +120,7 @@ import java.util.regex.Pattern;
         @WritesAttribute(attribute = "sql.generated.key", description = "If 
the database generated a key for an INSERT statement and the Obtain Generated 
Keys property is set to true, "
                 + "this attribute will be added to indicate the generated key, 
if possible. This feature is not supported by all database vendors.")
 })
-public class PutSQL extends AbstractProcessor {
+public class PutSQL extends AbstractSessionFactoryProcessor {
 
     static final PropertyDescriptor CONNECTION_POOL = new 
PropertyDescriptor.Builder()
             .name("JDBC Connection Pool")
@@ -180,6 +191,7 @@ public class PutSQL extends AbstractProcessor {
         properties.add(TRANSACTION_TIMEOUT);
         properties.add(BATCH_SIZE);
         properties.add(OBTAIN_GENERATED_KEYS);
+        properties.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
         return properties;
     }
 
@@ -192,199 +204,176 @@ public class PutSQL extends AbstractProcessor {
         return rels;
     }
 
+    private static class FunctionContext extends RollbackOnFailure {
+        private boolean obtainKeys = false;
+        private boolean fragmentedTransaction = false;
+        private boolean originalAutoCommit = false;
+        private final long startNanos = System.nanoTime();
 
+        private FunctionContext(boolean rollbackOnFailure) {
+            super(rollbackOnFailure, true);
+        }
 
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final FlowFilePoll poll = pollFlowFiles(context, session);
+        private boolean isSupportBatching() {
+            return !obtainKeys && !fragmentedTransaction;
+        }
+    }
+
+    private PutGroup<FunctionContext, Connection, StatementFlowFileEnclosure> 
process;
+    private BiFunction<FunctionContext, ErrorTypes, ErrorTypes.Result> 
adjustError;
+    private ExceptionHandler<FunctionContext> exceptionHandler;
+
+
+    private final FetchFlowFiles<FunctionContext> fetchFlowFiles = (c, s, fc, 
r) -> {
+        final FlowFilePoll poll = pollFlowFiles(c, s, fc, r);
         if (poll == null) {
-            return;
+            return null;
         }
+        fc.fragmentedTransaction = poll.isFragmentedTransaction();
+        return poll.getFlowFiles();
+    };
 
-        final List<FlowFile> flowFiles = poll.getFlowFiles();
-        if (flowFiles == null) {
-            return;
+    private final PartialFunctions.InitConnection<FunctionContext, Connection> 
initConnection = (c, s, fc) -> {
+        final Connection connection = 
c.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class).getConnection();
+        try {
+            fc.originalAutoCommit = connection.getAutoCommit();
+            connection.setAutoCommit(false);
+        } catch (SQLException e) {
+            throw new ProcessException("Failed to disable auto commit due to " 
+ e, e);
         }
+        return connection;
+    };
 
-        final long startNanos = System.nanoTime();
-        final boolean obtainKeys = 
context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean();
-        final Map<String, StatementFlowFileEnclosure> statementMap = new 
HashMap<>(); // Map SQL to a PreparedStatement and FlowFiles
-        final List<FlowFile> sentFlowFiles = new ArrayList<>(); // flowfiles 
that have been sent
-        final List<FlowFile> processedFlowFiles = new ArrayList<>(); // all 
flowfiles that we have processed
-        final Set<StatementFlowFileEnclosure> enclosuresToExecute = new 
LinkedHashSet<>(); // the enclosures that we've processed
 
-        // Because we can have a transaction that is necessary across multiple 
FlowFiles, things get complicated when
-        // some FlowFiles have been transferred to a relationship and then 
there is a failure. As a result, we will just
-        // map all FlowFiles to their destination relationship and do the 
session.transfer at the end. This way, if there
-        // is a failure, we can route all FlowFiles to failure if we need to.
-        final Map<FlowFile, Relationship> destinationRelationships = new 
HashMap<>();
+    @FunctionalInterface
+    private interface GroupingFunction {
+        void apply(final ProcessContext context, final ProcessSession session, 
final FunctionContext fc,
+                   final Connection conn, final List<FlowFile> flowFiles,
+                   final List<StatementFlowFileEnclosure> groups,
+                   final Map<String, StatementFlowFileEnclosure> 
sqlToEnclosure,
+                   final RoutingResult result);
+    }
 
-        final DBCPService dbcpService = 
context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
-        try (final Connection conn = dbcpService.getConnection()) {
-            final boolean originalAutoCommit = conn.getAutoCommit();
-            try {
-                conn.setAutoCommit(false);
-
-                for (final FlowFile flowFile : flowFiles) {
-                    processedFlowFiles.add(flowFile);
-                    final String sql = getSQL(session, flowFile);
-
-                    // Get the appropriate PreparedStatement to use.
-                    final StatementFlowFileEnclosure enclosure;
-                    try {
-                        enclosure = getEnclosure(sql, conn, statementMap, 
obtainKeys, poll.isFragmentedTransaction());
-                    } catch (final SQLNonTransientException e) {
-                        getLogger().error("Failed to update database for {} 
due to {}; routing to failure", new Object[] {flowFile, e});
-                        destinationRelationships.put(flowFile, REL_FAILURE);
-                        continue;
-                    }
+    private GroupingFunction groupFragmentedTransaction = (context, session, 
fc, conn, flowFiles, groups, sqlToEnclosure, result) -> {
+        final FragmentedEnclosure fragmentedEnclosure = new 
FragmentedEnclosure();
+        groups.add(fragmentedEnclosure);
 
-                    final PreparedStatement stmt = enclosure.getStatement();
+        for (final FlowFile flowFile : flowFiles) {
+            final String sql = getSQL(session, flowFile);
 
-                    // set the appropriate parameters on the statement.
-                    try {
-                        setParameters(stmt, flowFile.getAttributes());
-                    } catch (final SQLException | ProcessException pe) {
-                        getLogger().error("Cannot update database for {} due 
to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe);
-                        destinationRelationships.put(flowFile, REL_FAILURE);
-                        continue;
-                    }
+            final StatementFlowFileEnclosure enclosure = sqlToEnclosure
+                    .computeIfAbsent(sql, k -> new 
StatementFlowFileEnclosure(sql));
 
-                    // If we need to obtain keys, we cannot do so in a a Batch 
Update. So we have to execute the statement and close it.
-                    if (obtainKeys) {
-                        try {
-                            // Execute the actual update.
-                            stmt.executeUpdate();
-
-                            // attempt to determine the key that was 
generated, if any. This is not supported by all
-                            // database vendors, so if we cannot determine the 
generated key (or if the statement is not an INSERT),
-                            // we will just move on without setting the 
attribute.
-                            FlowFile sentFlowFile = flowFile;
-                            final String generatedKey = 
determineGeneratedKey(stmt);
-                            if (generatedKey != null) {
-                                sentFlowFile = 
session.putAttribute(sentFlowFile, "sql.generated.key", generatedKey);
-                            }
-
-                            stmt.close();
-                            sentFlowFiles.add(sentFlowFile);
-                        } catch (final SQLNonTransientException e) {
-                            getLogger().error("Failed to update database for 
{} due to {}; routing to failure", new Object[] {flowFile, e});
-                            destinationRelationships.put(flowFile, 
REL_FAILURE);
-                            continue;
-                        } catch (final SQLException e) {
-                            getLogger().error("Failed to update database for 
{} due to {}; it is possible that retrying the operation will succeed, so 
routing to retry", new Object[] {flowFile, e});
-                            destinationRelationships.put(flowFile, REL_RETRY);
-                            continue;
-                        }
-                    } else {
-                        // We don't need to obtain keys. Just add the 
statement to the batch.
-                        stmt.addBatch();
-                        enclosure.addFlowFile(flowFile);
-                        enclosuresToExecute.add(enclosure);
-                    }
-                }
+            fragmentedEnclosure.addFlowFile(flowFile, enclosure);
+        }
+    };
 
-                // If we are not trying to obtain the generated keys, we will 
have
-                // PreparedStatement's that have batches added to them. We 
need to execute each batch and close
-                // the PreparedStatement.
-                for (final StatementFlowFileEnclosure enclosure : 
enclosuresToExecute) {
-                    try {
-                        final PreparedStatement stmt = 
enclosure.getStatement();
-                        stmt.executeBatch();
-                        sentFlowFiles.addAll(enclosure.getFlowFiles());
-                    } catch (final BatchUpdateException e) {
-                        // If we get a BatchUpdateException, then we want to 
determine which FlowFile caused the failure,
-                        // and route that FlowFile to failure while routing 
those that finished processing to success and those
-                        // that have not yet been executed to retry. If the 
FlowFile was
-                        // part of a fragmented transaction, then we must roll 
back all updates for this connection, because
-                        // other statements may have been successful and been 
part of this transaction.
-                        final int[] updateCounts = e.getUpdateCounts();
-                        final int offendingFlowFileIndex = updateCounts.length;
-                        final List<FlowFile> batchFlowFiles = 
enclosure.getFlowFiles();
-
-                        if (poll.isFragmentedTransaction()) {
-                            // There are potentially multiple statements for 
this one transaction. As a result,
-                            // we need to roll back the entire transaction and 
route all of the FlowFiles to failure.
-                            conn.rollback();
-                            final FlowFile offendingFlowFile = 
batchFlowFiles.get(offendingFlowFileIndex);
-                            getLogger().error("Failed to update database due 
to a failed batch update. A total of {} FlowFiles are required for this 
transaction, so routing all to failure. "
-                                    + "Offending FlowFile was {}, which caused 
the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, 
e});
-                            session.transfer(flowFiles, REL_FAILURE);
-                            return;
-                        }
+    private final GroupingFunction groupFlowFilesBySQLBatch = (context, 
session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> {
+        for (final FlowFile flowFile : flowFiles) {
+            final String sql = getSQL(session, flowFile);
+
+            // Get or create the enclosure for FlowFiles sharing this SQL.
+            final StatementFlowFileEnclosure enclosure = sqlToEnclosure
+                    .computeIfAbsent(sql, k -> {
+                        final StatementFlowFileEnclosure newEnclosure = new 
StatementFlowFileEnclosure(sql);
+                        groups.add(newEnclosure);
+                        return newEnclosure;
+                    });
+
+            if(!exceptionHandler.execute(fc, flowFile, input -> {
+                final PreparedStatement stmt = 
enclosure.getCachedStatement(conn);
+                setParameters(stmt, flowFile.getAttributes());
+                stmt.addBatch();
+            }, onFlowFileError(context, session, result))) {
+                continue;
+            }
 
-                        // In the presence of a BatchUpdateException, the 
driver has the option of either stopping when an error
-                        // occurs, or continuing. If it continues, then it 
must account for all statements in the batch and for
-                        // those that fail return a Statement.EXECUTE_FAILED 
for the number of rows updated.
-                        // So we will iterate over all of the update counts 
returned. If any is equal to Statement.EXECUTE_FAILED,
-                        // we will route the corresponding FlowFile to 
failure. Otherwise, the FlowFile will go to success
-                        // unless it has not yet been processed (its index in 
the List > updateCounts.length).
-                        int failureCount = 0;
-                        int successCount = 0;
-                        int retryCount = 0;
-                        for (int i = 0; i < updateCounts.length; i++) {
-                            final int updateCount = updateCounts[i];
-                            final FlowFile flowFile = batchFlowFiles.get(i);
-                            if (updateCount == Statement.EXECUTE_FAILED) {
-                                destinationRelationships.put(flowFile, 
REL_FAILURE);
-                                failureCount++;
-                            } else {
-                                destinationRelationships.put(flowFile, 
REL_SUCCESS);
-                                successCount++;
-                            }
-                        }
+            enclosure.addFlowFile(flowFile);
+        }
+    };
 
-                        if (failureCount == 0) {
-                            // if no failures found, the driver decided not to 
execute the statements after the
-                            // failure, so route the last one to failure.
-                            final FlowFile failedFlowFile = 
batchFlowFiles.get(updateCounts.length);
-                            destinationRelationships.put(failedFlowFile, 
REL_FAILURE);
-                            failureCount++;
-                        }
+    private GroupingFunction groupFlowFilesBySQL = (context, session, fc, 
conn, flowFiles, groups, sqlToEnclosure, result) -> {
+        for (final FlowFile flowFile : flowFiles) {
+            final String sql = getSQL(session, flowFile);
 
-                        if (updateCounts.length < batchFlowFiles.size()) {
-                            final List<FlowFile> unexecuted = 
batchFlowFiles.subList(updateCounts.length + 1, batchFlowFiles.size());
-                            for (final FlowFile flowFile : unexecuted) {
-                                destinationRelationships.put(flowFile, 
REL_RETRY);
-                                retryCount++;
-                            }
-                        }
+            // Get or create the enclosure for FlowFiles sharing this SQL.
+            final StatementFlowFileEnclosure enclosure = sqlToEnclosure
+                    .computeIfAbsent(sql, k -> {
+                        final StatementFlowFileEnclosure newEnclosure = new 
StatementFlowFileEnclosure(sql);
+                        groups.add(newEnclosure);
+                        return newEnclosure;
+                    });
 
-                        getLogger().error("Failed to update database due to a 
failed batch update. There were a total of {} FlowFiles that failed, {} that 
succeeded, "
-                                + "and {} that were not execute and will be 
routed to retry; ", new Object[] {failureCount, successCount, retryCount});
-                    } catch (final SQLNonTransientException e) {
-                        getLogger().error("Failed to update database for {} 
due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e});
+            enclosure.addFlowFile(flowFile);
+        }
+    };
+
+    final PutGroup.GroupFlowFiles<FunctionContext, Connection, 
StatementFlowFileEnclosure> groupFlowFiles = (context, session, fc, conn, 
flowFiles, result) -> {
+        final Map<String, StatementFlowFileEnclosure> sqlToEnclosure = new 
HashMap<>();
+        final List<StatementFlowFileEnclosure> groups = new ArrayList<>();
+
+        // There are three patterns:
+        // 1. Support batching: An enclosure has multiple FlowFiles being 
executed in a batch operation
+        // 2. Obtain keys: An enclosure has multiple FlowFiles, and each 
FlowFile is executed separately
+        // 3. Fragmented transaction: A single FragmentedEnclosure holds all FlowFiles, and each FlowFile is executed separately
+        if (fc.obtainKeys) {
+            groupFlowFilesBySQL.apply(context, session, fc, conn, flowFiles, 
groups, sqlToEnclosure, result);
+        } else if (fc.fragmentedTransaction) {
+            groupFragmentedTransaction.apply(context, session, fc, conn, 
flowFiles, groups, sqlToEnclosure, result);
+        } else {
+            groupFlowFilesBySQLBatch.apply(context, session, fc, conn, 
flowFiles, groups, sqlToEnclosure, result);
+        }
 
-                        for (final FlowFile flowFile : 
enclosure.getFlowFiles()) {
-                            destinationRelationships.put(flowFile, 
REL_FAILURE);
-                        }
-                        continue;
-                    } catch (final SQLException e) {
-                        getLogger().error("Failed to update database for {} 
due to {}; it is possible that retrying the operation will succeed, so routing 
to retry",
-                                new Object[] {enclosure.getFlowFiles(), e});
+        return groups;
+    };
 
-                        for (final FlowFile flowFile : 
enclosure.getFlowFiles()) {
-                            destinationRelationships.put(flowFile, REL_RETRY);
-                        }
-                        continue;
-                    } finally {
-                        enclosure.getStatement().close();
-                    }
+    final PutGroup.PutFlowFiles<FunctionContext, Connection, 
StatementFlowFileEnclosure> putFlowFiles = (context, session, fc, conn, 
enclosure, result) -> {
+
+        if (fc.isSupportBatching()) {
+
+            // The enclosure's cached PreparedStatement already has its batch added.
+            // Execute that batch, then close the PreparedStatement.
+            exceptionHandler.execute(fc, enclosure, input -> {
+                try (final PreparedStatement stmt = 
enclosure.getCachedStatement(conn)) {
+                    stmt.executeBatch();
+                    result.routeTo(enclosure.getFlowFiles(), REL_SUCCESS);
                 }
-            } finally {
-                try {
-                    conn.commit();
-                } finally {
-                    // make sure that we try to set the auto commit back to 
whatever it was.
-                    if (originalAutoCommit) {
-                        try {
-                            conn.setAutoCommit(originalAutoCommit);
-                        } catch (final SQLException se) {
+            }, onBatchUpdateError(context, session, result));
+
+        } else {
+            for (final FlowFile flowFile : enclosure.getFlowFiles()) {
+
+                final StatementFlowFileEnclosure targetEnclosure
+                        = enclosure instanceof FragmentedEnclosure
+                        ? ((FragmentedEnclosure) 
enclosure).getTargetEnclosure(flowFile)
+                        : enclosure;
+
+                // Execute update one by one.
+                exceptionHandler.execute(fc, flowFile, input -> {
+                    try (final PreparedStatement stmt = 
targetEnclosure.getNewStatement(conn, fc.obtainKeys)) {
+
+                        // set the appropriate parameters on the statement.
+                        setParameters(stmt, flowFile.getAttributes());
+
+                        stmt.executeUpdate();
+
+                        // attempt to determine the key that was generated, if 
any. This is not supported by all
+                        // database vendors, so if we cannot determine the 
generated key (or if the statement is not an INSERT),
+                        // we will just move on without setting the attribute.
+                        FlowFile sentFlowFile = flowFile;
+                        final String generatedKey = 
determineGeneratedKey(stmt);
+                        if (generatedKey != null) {
+                            sentFlowFile = session.putAttribute(sentFlowFile, 
"sql.generated.key", generatedKey);
                         }
+
+                        result.routeTo(sentFlowFile, REL_SUCCESS);
+
                     }
-                }
+                }, onFlowFileError(context, session, result));
             }
+        }
 
+        if (result.contains(REL_SUCCESS)) {
             // Determine the database URL
             String url = "jdbc://unknown-host";
             try {
@@ -393,46 +382,167 @@ public class PutSQL extends AbstractProcessor {
             }
 
             // Emit a Provenance SEND event
-            final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-            for (final FlowFile flowFile : sentFlowFiles) {
+            final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fc.startNanos);
+            for (final FlowFile flowFile : 
result.getRoutedFlowFiles().get(REL_SUCCESS)) {
                 session.getProvenanceReporter().send(flowFile, url, 
transmissionMillis, true);
             }
+        }
+    };
+
+    private ExceptionHandler.OnError<FunctionContext, FlowFile> 
onFlowFileError(final ProcessContext context, final ProcessSession session, 
final RoutingResult result) {
+        ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError = 
createOnError(context, session, result, REL_FAILURE, REL_RETRY);
+        onFlowFileError = onFlowFileError.andThen((c, i, r, e) -> {
+            switch (r.destination()) {
+                case Failure:
+                    getLogger().error("Failed to update database for {} due to 
{}; routing to failure", new Object[] {i, e}, e);
+                    break;
+                case Retry:
+                    getLogger().error("Failed to update database for {} due to 
{}; it is possible that retrying the operation will succeed, so routing to 
retry",
+                            new Object[] {i, e}, e);
+                    break;
+            }
+        });
+        return RollbackOnFailure.createOnError(onFlowFileError);
+    }
+
+    private ExceptionHandler.OnError<FunctionContext, 
StatementFlowFileEnclosure> onBatchUpdateError(
+            final ProcessContext context, final ProcessSession session, final 
RoutingResult result) {
+        return RollbackOnFailure.createOnError((c, enclosure, r, e) -> {
+
+            // If rollbackOnFailure is enabled, the error will be thrown as 
ProcessException instead.
+            if (e instanceof BatchUpdateException && !c.isRollbackOnFailure()) 
{
+
+                // If we get a BatchUpdateException, then we want to determine 
which FlowFile caused the failure,
+                // and route that FlowFile to failure while routing those that 
finished processing to success and those
+                // that have not yet been executed to retry.
+                // Fragmented transactions currently do not use batch updates.
+                final int[] updateCounts = ((BatchUpdateException) 
e).getUpdateCounts();
+                final List<FlowFile> batchFlowFiles = enclosure.getFlowFiles();
+
+                // In the presence of a BatchUpdateException, the driver has 
the option of either stopping when an error
+                // occurs, or continuing. If it continues, then it must 
account for all statements in the batch and for
+                // those that fail return a Statement.EXECUTE_FAILED for the 
number of rows updated.
+                // So we will iterate over all of the update counts returned. 
If any is equal to Statement.EXECUTE_FAILED,
+                // we will route the corresponding FlowFile to failure. 
Otherwise, the FlowFile will go to success
+                // unless it has not yet been processed (its index in the List 
> updateCounts.length).
+                int failureCount = 0;
+                int successCount = 0;
+                int retryCount = 0;
+                for (int i = 0; i < updateCounts.length; i++) {
+                    final int updateCount = updateCounts[i];
+                    final FlowFile flowFile = batchFlowFiles.get(i);
+                    if (updateCount == Statement.EXECUTE_FAILED) {
+                        result.routeTo(flowFile, REL_FAILURE);
+                        failureCount++;
+                    } else {
+                        result.routeTo(flowFile, REL_SUCCESS);
+                        successCount++;
+                    }
+                }
+
+                if (failureCount == 0) {
+                    // if no failures found, the driver decided not to execute 
the statements after the
+                    // failure, so route the last one to failure.
+                    final FlowFile failedFlowFile = 
batchFlowFiles.get(updateCounts.length);
+                    result.routeTo(failedFlowFile, REL_FAILURE);
+                    failureCount++;
+                }
+
+                if (updateCounts.length < batchFlowFiles.size()) {
+                    final List<FlowFile> unexecuted = 
batchFlowFiles.subList(updateCounts.length + 1, batchFlowFiles.size());
+                    for (final FlowFile flowFile : unexecuted) {
+                        result.routeTo(flowFile, REL_RETRY);
+                        retryCount++;
+                    }
+                }
+
+                getLogger().error("Failed to update database due to a failed 
batch update. There were a total of {} FlowFiles that failed, {} that 
succeeded, "
+                        + "and {} that were not execute and will be routed to 
retry; ", new Object[]{failureCount, successCount, retryCount});
+
+                return;
 
-            for (final FlowFile flowFile : sentFlowFiles) {
-                destinationRelationships.put(flowFile, REL_SUCCESS);
             }
-        } catch (final SQLException e) {
-            // Failed FlowFiles are all of them that we have processed minus 
those that were successfully sent
-            final List<FlowFile> failedFlowFiles = processedFlowFiles;
-            failedFlowFiles.removeAll(sentFlowFiles);
 
-            // All FlowFiles yet to be processed is all FlowFiles minus those 
processed
-            final List<FlowFile> retry = flowFiles;
-            retry.removeAll(processedFlowFiles);
+            // Apply default error handling and logging for other Exceptions.
+            ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> 
onGroupError
+                    = ExceptionHandler.createOnGroupError(context, session, 
result, REL_FAILURE, REL_RETRY);
+            onGroupError = onGroupError.andThen((cl, il, rl, el) -> {
+                switch (r.destination()) {
+                    case Failure:
+                        getLogger().error("Failed to update database for {} 
due to {}; routing to failure", new Object[] {il.getFlowFiles(), e}, e);
+                        break;
+                    case Retry:
+                        getLogger().error("Failed to update database for {} 
due to {}; it is possible that retrying the operation will succeed, so routing 
to retry",
+                                new Object[] {il.getFlowFiles(), e}, e);
+                        break;
+                }
+            });
+            onGroupError.apply(c, enclosure, r, e);
+        });
+    }
+
+    @OnScheduled
+    public void constructProcess() {
+        process = new PutGroup<>();
 
-            final Relationship rel;
-            if (e instanceof SQLNonTransientException) {
-                getLogger().error("Failed to update database for {} due to {}; 
routing to failure", new Object[] {failedFlowFiles, e});
-                rel = REL_FAILURE;
-            } else {
-                getLogger().error("Failed to update database for {} due to {}; 
it is possible that retrying the operation will succeed, so routing to retry", 
new Object[] {failedFlowFiles, e});
-                rel = REL_RETRY;
+        process.setLogger(getLogger());
+        process.fetchFlowFiles(fetchFlowFiles);
+        process.initConnection(initConnection);
+        process.groupFetchedFlowFiles(groupFlowFiles);
+        process.putFlowFiles(putFlowFiles);
+        process.adjustRoute(RollbackOnFailure.createAdjustRoute(REL_FAILURE, 
REL_RETRY));
+
+        process.onCompleted((c, s, fc, conn) -> {
+            try {
+                conn.commit();
+            } catch (SQLException e) {
+                // Throw a ProcessException to roll back the process session.
+                throw new ProcessException("Failed to commit database 
connection due to " + e, e);
             }
+        });
 
-            for (final FlowFile flowFile : failedFlowFiles) {
-                destinationRelationships.put(flowFile, rel);
+        process.onFailed((c, s, fc, conn, e) -> {
+            try {
+                // Undo any statements executed on this connection before the failure.
+                conn.rollback();
+            } catch (SQLException re) {
+                // Just log the fact that the rollback failed.
+                // The ProcessSession will be rolled back by the thrown Exception, so nothing else has to be done here.
+                // Use the logger's '{}' placeholder (as the other log calls here do) so the failure reason is substituted.
+                getLogger().warn("Failed to rollback database connection due to {}", new Object[]{re}, re);
             }
+        });
 
-            for (final FlowFile flowFile : retry) {
-                destinationRelationships.put(flowFile, Relationship.SELF);
+        process.cleanup((c, s, fc, conn) -> {
+            // make sure that we try to set the auto commit back to whatever 
it was.
+            if (fc.originalAutoCommit) {
+                try {
+                    conn.setAutoCommit(true);
+                } catch (final SQLException se) {
+                    getLogger().warn("Failed to reset autocommit due to {}", 
new Object[]{se});
+                }
             }
-        }
+        });
 
-        for (final Map.Entry<FlowFile, Relationship> entry : 
destinationRelationships.entrySet()) {
-            session.transfer(entry.getKey(), entry.getValue());
-        }
+        exceptionHandler = new ExceptionHandler<>();
+        exceptionHandler.mapException(e -> {
+            if (e instanceof SQLNonTransientException) {
+                return ErrorTypes.InvalidInput;
+            } else if (e instanceof SQLException) {
+                return ErrorTypes.TemporalFailure;
+            } else {
+                return ErrorTypes.UnknownFailure;
+            }
+        });
+        adjustError = RollbackOnFailure.createAdjustError(getLogger());
+        exceptionHandler.adjustError(adjustError);
     }
 
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory 
sessionFactory) throws ProcessException {
+        final Boolean rollbackOnFailure = 
context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
+        final FunctionContext functionContext = new 
FunctionContext(rollbackOnFailure);
+        functionContext.obtainKeys = 
context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean();
+        RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, 
getLogger(), session -> process.onTrigger(context, session, functionContext));
+    }
 
     /**
      * Pulls a batch of FlowFiles from the incoming queues. If no FlowFiles 
are available, returns <code>null</code>.
@@ -448,7 +558,8 @@ public class PutSQL extends AbstractProcessor {
      * @param session the process session for pulling flowfiles
      * @return a FlowFilePoll containing a List of FlowFiles to process, or <code>null</code> if there are no FlowFiles to process
      */
-    private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSession session) {
+    private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSession session,
+                                       final FunctionContext functionContext, final RoutingResult result) {
         // Determine which FlowFile Filter to use in order to obtain FlowFiles.
         final boolean useTransactions = context.getProperty(SUPPORT_TRANSACTIONS).asBoolean();
         boolean fragmentedTransaction = false;
@@ -469,31 +580,23 @@ public class PutSQL extends AbstractProcessor {
 
         // If we are supporting fragmented transactions, verify that all FlowFiles are correct
         if (fragmentedTransaction) {
-            final Relationship relationship = determineRelationship(flowFiles, 
context.getProperty(TRANSACTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS));
-            if (relationship != null) {
-                // if transferring back to self, penalize the FlowFiles.
-                if (relationship == Relationship.SELF) {
-                    // penalize all of the FlowFiles that we are going to 
route to SELF.
-                    final ListIterator<FlowFile> itr = 
flowFiles.listIterator();
-                    while (itr.hasNext()) {
-                        final FlowFile flowFile = itr.next();
-                        final FlowFile penalized = session.penalize(flowFile);
-                        itr.remove();
-                        itr.add(penalized);
-                    }
+            try {
+                if (!isFragmentedTransactionReady(flowFiles, context.getProperty(TRANSACTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS))) {
+                    // Not ready, penalize FlowFiles and put it back to self.
+                    flowFiles.forEach(f -> result.routeTo(session.penalize(f), Relationship.SELF));
+                    return null;
                 }
 
-                session.transfer(flowFiles, relationship);
+            } catch (IllegalArgumentException e) {
+                // Map relationship based on context, and then let default handler to handle.
+                final ErrorTypes.Result adjustedRoute = adjustError.apply(functionContext, ErrorTypes.InvalidInput);
+                ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY)
+                        .apply(functionContext, () -> flowFiles, adjustedRoute, e);
                 return null;
             }
 
             // sort by fragment index.
-            Collections.sort(flowFiles, new Comparator<FlowFile>() {
-                @Override
-                public int compare(final FlowFile o1, final FlowFile o2) {
-                    return 
Integer.compare(Integer.parseInt(o1.getAttribute(FRAGMENT_INDEX_ATTR)), 
Integer.parseInt(o2.getAttribute(FRAGMENT_INDEX_ATTR)));
-                }
-            });
+            flowFiles.sort(Comparator.comparing(o -> Integer.parseInt(o.getAttribute(FRAGMENT_INDEX_ATTR))));
         }
 
         return new FlowFilePoll(flowFiles, fragmentedTransaction);
@@ -521,63 +624,6 @@ public class PutSQL extends AbstractProcessor {
         return null;
     }
 
-
-    /**
-     * Returns the StatementFlowFileEnclosure that should be used for 
executing the given SQL statement
-     *
-     * @param sql the SQL to execute
-     * @param conn the connection from which a PreparedStatement can be created
-     * @param stmtMap the existing map of SQL to PreparedStatements
-     * @param obtainKeys whether or not we need to obtain generated keys for 
INSERT statements
-     * @param fragmentedTransaction whether or not the SQL pertains to a 
fragmented transaction
-     *
-     * @return a StatementFlowFileEnclosure to use for executing the given SQL 
statement
-     *
-     * @throws SQLException if unable to create the appropriate 
PreparedStatement
-     */
-    private StatementFlowFileEnclosure getEnclosure(final String sql, final 
Connection conn, final Map<String, StatementFlowFileEnclosure> stmtMap,
-                                                    final boolean obtainKeys, 
final boolean fragmentedTransaction) throws SQLException {
-        StatementFlowFileEnclosure enclosure = stmtMap.get(sql);
-        if (enclosure != null) {
-            return enclosure;
-        }
-
-        if (obtainKeys) {
-            // Create a new Prepared Statement, requesting that it return the 
generated keys.
-            PreparedStatement stmt = conn.prepareStatement(sql, 
Statement.RETURN_GENERATED_KEYS);
-
-            if (stmt == null) {
-                // since we are passing Statement.RETURN_GENERATED_KEYS, calls 
to conn.prepareStatement will
-                // in some cases (at least for DerbyDB) return null.
-                // We will attempt to recompile the statement without the 
generated keys being returned.
-                stmt = conn.prepareStatement(sql);
-            }
-
-            // If we need to obtain keys, then we cannot do a Batch Update. In 
this case,
-            // we don't need to store the PreparedStatement in the Map because 
we aren't
-            // doing an addBatch/executeBatch. Instead, we will use the 
statement once
-            // and close it.
-            return new StatementFlowFileEnclosure(stmt);
-        } else if (fragmentedTransaction) {
-            // We cannot use Batch Updates if we have a transaction that spans 
multiple FlowFiles.
-            // If we did, we could end up processing the statements out of 
order. It's quite possible
-            // that we could refactor the code some to allow for this, but as 
it is right now, this
-            // could cause problems. This is because we have a Map<String, 
StatementFlowFileEnclosure>.
-            // If we had a transaction that needed to execute Stmt A with some 
parameters, then Stmt B with
-            // some parameters, then Stmt A with different parameters, this 
would become problematic because
-            // the executeUpdate would be evaluated first for Stmt A (the 1st 
and 3rd statements, and then
-            // the second statement would be evaluated).
-            final PreparedStatement stmt = conn.prepareStatement(sql);
-            return new StatementFlowFileEnclosure(stmt);
-        }
-
-        final PreparedStatement stmt = conn.prepareStatement(sql);
-        enclosure = new StatementFlowFileEnclosure(stmt);
-        stmtMap.put(sql, enclosure);
-        return enclosure;
-    }
-
-
     /**
      * Determines the SQL statement that should be executed for the given 
FlowFile
      *
@@ -618,7 +664,7 @@ public class PutSQL extends AbstractProcessor {
 
                 final boolean isNumeric = 
NUMBER_PATTERN.matcher(entry.getValue()).matches();
                 if (!isNumeric) {
-                    throw new ProcessException("Value of the " + key + " 
attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral 
type");
+                    throw new SQLDataException("Value of the " + key + " 
attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral 
type");
                 }
 
                 final int jdbcType = Integer.parseInt(entry.getValue());
@@ -630,11 +676,11 @@ public class PutSQL extends AbstractProcessor {
                 try {
                     setParameter(stmt, valueAttrName, parameterIndex, 
parameterValue, jdbcType, parameterFormat);
                 } catch (final NumberFormatException nfe) {
-                    throw new ProcessException("The value of the " + 
valueAttrName + " is '" + parameterValue + "', which cannot be converted into 
the necessary data type", nfe);
+                    throw new SQLDataException("The value of the " + 
valueAttrName + " is '" + parameterValue + "', which cannot be converted into 
the necessary data type", nfe);
                 } catch (ParseException pe) {
-                    throw new ProcessException("The value of the " + 
valueAttrName + " is '" + parameterValue + "', which cannot be converted to a 
timestamp", pe);
+                    throw new SQLDataException("The value of the " + 
valueAttrName + " is '" + parameterValue + "', which cannot be converted to a 
timestamp", pe);
                 } catch (UnsupportedEncodingException uee) {
-                    throw new ProcessException("The value of the " + 
valueAttrName + " is '" + parameterValue + "', which cannot be converted to 
UTF-8", uee);
+                    throw new SQLDataException("The value of the " + 
valueAttrName + " is '" + parameterValue + "', which cannot be converted to 
UTF-8", uee);
                 }
             }
         }
@@ -652,76 +698,69 @@ public class PutSQL extends AbstractProcessor {
      * @return the appropriate relationship to route the FlowFiles to, or 
<code>null</code> if the FlowFiles
      *         should instead be processed
      */
-    Relationship determineRelationship(final List<FlowFile> flowFiles, final Long transactionTimeoutMillis) {
+    boolean isFragmentedTransactionReady(final List<FlowFile> flowFiles, final Long transactionTimeoutMillis) throws IllegalArgumentException {
         int selectedNumFragments = 0;
         final BitSet bitSet = new BitSet();
 
+        BiFunction<String, Object[], IllegalArgumentException> illegal = (s, objects) -> new IllegalArgumentException(String.format(s, objects));
+
         for (final FlowFile flowFile : flowFiles) {
             final String fragmentCount = 
flowFile.getAttribute(FRAGMENT_COUNT_ATTR);
             if (fragmentCount == null && flowFiles.size() == 1) {
-                return null;
+                return true;
             } else if (fragmentCount == null) {
-                getLogger().error("Cannot process {} because there are {} 
FlowFiles with the same fragment.identifier "
-                        + "attribute but not all FlowFiles have a 
fragment.count attribute; routing all to failure", new Object[] {flowFile, 
flowFiles.size()});
-                return REL_FAILURE;
+                throw illegal.apply("Cannot process %s because there are %d FlowFiles with the same fragment.identifier "
+                        + "attribute but not all FlowFiles have a fragment.count attribute", new Object[] {flowFile, flowFiles.size()});
             }
 
             final int numFragments;
             try {
                 numFragments = Integer.parseInt(fragmentCount);
             } catch (final NumberFormatException nfe) {
-                getLogger().error("Cannot process {} because the 
fragment.count attribute has a value of '{}', which is not an integer; "
-                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile, fragmentCount});
-                return REL_FAILURE;
+                throw illegal.apply("Cannot process %s because the 
fragment.count attribute has a value of '%s', which is not an integer",
+                        new Object[] {flowFile, fragmentCount});
             }
 
             if (numFragments < 1) {
-                getLogger().error("Cannot process {} because the 
fragment.count attribute has a value of '{}', which is not a positive integer; "
-                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile, fragmentCount});
-                return REL_FAILURE;
+                throw illegal.apply("Cannot process %s because the 
fragment.count attribute has a value of '%s', which is not a positive integer",
+                        new Object[] {flowFile, fragmentCount});
             }
 
             if (selectedNumFragments == 0) {
                 selectedNumFragments = numFragments;
             } else if (numFragments != selectedNumFragments) {
-                getLogger().error("Cannot process {} because the 
fragment.count attribute has different values for different FlowFiles with the 
same fragment.identifier; "
-                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile});
-                return REL_FAILURE;
+                throw illegal.apply("Cannot process %s because the 
fragment.count attribute has different values for different FlowFiles with the 
same fragment.identifier",
+                        new Object[] {flowFile});
             }
 
             final String fragmentIndex = 
flowFile.getAttribute(FRAGMENT_INDEX_ATTR);
             if (fragmentIndex == null) {
-                getLogger().error("Cannot process {} because the 
fragment.index attribute is missing; "
-                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile});
-                return REL_FAILURE;
+                throw illegal.apply("Cannot process %s because the 
fragment.index attribute is missing", new Object[] {flowFile});
             }
 
             final int idx;
             try {
                 idx = Integer.parseInt(fragmentIndex);
             } catch (final NumberFormatException nfe) {
-                getLogger().error("Cannot process {} because the 
fragment.index attribute has a value of '{}', which is not an integer; "
-                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile, fragmentIndex});
-                return REL_FAILURE;
+                throw illegal.apply("Cannot process %s because the 
fragment.index attribute has a value of '%s', which is not an integer",
+                        new Object[] {flowFile, fragmentIndex});
             }
 
             if (idx < 0) {
-                getLogger().error("Cannot process {} because the 
fragment.index attribute has a value of '{}', which is not a positive integer; "
-                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile, fragmentIndex});
-                return REL_FAILURE;
+                throw illegal.apply("Cannot process %s because the 
fragment.index attribute has a value of '%s', which is not a positive integer",
+                        new Object[] {flowFile, fragmentIndex});
             }
 
             if (bitSet.get(idx)) {
-                getLogger().error("Cannot process {} because it has the same 
value for the fragment.index attribute as another FlowFile with the same 
fragment.identifier; "
-                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile});
-                return REL_FAILURE;
+                throw illegal.apply("Cannot process %s because it has the same 
value for the fragment.index attribute as another FlowFile with the same 
fragment.identifier",
+                        new Object[] {flowFile});
             }
 
             bitSet.set(idx);
         }
 
         if (selectedNumFragments == flowFiles.size()) {
-            return null; // no relationship to route FlowFiles to yet - 
process the FlowFiles.
+            return true; // no relationship to route FlowFiles to yet - 
process the FlowFiles.
         }
 
         long latestQueueTime = 0L;
@@ -733,13 +772,12 @@ public class PutSQL extends AbstractProcessor {
 
         if (transactionTimeoutMillis != null) {
             if (latestQueueTime > 0L && System.currentTimeMillis() - 
latestQueueTime > transactionTimeoutMillis) {
-                getLogger().error("The transaction timeout has expired for the 
following FlowFiles; they will be routed to failure: {}", new Object[] 
{flowFiles});
-                return REL_FAILURE;
+                throw illegal.apply("The transaction timeout has expired for the following FlowFiles; they will be routed to failure: %s", new Object[] {flowFiles});
             }
         }
 
         getLogger().debug("Not enough FlowFiles for transaction. Returning all 
FlowFiles to queue");
-        return Relationship.SELF; // not enough FlowFiles for this 
transaction. Return them all to queue.
+        return false;  // not enough FlowFiles for this transaction. Return them all to queue.
     }
 
     /**
@@ -924,7 +962,7 @@ public class PutSQL extends AbstractProcessor {
             if (selectedId.equals(fragmentId)) {
                 // fragment id's match. Find out if we have all of the 
necessary fragments or not.
                 final int numFragments;
-                if (NUMBER_PATTERN.matcher(fragCount).matches()) {
+                if (fragCount != null && 
NUMBER_PATTERN.matcher(fragCount).matches()) {
                     numFragments = Integer.parseInt(fragCount);
                 } else {
                     numFragments = Integer.MAX_VALUE;
@@ -971,22 +1009,69 @@ public class PutSQL extends AbstractProcessor {
     }
 
 
+    private static class FragmentedEnclosure extends StatementFlowFileEnclosure {
+
+        private final Map<FlowFile, StatementFlowFileEnclosure> flowFileToEnclosure = new HashMap<>();
+
+        public FragmentedEnclosure() {
+            super(null);
+        }
+
+        public void addFlowFile(final FlowFile flowFile, final StatementFlowFileEnclosure enclosure) {
+            addFlowFile(flowFile);
+            flowFileToEnclosure.put(flowFile, enclosure);
+        }
+
+        public StatementFlowFileEnclosure getTargetEnclosure(final FlowFile flowFile) {
+            return flowFileToEnclosure.get(flowFile);
+        }
+    }
+
     /**
      * A simple, immutable data structure to hold a Prepared Statement and a 
List of FlowFiles
      * for which that statement should be evaluated.
      */
-    private static class StatementFlowFileEnclosure {
-        private final PreparedStatement statement;
+    private static class StatementFlowFileEnclosure implements FlowFileGroup {
+        private final String sql;
+        private PreparedStatement statement;
         private final List<FlowFile> flowFiles = new ArrayList<>();
 
-        public StatementFlowFileEnclosure(final PreparedStatement statement) {
-            this.statement = statement;
+        public StatementFlowFileEnclosure(String sql) {
+            this.sql = sql;
         }
 
-        public PreparedStatement getStatement() {
+        public PreparedStatement getNewStatement(final Connection conn, final boolean obtainKeys) throws SQLException {
+            if (obtainKeys) {
+                // Create a new Prepared Statement, requesting that it return 
the generated keys.
+                PreparedStatement stmt = conn.prepareStatement(sql, 
Statement.RETURN_GENERATED_KEYS);
+
+                if (stmt == null) {
+                    // since we are passing Statement.RETURN_GENERATED_KEYS, 
calls to conn.prepareStatement will
+                    // in some cases (at least for DerbyDB) return null.
+                    // We will attempt to recompile the statement without the 
generated keys being returned.
+                    stmt = conn.prepareStatement(sql);
+                }
+
+                // If we need to obtain keys, then we cannot do a Batch 
Update. In this case,
+                // we don't need to store the PreparedStatement in the Map 
because we aren't
+                // doing an addBatch/executeBatch. Instead, we will use the 
statement once
+                // and close it.
+                return stmt;
+            }
+
+            return conn.prepareStatement(sql);
+        }
+
+        public PreparedStatement getCachedStatement(final Connection conn) throws SQLException {
+            if (statement != null) {
+                return statement;
+            }
+
+            statement = conn.prepareStatement(sql);
             return statement;
         }
 
+        @Override
         public List<FlowFile> getFlowFiles() {
             return flowFiles;
         }
@@ -997,7 +1082,7 @@ public class PutSQL extends AbstractProcessor {
 
         @Override
         public int hashCode() {
-            return statement.hashCode();
+            return sql.hashCode();
         }
 
         @Override
@@ -1013,7 +1098,7 @@ public class PutSQL extends AbstractProcessor {
             }
 
             final StatementFlowFileEnclosure other = 
(StatementFlowFileEnclosure) obj;
-            return statement.equals(other.getStatement());
+            return sql.equals(other.sql);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index bb12fb4..e99f43a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard
 
 import org.apache.nifi.processor.exception.ProcessException
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure
 import org.apache.nifi.processors.standard.util.record.MockRecordParser
 import org.apache.nifi.reporting.InitializationException
 import org.apache.nifi.serialization.record.RecordField
@@ -42,7 +43,6 @@ import java.sql.Statement
 
 import static org.junit.Assert.assertEquals
 import static org.junit.Assert.assertFalse
-import static org.junit.Assert.assertNull
 import static org.junit.Assert.assertTrue
 import static org.junit.Assert.fail
 import static org.mockito.Mockito.spy
@@ -53,7 +53,8 @@ import static org.mockito.Mockito.spy
 @RunWith(JUnit4.class)
 class TestPutDatabaseRecord {
 
-    private static final String createPersons = "CREATE TABLE PERSONS (id 
integer primary key, name varchar(100), code integer)"
+    private static final String createPersons = "CREATE TABLE PERSONS (id 
integer primary key, name varchar(100)," +
+            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 
1000))"
     private final static String DB_LOCATION = "target/db_pdr"
 
     TestRunner runner
@@ -131,33 +132,20 @@ class TestPutDatabaseRecord {
 
         ] as PutDatabaseRecord.TableSchema
 
+        runner.setProperty(PutDatabaseRecord.TRANSLATE_FIELD_NAMES, 'false')
+        runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, 'false')
+        runner.setProperty(PutDatabaseRecord.UNMATCHED_COLUMN_BEHAVIOR, 
'false')
+        runner.setProperty(PutDatabaseRecord.QUOTED_IDENTIFIERS, 'false')
+        runner.setProperty(PutDatabaseRecord.QUOTED_TABLE_IDENTIFIER, 'false')
+        def settings = new 
PutDatabaseRecord.DMLSettings(runner.getProcessContext())
+
         processor.with {
-            try {
-                assertNull(generateInsert(null, null, null,
-                        false, false, false, false,
-                        false, false).sql)
-                fail('Expecting ProcessException')
-            } catch (ProcessException ignore) {
-                // Expected
-            }
-            try {
-                assertNull(generateInsert(null, 'PERSONS', null,
-                        false, false, false, false,
-                        false, false).sql)
-                fail('Expecting ProcessException')
-            } catch (ProcessException ignore) {
-                // Expected
-            }
 
             assertEquals('INSERT INTO PERSONS (id, name, code) VALUES (?,?,?)',
-                    generateInsert(schema, 'PERSONS', tableSchema,
-                            false, false, false, false,
-                            false, false).sql)
+                    generateInsert(schema, 'PERSONS', tableSchema, 
settings).sql)
 
             assertEquals('DELETE FROM PERSONS WHERE id = ? AND name = ? AND 
code = ?',
-                    generateDelete(schema, 'PERSONS', tableSchema,
-                            false, false, false, false,
-                            false, false).sql)
+                    generateDelete(schema, 'PERSONS', tableSchema, 
settings).sql)
         }
     }
 
@@ -211,6 +199,81 @@ class TestPutDatabaseRecord {
     }
 
     @Test
+    void testInsertBatchUpdateException() throws InitializationException, 
ProcessException, SQLException, IOException {
+        recreateTable("PERSONS", createPersons)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+
+        parser.addRecord(1, 'rec1', 101)
+        parser.addRecord(2, 'rec2', 102)
+        parser.addRecord(3, 'rec3', 1000)
+        parser.addRecord(4, 'rec4', 104)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
+        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutDatabaseRecord.REL_RETRY, 1)
+        final Connection conn = dbcp.getConnection()
+        final Statement stmt = conn.createStatement()
+        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        // Transaction should be rolled back and table should remain empty.
+        assertFalse(rs.next())
+
+        stmt.close()
+        conn.close()
+    }
+
+    @Test
+    void testInsertBatchUpdateExceptionRollbackOnFailure() throws 
InitializationException, ProcessException, SQLException, IOException {
+        recreateTable("PERSONS", createPersons)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+
+        parser.addRecord(1, 'rec1', 101)
+        parser.addRecord(2, 'rec2', 102)
+        parser.addRecord(3, 'rec3', 1000)
+        parser.addRecord(4, 'rec4', 104)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, 'true')
+
+        runner.enqueue(new byte[0])
+        try {
+            runner.run()
+            fail("ProcessException is expected")
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException)
+        }
+
+        final Connection conn = dbcp.getConnection()
+        final Statement stmt = conn.createStatement()
+        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        // Transaction should be rolled back and table should remain empty.
+        assertFalse(rs.next())
+
+        stmt.close()
+        conn.close()
+    }
+
+    @Test
     void testInsertNoTable() throws InitializationException, ProcessException, 
SQLException, IOException {
         recreateTable("PERSONS", createPersons)
         final MockRecordParser parser = new MockRecordParser()
@@ -300,6 +363,37 @@ class TestPutDatabaseRecord {
     }
 
     @Test
+    void testSqlStatementTypeNoValueRollbackOnFailure() throws 
InitializationException, ProcessException, SQLException, IOException {
+        recreateTable("PERSONS", createPersons)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("sql", RecordFieldType.STRING)
+
+        parser.addRecord('')
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.USE_ATTR_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+        runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, 'sql')
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, 'true')
+
+        def attrs = [:]
+        attrs[PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE] = 'sql'
+        runner.enqueue(new byte[0], attrs)
+        try {
+            runner.run()
+            fail("ProcessException is expected")
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException)
+        }
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
+        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 0)
+    }
+
+    @Test
     void testUpdate() throws InitializationException, ProcessException, 
SQLException, IOException {
         recreateTable("PERSONS", createPersons)
         final MockRecordParser parser = new MockRecordParser()

http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
index 5e9ab58..2d0491b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
@@ -39,6 +40,7 @@ import org.apache.commons.lang3.RandomUtils;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -165,6 +167,29 @@ public class TestPutSQL {
         runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
     }
 
+    @Test
+    public void testFailInMiddleWithBadStatementRollbackOnFailure() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+        runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 
84)".getBytes());
+        runner.enqueue("INSERT INTO PERSONS_AI".getBytes()); // intentionally 
wrong syntax
+        runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Tom', 
3)".getBytes());
+        runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Harry', 
44)".getBytes());
+
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+            runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
+            runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
+        }
+    }
+
 
     @Test
     public void testFailInMiddleWithBadParameterType() throws 
InitializationException, ProcessException, SQLException, IOException {
@@ -191,8 +216,41 @@ public class TestPutSQL {
 
         runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
         runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
+
     }
 
+    @Test
+    public void testFailInMiddleWithBadParameterTypeRollbackOnFailure() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+        final Map<String, String> goodAttributes = new HashMap<>();
+        goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        goodAttributes.put("sql.args.1.value", "84");
+
+        final Map<String, String> badAttributes = new HashMap<>();
+        badAttributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR));
+        badAttributes.put("sql.args.1.value", "hello");
+
+        final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES 
('Mark', ?)".getBytes();
+        runner.enqueue(data, goodAttributes);
+        runner.enqueue(data, badAttributes);
+        runner.enqueue(data, goodAttributes);
+        runner.enqueue(data, goodAttributes);
+
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+            runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
+            runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
+        }
+    }
 
     @Test
     public void testFailInMiddleWithBadParameterValue() throws 
InitializationException, ProcessException, SQLException, IOException {
@@ -235,6 +293,48 @@ public class TestPutSQL {
         }
     }
 
+    @Test
+    public void testFailInMiddleWithBadParameterValueRollbackOnFailure() 
throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+        recreateTable("PERSONS_AI",createPersonsAutoId);
+
+        final Map<String, String> goodAttributes = new HashMap<>();
+        goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        goodAttributes.put("sql.args.1.value", "84");
+
+        final Map<String, String> badAttributes = new HashMap<>();
+        badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        badAttributes.put("sql.args.1.value", "9999");
+
+        final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES 
('Mark', ?)".getBytes();
+        runner.enqueue(data, goodAttributes);
+        runner.enqueue(data, badAttributes);
+        runner.enqueue(data, goodAttributes);
+        runner.enqueue(data, goodAttributes);
+
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+            runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
+            runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
+        }
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
PERSONS_AI");
+                assertFalse(rs.next());
+            }
+        }
+    }
+
 
     @Test
     public void testUsingSqlDataTypesWithNegativeValues() throws 
InitializationException, ProcessException, SQLException, IOException {
@@ -666,6 +766,8 @@ public class TestPutSQL {
         runner.enableControllerService(service);
         runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
 
+        recreateTable("PERSONS", createPersons);
+
         final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
                 "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
         final Map<String, String> attributes = new HashMap<>();
@@ -695,6 +797,47 @@ public class TestPutSQL {
         }
     }
 
+    @Test
+    public void testMultipleStatementsWithinFlowFileRollbackOnFailure() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+        recreateTable("PERSONS", createPersons);
+
+        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
+                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.1.value", "1");
+
+        attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("sql.args.2.value", "Mark");
+
+        attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.3.value", "84");
+
+        attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.4.value", "1");
+
+        runner.enqueue(sql.getBytes(), attributes);
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
PERSONS");
+                assertFalse(rs.next());
+            }
+        }
+    }
+
 
     @Test
     public void testWithNullParameter() throws InitializationException, 
ProcessException, SQLException, IOException {
@@ -766,6 +909,47 @@ public class TestPutSQL {
         }
     }
 
+    @Test
+    public void testInvalidStatementRollbackOnFailure() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+        recreateTable("PERSONS", createPersons);
+
+        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
+                "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.1.value", "1");
+
+        attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("sql.args.2.value", "Mark");
+
+        attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.3.value", "84");
+
+        attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.4.value", "1");
+
+        runner.enqueue(sql.getBytes(), attributes);
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
PERSONS");
+                assertFalse(rs.next());
+            }
+        }
+    }
+
 
     @Test
     public void testRetryableFailure() throws InitializationException, 
ProcessException, SQLException, IOException {
@@ -798,6 +982,42 @@ public class TestPutSQL {
         runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 1);
     }
 
+    @Test
+    public void testRetryableFailureRollbackOnFailure() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        final DBCPService service = new SQLExceptionService(null);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
+                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.1.value", "1");
+
+        attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("sql.args.2.value", "Mark");
+
+        attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.3.value", "84");
+
+        attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.4.value", "1");
+
+        runner.enqueue(sql.getBytes(), attributes);
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+            // Should not be routed to retry.
+            runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 0);
+        }
+
+    }
 
     @Test
     public void testMultipleFlowFilesSuccessfulInTransaction() throws 
InitializationException, ProcessException, SQLException, IOException {
@@ -857,6 +1077,38 @@ public class TestPutSQL {
         }
     }
 
+    @Test
+    public void 
testMultipleFlowFilesSuccessfulInTransactionRollBackOnFailure() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(PutSQL.BATCH_SIZE, "1");
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+        recreateTable("PERSONS", createPersons);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.1.value", "1");
+
+        attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("sql.args.2.value", "Mark");
+
+        attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.3.value", "84");
+
+        attributes.put("fragment.identifier", "1");
+        attributes.put("fragment.count", "2");
+        attributes.put("fragment.index", "0");
+        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?)".getBytes(), attributes);
+        // ProcessException should not be thrown in this case, because the 
input FlowFiles are simply deferred.
+        runner.run();
+
+        // No FlowFiles should be transferred because there were not enough 
flowfiles with the same fragment identifier
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0);
+
+    }
 
     @Test
     public void testTransactionTimeout() throws InitializationException, 
ProcessException, SQLException, IOException {
@@ -895,6 +1147,81 @@ public class TestPutSQL {
         runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
     }
 
+    @Test
+    public void testTransactionTimeoutRollbackOnFailure() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("fragment.identifier", "1");
+        attributes.put("fragment.count", "2");
+        attributes.put("fragment.index", "0");
+
+        final MockFlowFile mff = new MockFlowFile(0L) {
+            @Override
+            public Long getLastQueueDate() {
+                return System.currentTimeMillis() - 10000L; // return 10 
seconds ago
+            }
+
+            @Override
+            public Map<String, String> getAttributes() {
+                return attributes;
+            }
+
+            @Override
+            public String getAttribute(final String attrName) {
+                return attributes.get(attrName);
+            }
+        };
+
+        runner.enqueue(mff);
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testNullFragmentCountRollbackOnFailure() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+        final Map<String, String> attribute1 = new HashMap<>();
+        attribute1.put("fragment.identifier", "1");
+        attribute1.put("fragment.count", "2");
+        attribute1.put("fragment.index", "0");
+
+        final Map<String, String> attribute2 = new HashMap<>();
+        attribute2.put("fragment.identifier", "1");
+        // fragment.count is intentionally not set on attribute2, so this FlowFile has no fragment count.
+        attribute2.put("fragment.index", "1");
+
+        runner.enqueue(new byte[]{}, attribute1);
+        runner.enqueue(new byte[]{}, attribute2);
+
+
+        try {
+            runner.run();
+            fail("ProcessException should be thrown");
+        } catch (AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+        }
+
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0);
+    }
+
     /**
      * Simple implementation only for testing purposes
      */
@@ -985,4 +1312,5 @@ public class TestPutSQL {
         byte[] bBinary = RandomUtils.nextBytes(length);
         return DatatypeConverter.printBase64Binary(bBinary);
     }
+
 }

Reply via email to