Repository: nifi Updated Branches: refs/heads/master a1bffbcc8 -> d9acdb54b
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index 6d7c504..cb3b198 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -26,18 +26,28 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.FragmentAttributes; -import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.pattern.ErrorTypes; +import 
org.apache.nifi.processor.util.pattern.ExceptionHandler; +import org.apache.nifi.processor.util.pattern.PartialFunctions; +import org.apache.nifi.processor.util.pattern.PartialFunctions.FetchFlowFiles; +import org.apache.nifi.processor.util.pattern.PartialFunctions.FlowFileGroup; +import org.apache.nifi.processor.util.pattern.PutGroup; +import org.apache.nifi.processor.util.pattern.RollbackOnFailure; +import org.apache.nifi.processor.util.pattern.RoutingResult; import org.apache.nifi.stream.io.StreamUtils; import javax.xml.bind.DatatypeConverter; @@ -52,6 +62,7 @@ import java.sql.Connection; import java.sql.Date; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.SQLDataException; import java.sql.SQLException; import java.sql.SQLNonTransientException; import java.sql.Statement; @@ -65,19 +76,19 @@ import java.time.format.DateTimeFormatter; import java.time.temporal.TemporalAccessor; import java.util.ArrayList; import java.util.BitSet; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashSet; import java.util.List; -import java.util.ListIterator; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError; + @SupportsBatching @SeeAlso(ConvertJSONToSQL.class) @InputRequirement(Requirement.INPUT_REQUIRED) @@ -109,7 +120,7 @@ import java.util.regex.Pattern; @WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, " + "this attribute will be added to indicate the generated key, if possible. 
This feature is not supported by all database vendors.") }) -public class PutSQL extends AbstractProcessor { +public class PutSQL extends AbstractSessionFactoryProcessor { static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder() .name("JDBC Connection Pool") @@ -180,6 +191,7 @@ public class PutSQL extends AbstractProcessor { properties.add(TRANSACTION_TIMEOUT); properties.add(BATCH_SIZE); properties.add(OBTAIN_GENERATED_KEYS); + properties.add(RollbackOnFailure.ROLLBACK_ON_FAILURE); return properties; } @@ -192,199 +204,176 @@ public class PutSQL extends AbstractProcessor { return rels; } + private static class FunctionContext extends RollbackOnFailure { + private boolean obtainKeys = false; + private boolean fragmentedTransaction = false; + private boolean originalAutoCommit = false; + private final long startNanos = System.nanoTime(); + private FunctionContext(boolean rollbackOnFailure) { + super(rollbackOnFailure, true); + } - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final FlowFilePoll poll = pollFlowFiles(context, session); + private boolean isSupportBatching() { + return !obtainKeys && !fragmentedTransaction; + } + } + + private PutGroup<FunctionContext, Connection, StatementFlowFileEnclosure> process; + private BiFunction<FunctionContext, ErrorTypes, ErrorTypes.Result> adjustError; + private ExceptionHandler<FunctionContext> exceptionHandler; + + + private final FetchFlowFiles<FunctionContext> fetchFlowFiles = (c, s, fc, r) -> { + final FlowFilePoll poll = pollFlowFiles(c, s, fc, r); if (poll == null) { - return; + return null; } + fc.fragmentedTransaction = poll.isFragmentedTransaction(); + return poll.getFlowFiles(); + }; - final List<FlowFile> flowFiles = poll.getFlowFiles(); - if (flowFiles == null) { - return; + private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc) -> { + final Connection connection = 
c.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class).getConnection(); + try { + fc.originalAutoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + } catch (SQLException e) { + throw new ProcessException("Failed to disable auto commit due to " + e, e); } + return connection; + }; - final long startNanos = System.nanoTime(); - final boolean obtainKeys = context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean(); - final Map<String, StatementFlowFileEnclosure> statementMap = new HashMap<>(); // Map SQL to a PreparedStatement and FlowFiles - final List<FlowFile> sentFlowFiles = new ArrayList<>(); // flowfiles that have been sent - final List<FlowFile> processedFlowFiles = new ArrayList<>(); // all flowfiles that we have processed - final Set<StatementFlowFileEnclosure> enclosuresToExecute = new LinkedHashSet<>(); // the enclosures that we've processed - // Because we can have a transaction that is necessary across multiple FlowFiles, things get complicated when - // some FlowFiles have been transferred to a relationship and then there is a failure. As a result, we will just - // map all FlowFiles to their destination relationship and do the session.transfer at the end. This way, if there - // is a failure, we can route all FlowFiles to failure if we need to. 
- final Map<FlowFile, Relationship> destinationRelationships = new HashMap<>(); + @FunctionalInterface + private interface GroupingFunction { + void apply(final ProcessContext context, final ProcessSession session, final FunctionContext fc, + final Connection conn, final List<FlowFile> flowFiles, + final List<StatementFlowFileEnclosure> groups, + final Map<String, StatementFlowFileEnclosure> sqlToEnclosure, + final RoutingResult result); + } - final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class); - try (final Connection conn = dbcpService.getConnection()) { - final boolean originalAutoCommit = conn.getAutoCommit(); - try { - conn.setAutoCommit(false); - - for (final FlowFile flowFile : flowFiles) { - processedFlowFiles.add(flowFile); - final String sql = getSQL(session, flowFile); - - // Get the appropriate PreparedStatement to use. - final StatementFlowFileEnclosure enclosure; - try { - enclosure = getEnclosure(sql, conn, statementMap, obtainKeys, poll.isFragmentedTransaction()); - } catch (final SQLNonTransientException e) { - getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e}); - destinationRelationships.put(flowFile, REL_FAILURE); - continue; - } + private GroupingFunction groupFragmentedTransaction = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> { + final FragmentedEnclosure fragmentedEnclosure = new FragmentedEnclosure(); + groups.add(fragmentedEnclosure); - final PreparedStatement stmt = enclosure.getStatement(); + for (final FlowFile flowFile : flowFiles) { + final String sql = getSQL(session, flowFile); - // set the appropriate parameters on the statement. 
- try { - setParameters(stmt, flowFile.getAttributes()); - } catch (final SQLException | ProcessException pe) { - getLogger().error("Cannot update database for {} due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe); - destinationRelationships.put(flowFile, REL_FAILURE); - continue; - } + final StatementFlowFileEnclosure enclosure = sqlToEnclosure + .computeIfAbsent(sql, k -> new StatementFlowFileEnclosure(sql)); - // If we need to obtain keys, we cannot do so in a a Batch Update. So we have to execute the statement and close it. - if (obtainKeys) { - try { - // Execute the actual update. - stmt.executeUpdate(); - - // attempt to determine the key that was generated, if any. This is not supported by all - // database vendors, so if we cannot determine the generated key (or if the statement is not an INSERT), - // we will just move on without setting the attribute. - FlowFile sentFlowFile = flowFile; - final String generatedKey = determineGeneratedKey(stmt); - if (generatedKey != null) { - sentFlowFile = session.putAttribute(sentFlowFile, "sql.generated.key", generatedKey); - } - - stmt.close(); - sentFlowFiles.add(sentFlowFile); - } catch (final SQLNonTransientException e) { - getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e}); - destinationRelationships.put(flowFile, REL_FAILURE); - continue; - } catch (final SQLException e) { - getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {flowFile, e}); - destinationRelationships.put(flowFile, REL_RETRY); - continue; - } - } else { - // We don't need to obtain keys. Just add the statement to the batch. 
- stmt.addBatch(); - enclosure.addFlowFile(flowFile); - enclosuresToExecute.add(enclosure); - } - } + fragmentedEnclosure.addFlowFile(flowFile, enclosure); + } + }; - // If we are not trying to obtain the generated keys, we will have - // PreparedStatement's that have batches added to them. We need to execute each batch and close - // the PreparedStatement. - for (final StatementFlowFileEnclosure enclosure : enclosuresToExecute) { - try { - final PreparedStatement stmt = enclosure.getStatement(); - stmt.executeBatch(); - sentFlowFiles.addAll(enclosure.getFlowFiles()); - } catch (final BatchUpdateException e) { - // If we get a BatchUpdateException, then we want to determine which FlowFile caused the failure, - // and route that FlowFile to failure while routing those that finished processing to success and those - // that have not yet been executed to retry. If the FlowFile was - // part of a fragmented transaction, then we must roll back all updates for this connection, because - // other statements may have been successful and been part of this transaction. - final int[] updateCounts = e.getUpdateCounts(); - final int offendingFlowFileIndex = updateCounts.length; - final List<FlowFile> batchFlowFiles = enclosure.getFlowFiles(); - - if (poll.isFragmentedTransaction()) { - // There are potentially multiple statements for this one transaction. As a result, - // we need to roll back the entire transaction and route all of the FlowFiles to failure. - conn.rollback(); - final FlowFile offendingFlowFile = batchFlowFiles.get(offendingFlowFileIndex); - getLogger().error("Failed to update database due to a failed batch update. A total of {} FlowFiles are required for this transaction, so routing all to failure. 
" - + "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e}); - session.transfer(flowFiles, REL_FAILURE); - return; - } + private final GroupingFunction groupFlowFilesBySQLBatch = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> { + for (final FlowFile flowFile : flowFiles) { + final String sql = getSQL(session, flowFile); + + // Get or create the appropriate PreparedStatement to use. + final StatementFlowFileEnclosure enclosure = sqlToEnclosure + .computeIfAbsent(sql, k -> { + final StatementFlowFileEnclosure newEnclosure = new StatementFlowFileEnclosure(sql); + groups.add(newEnclosure); + return newEnclosure; + }); + + if(!exceptionHandler.execute(fc, flowFile, input -> { + final PreparedStatement stmt = enclosure.getCachedStatement(conn); + setParameters(stmt, flowFile.getAttributes()); + stmt.addBatch(); + }, onFlowFileError(context, session, result))) { + continue; + } - // In the presence of a BatchUpdateException, the driver has the option of either stopping when an error - // occurs, or continuing. If it continues, then it must account for all statements in the batch and for - // those that fail return a Statement.EXECUTE_FAILED for the number of rows updated. - // So we will iterate over all of the update counts returned. If any is equal to Statement.EXECUTE_FAILED, - // we will route the corresponding FlowFile to failure. Otherwise, the FlowFile will go to success - // unless it has not yet been processed (its index in the List > updateCounts.length). 
- int failureCount = 0; - int successCount = 0; - int retryCount = 0; - for (int i = 0; i < updateCounts.length; i++) { - final int updateCount = updateCounts[i]; - final FlowFile flowFile = batchFlowFiles.get(i); - if (updateCount == Statement.EXECUTE_FAILED) { - destinationRelationships.put(flowFile, REL_FAILURE); - failureCount++; - } else { - destinationRelationships.put(flowFile, REL_SUCCESS); - successCount++; - } - } + enclosure.addFlowFile(flowFile); + } + }; - if (failureCount == 0) { - // if no failures found, the driver decided not to execute the statements after the - // failure, so route the last one to failure. - final FlowFile failedFlowFile = batchFlowFiles.get(updateCounts.length); - destinationRelationships.put(failedFlowFile, REL_FAILURE); - failureCount++; - } + private GroupingFunction groupFlowFilesBySQL = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> { + for (final FlowFile flowFile : flowFiles) { + final String sql = getSQL(session, flowFile); - if (updateCounts.length < batchFlowFiles.size()) { - final List<FlowFile> unexecuted = batchFlowFiles.subList(updateCounts.length + 1, batchFlowFiles.size()); - for (final FlowFile flowFile : unexecuted) { - destinationRelationships.put(flowFile, REL_RETRY); - retryCount++; - } - } + // Get or create the appropriate PreparedStatement to use. + final StatementFlowFileEnclosure enclosure = sqlToEnclosure + .computeIfAbsent(sql, k -> { + final StatementFlowFileEnclosure newEnclosure = new StatementFlowFileEnclosure(sql); + groups.add(newEnclosure); + return newEnclosure; + }); - getLogger().error("Failed to update database due to a failed batch update. 
There were a total of {} FlowFiles that failed, {} that succeeded, " - + "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount}); - } catch (final SQLNonTransientException e) { - getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e}); + enclosure.addFlowFile(flowFile); + } + }; + + final PutGroup.GroupFlowFiles<FunctionContext, Connection, StatementFlowFileEnclosure> groupFlowFiles = (context, session, fc, conn, flowFiles, result) -> { + final Map<String, StatementFlowFileEnclosure> sqlToEnclosure = new HashMap<>(); + final List<StatementFlowFileEnclosure> groups = new ArrayList<>(); + + // There are three patterns: + // 1. Support batching: An enclosure has multiple FlowFiles being executed in a batch operation + // 2. Obtain keys: An enclosure has multiple FlowFiles, and each FlowFile is executed separately + // 3. Fragmented transaction: One FlowFile per Enclosure? 
+ if (fc.obtainKeys) { + groupFlowFilesBySQL.apply(context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result); + } else if (fc.fragmentedTransaction) { + groupFragmentedTransaction.apply(context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result); + } else { + groupFlowFilesBySQLBatch.apply(context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result); + } - for (final FlowFile flowFile : enclosure.getFlowFiles()) { - destinationRelationships.put(flowFile, REL_FAILURE); - } - continue; - } catch (final SQLException e) { - getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", - new Object[] {enclosure.getFlowFiles(), e}); + return groups; + }; - for (final FlowFile flowFile : enclosure.getFlowFiles()) { - destinationRelationships.put(flowFile, REL_RETRY); - } - continue; - } finally { - enclosure.getStatement().close(); - } + final PutGroup.PutFlowFiles<FunctionContext, Connection, StatementFlowFileEnclosure> putFlowFiles = (context, session, fc, conn, enclosure, result) -> { + + if (fc.isSupportBatching()) { + + // We have PreparedStatement that have batches added to them. + // We need to execute each batch and close the PreparedStatement. + exceptionHandler.execute(fc, enclosure, input -> { + try (final PreparedStatement stmt = enclosure.getCachedStatement(conn)) { + stmt.executeBatch(); + result.routeTo(enclosure.getFlowFiles(), REL_SUCCESS); } - } finally { - try { - conn.commit(); - } finally { - // make sure that we try to set the auto commit back to whatever it was. - if (originalAutoCommit) { - try { - conn.setAutoCommit(originalAutoCommit); - } catch (final SQLException se) { + }, onBatchUpdateError(context, session, result)); + + } else { + for (final FlowFile flowFile : enclosure.getFlowFiles()) { + + final StatementFlowFileEnclosure targetEnclosure + = enclosure instanceof FragmentedEnclosure + ? 
((FragmentedEnclosure) enclosure).getTargetEnclosure(flowFile) + : enclosure; + + // Execute update one by one. + exceptionHandler.execute(fc, flowFile, input -> { + try (final PreparedStatement stmt = targetEnclosure.getNewStatement(conn, fc.obtainKeys)) { + + // set the appropriate parameters on the statement. + setParameters(stmt, flowFile.getAttributes()); + + stmt.executeUpdate(); + + // attempt to determine the key that was generated, if any. This is not supported by all + // database vendors, so if we cannot determine the generated key (or if the statement is not an INSERT), + // we will just move on without setting the attribute. + FlowFile sentFlowFile = flowFile; + final String generatedKey = determineGeneratedKey(stmt); + if (generatedKey != null) { + sentFlowFile = session.putAttribute(sentFlowFile, "sql.generated.key", generatedKey); } + + result.routeTo(sentFlowFile, REL_SUCCESS); + } - } + }, onFlowFileError(context, session, result)); } + } + if (result.contains(REL_SUCCESS)) { // Determine the database URL String url = "jdbc://unknown-host"; try { @@ -393,46 +382,167 @@ public class PutSQL extends AbstractProcessor { } // Emit a Provenance SEND event - final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - for (final FlowFile flowFile : sentFlowFiles) { + final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fc.startNanos); + for (final FlowFile flowFile : result.getRoutedFlowFiles().get(REL_SUCCESS)) { session.getProvenanceReporter().send(flowFile, url, transmissionMillis, true); } + } + }; + + private ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError(final ProcessContext context, final ProcessSession session, final RoutingResult result) { + ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError = createOnError(context, session, result, REL_FAILURE, REL_RETRY); + onFlowFileError = onFlowFileError.andThen((c, i, r, e) -> { + switch (r.destination()) 
{ + case Failure: + getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {i, e}, e); + break; + case Retry: + getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", + new Object[] {i, e}, e); + break; + } + }); + return RollbackOnFailure.createOnError(onFlowFileError); + } + + private ExceptionHandler.OnError<FunctionContext, StatementFlowFileEnclosure> onBatchUpdateError( + final ProcessContext context, final ProcessSession session, final RoutingResult result) { + return RollbackOnFailure.createOnError((c, enclosure, r, e) -> { + + // If rollbackOnFailure is enabled, the error will be thrown as ProcessException instead. + if (e instanceof BatchUpdateException && !c.isRollbackOnFailure()) { + + // If we get a BatchUpdateException, then we want to determine which FlowFile caused the failure, + // and route that FlowFile to failure while routing those that finished processing to success and those + // that have not yet been executed to retry. + // Currently fragmented transaction does not use batch update. + final int[] updateCounts = ((BatchUpdateException) e).getUpdateCounts(); + final List<FlowFile> batchFlowFiles = enclosure.getFlowFiles(); + + // In the presence of a BatchUpdateException, the driver has the option of either stopping when an error + // occurs, or continuing. If it continues, then it must account for all statements in the batch and for + // those that fail return a Statement.EXECUTE_FAILED for the number of rows updated. + // So we will iterate over all of the update counts returned. If any is equal to Statement.EXECUTE_FAILED, + // we will route the corresponding FlowFile to failure. Otherwise, the FlowFile will go to success + // unless it has not yet been processed (its index in the List > updateCounts.length). 
+ int failureCount = 0; + int successCount = 0; + int retryCount = 0; + for (int i = 0; i < updateCounts.length; i++) { + final int updateCount = updateCounts[i]; + final FlowFile flowFile = batchFlowFiles.get(i); + if (updateCount == Statement.EXECUTE_FAILED) { + result.routeTo(flowFile, REL_FAILURE); + failureCount++; + } else { + result.routeTo(flowFile, REL_SUCCESS); + successCount++; + } + } + + if (failureCount == 0) { + // if no failures found, the driver decided not to execute the statements after the + // failure, so route the last one to failure. + final FlowFile failedFlowFile = batchFlowFiles.get(updateCounts.length); + result.routeTo(failedFlowFile, REL_FAILURE); + failureCount++; + } + + if (updateCounts.length < batchFlowFiles.size()) { + final List<FlowFile> unexecuted = batchFlowFiles.subList(updateCounts.length + 1, batchFlowFiles.size()); + for (final FlowFile flowFile : unexecuted) { + result.routeTo(flowFile, REL_RETRY); + retryCount++; + } + } + + getLogger().error("Failed to update database due to a failed batch update. There were a total of {} FlowFiles that failed, {} that succeeded, " + + "and {} that were not execute and will be routed to retry; ", new Object[]{failureCount, successCount, retryCount}); + + return; - for (final FlowFile flowFile : sentFlowFiles) { - destinationRelationships.put(flowFile, REL_SUCCESS); } - } catch (final SQLException e) { - // Failed FlowFiles are all of them that we have processed minus those that were successfully sent - final List<FlowFile> failedFlowFiles = processedFlowFiles; - failedFlowFiles.removeAll(sentFlowFiles); - // All FlowFiles yet to be processed is all FlowFiles minus those processed - final List<FlowFile> retry = flowFiles; - retry.removeAll(processedFlowFiles); + // Apply default error handling and logging for other Exceptions. 
+ ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError + = ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY); + onGroupError = onGroupError.andThen((cl, il, rl, el) -> { + switch (r.destination()) { + case Failure: + getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {il.getFlowFiles(), e}, e); + break; + case Retry: + getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", + new Object[] {il.getFlowFiles(), e}, e); + break; + } + }); + onGroupError.apply(c, enclosure, r, e); + }); + } + + @OnScheduled + public void constructProcess() { + process = new PutGroup<>(); - final Relationship rel; - if (e instanceof SQLNonTransientException) { - getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {failedFlowFiles, e}); - rel = REL_FAILURE; - } else { - getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {failedFlowFiles, e}); - rel = REL_RETRY; + process.setLogger(getLogger()); + process.fetchFlowFiles(fetchFlowFiles); + process.initConnection(initConnection); + process.groupFetchedFlowFiles(groupFlowFiles); + process.putFlowFiles(putFlowFiles); + process.adjustRoute(RollbackOnFailure.createAdjustRoute(REL_FAILURE, REL_RETRY)); + + process.onCompleted((c, s, fc, conn) -> { + try { + conn.commit(); + } catch (SQLException e) { + // Throw ProcessException to rollback process session. + throw new ProcessException("Failed to commit database connection due to " + e, e); } + }); - for (final FlowFile flowFile : failedFlowFiles) { - destinationRelationships.put(flowFile, rel); + process.onFailed((c, s, fc, conn, e) -> { + try { + conn.rollback(); + } catch (SQLException re) { + // Just log the fact that rollback failed. 
+ // ProcessSession will be rollback by the thrown Exception so don't have to do anything here. + getLogger().warn("Failed to rollback database connection due to %s", new Object[]{re}, re); } + }); - for (final FlowFile flowFile : retry) { - destinationRelationships.put(flowFile, Relationship.SELF); + process.cleanup((c, s, fc, conn) -> { + // make sure that we try to set the auto commit back to whatever it was. + if (fc.originalAutoCommit) { + try { + conn.setAutoCommit(true); + } catch (final SQLException se) { + getLogger().warn("Failed to reset autocommit due to {}", new Object[]{se}); + } } - } + }); - for (final Map.Entry<FlowFile, Relationship> entry : destinationRelationships.entrySet()) { - session.transfer(entry.getKey(), entry.getValue()); - } + exceptionHandler = new ExceptionHandler<>(); + exceptionHandler.mapException(e -> { + if (e instanceof SQLNonTransientException) { + return ErrorTypes.InvalidInput; + } else if (e instanceof SQLException) { + return ErrorTypes.TemporalFailure; + } else { + return ErrorTypes.UnknownFailure; + } + }); + adjustError = RollbackOnFailure.createAdjustError(getLogger()); + exceptionHandler.adjustError(adjustError); } + @Override + public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + final Boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean(); + final FunctionContext functionContext = new FunctionContext(rollbackOnFailure); + functionContext.obtainKeys = context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean(); + RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, getLogger(), session -> process.onTrigger(context, session, functionContext)); + } /** * Pulls a batch of FlowFiles from the incoming queues. If no FlowFiles are available, returns <code>null</code>. 
@@ -448,7 +558,8 @@ public class PutSQL extends AbstractProcessor { * @param session the process session for pulling flowfiles * @return a FlowFilePoll containing a List of FlowFiles to process, or <code>null</code> if there are no FlowFiles to process */ - private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSession session) { + private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSession session, + final FunctionContext functionContext, final RoutingResult result) { // Determine which FlowFile Filter to use in order to obtain FlowFiles. final boolean useTransactions = context.getProperty(SUPPORT_TRANSACTIONS).asBoolean(); boolean fragmentedTransaction = false; @@ -469,31 +580,23 @@ public class PutSQL extends AbstractProcessor { // If we are supporting fragmented transactions, verify that all FlowFiles are correct if (fragmentedTransaction) { - final Relationship relationship = determineRelationship(flowFiles, context.getProperty(TRANSACTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS)); - if (relationship != null) { - // if transferring back to self, penalize the FlowFiles. - if (relationship == Relationship.SELF) { - // penalize all of the FlowFiles that we are going to route to SELF. - final ListIterator<FlowFile> itr = flowFiles.listIterator(); - while (itr.hasNext()) { - final FlowFile flowFile = itr.next(); - final FlowFile penalized = session.penalize(flowFile); - itr.remove(); - itr.add(penalized); - } + try { + if (!isFragmentedTransactionReady(flowFiles, context.getProperty(TRANSACTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS))) { + // Not ready, penalize FlowFiles and put it back to self. + flowFiles.forEach(f -> result.routeTo(session.penalize(f), Relationship.SELF)); + return null; } - session.transfer(flowFiles, relationship); + } catch (IllegalArgumentException e) { + // Map relationship based on context, and then let default handler to handle. 
+ final ErrorTypes.Result adjustedRoute = adjustError.apply(functionContext, ErrorTypes.InvalidInput); + ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY) + .apply(functionContext, () -> flowFiles, adjustedRoute, e); return null; } // sort by fragment index. - Collections.sort(flowFiles, new Comparator<FlowFile>() { - @Override - public int compare(final FlowFile o1, final FlowFile o2) { - return Integer.compare(Integer.parseInt(o1.getAttribute(FRAGMENT_INDEX_ATTR)), Integer.parseInt(o2.getAttribute(FRAGMENT_INDEX_ATTR))); - } - }); + flowFiles.sort(Comparator.comparing(o -> Integer.parseInt(o.getAttribute(FRAGMENT_INDEX_ATTR)))); } return new FlowFilePoll(flowFiles, fragmentedTransaction); @@ -521,63 +624,6 @@ public class PutSQL extends AbstractProcessor { return null; } - - /** - * Returns the StatementFlowFileEnclosure that should be used for executing the given SQL statement - * - * @param sql the SQL to execute - * @param conn the connection from which a PreparedStatement can be created - * @param stmtMap the existing map of SQL to PreparedStatements - * @param obtainKeys whether or not we need to obtain generated keys for INSERT statements - * @param fragmentedTransaction whether or not the SQL pertains to a fragmented transaction - * - * @return a StatementFlowFileEnclosure to use for executing the given SQL statement - * - * @throws SQLException if unable to create the appropriate PreparedStatement - */ - private StatementFlowFileEnclosure getEnclosure(final String sql, final Connection conn, final Map<String, StatementFlowFileEnclosure> stmtMap, - final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException { - StatementFlowFileEnclosure enclosure = stmtMap.get(sql); - if (enclosure != null) { - return enclosure; - } - - if (obtainKeys) { - // Create a new Prepared Statement, requesting that it return the generated keys. 
- PreparedStatement stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); - - if (stmt == null) { - // since we are passing Statement.RETURN_GENERATED_KEYS, calls to conn.prepareStatement will - // in some cases (at least for DerbyDB) return null. - // We will attempt to recompile the statement without the generated keys being returned. - stmt = conn.prepareStatement(sql); - } - - // If we need to obtain keys, then we cannot do a Batch Update. In this case, - // we don't need to store the PreparedStatement in the Map because we aren't - // doing an addBatch/executeBatch. Instead, we will use the statement once - // and close it. - return new StatementFlowFileEnclosure(stmt); - } else if (fragmentedTransaction) { - // We cannot use Batch Updates if we have a transaction that spans multiple FlowFiles. - // If we did, we could end up processing the statements out of order. It's quite possible - // that we could refactor the code some to allow for this, but as it is right now, this - // could cause problems. This is because we have a Map<String, StatementFlowFileEnclosure>. - // If we had a transaction that needed to execute Stmt A with some parameters, then Stmt B with - // some parameters, then Stmt A with different parameters, this would become problematic because - // the executeUpdate would be evaluated first for Stmt A (the 1st and 3rd statements, and then - // the second statement would be evaluated). 
- final PreparedStatement stmt = conn.prepareStatement(sql); - return new StatementFlowFileEnclosure(stmt); - } - - final PreparedStatement stmt = conn.prepareStatement(sql); - enclosure = new StatementFlowFileEnclosure(stmt); - stmtMap.put(sql, enclosure); - return enclosure; - } - - /** * Determines the SQL statement that should be executed for the given FlowFile * @@ -618,7 +664,7 @@ public class PutSQL extends AbstractProcessor { final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches(); if (!isNumeric) { - throw new ProcessException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral type"); + throw new SQLDataException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral type"); } final int jdbcType = Integer.parseInt(entry.getValue()); @@ -630,11 +676,11 @@ public class PutSQL extends AbstractProcessor { try { setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType, parameterFormat); } catch (final NumberFormatException nfe) { - throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe); + throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe); } catch (ParseException pe) { - throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe); + throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe); } catch (UnsupportedEncodingException uee) { - throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to UTF-8", uee); + throw new SQLDataException("The value of the " + valueAttrName + " is '" + 
parameterValue + "', which cannot be converted to UTF-8", uee); } } } @@ -652,76 +698,69 @@ public class PutSQL extends AbstractProcessor { * @return the appropriate relationship to route the FlowFiles to, or <code>null</code> if the FlowFiles * should instead be processed */ - Relationship determineRelationship(final List<FlowFile> flowFiles, final Long transactionTimeoutMillis) { + boolean isFragmentedTransactionReady(final List<FlowFile> flowFiles, final Long transactionTimeoutMillis) throws IllegalArgumentException { int selectedNumFragments = 0; final BitSet bitSet = new BitSet(); + BiFunction<String, Object[], IllegalArgumentException> illegal = (s, objects) -> new IllegalArgumentException(String.format(s, objects)); + for (final FlowFile flowFile : flowFiles) { final String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR); if (fragmentCount == null && flowFiles.size() == 1) { - return null; + return true; } else if (fragmentCount == null) { - getLogger().error("Cannot process {} because there are {} FlowFiles with the same fragment.identifier " - + "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()}); - return REL_FAILURE; + throw illegal.apply("Cannot process %s because there are %d FlowFiles with the same fragment.identifier " + + "attribute but not all FlowFiles have a fragment.count attribute", new Object[] {flowFile, flowFiles.size()}); } final int numFragments; try { numFragments = Integer.parseInt(fragmentCount); } catch (final NumberFormatException nfe) { - getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not an integer; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount}); - return REL_FAILURE; + throw illegal.apply("Cannot process %s because the fragment.count attribute has a value of '%s', which is not an integer", + new Object[] {flowFile, 
fragmentCount}); } if (numFragments < 1) { - getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not a positive integer; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount}); - return REL_FAILURE; + throw illegal.apply("Cannot process %s because the fragment.count attribute has a value of '%s', which is not a positive integer", + new Object[] {flowFile, fragmentCount}); } if (selectedNumFragments == 0) { selectedNumFragments = numFragments; } else if (numFragments != selectedNumFragments) { - getLogger().error("Cannot process {} because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); - return REL_FAILURE; + throw illegal.apply("Cannot process %s because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier", + new Object[] {flowFile}); } final String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTR); if (fragmentIndex == null) { - getLogger().error("Cannot process {} because the fragment.index attribute is missing; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); - return REL_FAILURE; + throw illegal.apply("Cannot process %s because the fragment.index attribute is missing", new Object[] {flowFile}); } final int idx; try { idx = Integer.parseInt(fragmentIndex); } catch (final NumberFormatException nfe) { - getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not an integer; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); - return REL_FAILURE; + throw illegal.apply("Cannot process %s because the fragment.index attribute has a value of '%s', which is not an integer", + new 
Object[] {flowFile, fragmentIndex}); } if (idx < 0) { - getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not a positive integer; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); - return REL_FAILURE; + throw illegal.apply("Cannot process %s because the fragment.index attribute has a value of '%s', which is not a positive integer", + new Object[] {flowFile, fragmentIndex}); } if (bitSet.get(idx)) { - getLogger().error("Cannot process {} because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier; " - + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); - return REL_FAILURE; + throw illegal.apply("Cannot process %s because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier", + new Object[] {flowFile}); } bitSet.set(idx); } if (selectedNumFragments == flowFiles.size()) { - return null; // no relationship to route FlowFiles to yet - process the FlowFiles. + return true; // no relationship to route FlowFiles to yet - process the FlowFiles. } long latestQueueTime = 0L; @@ -733,13 +772,12 @@ public class PutSQL extends AbstractProcessor { if (transactionTimeoutMillis != null) { if (latestQueueTime > 0L && System.currentTimeMillis() - latestQueueTime > transactionTimeoutMillis) { - getLogger().error("The transaction timeout has expired for the following FlowFiles; they will be routed to failure: {}", new Object[] {flowFiles}); - return REL_FAILURE; + throw illegal.apply("The transaction timeout has expired for the following FlowFiles; they will be routed to failure: %s", new Object[] {flowFiles}); } } getLogger().debug("Not enough FlowFiles for transaction. Returning all FlowFiles to queue"); - return Relationship.SELF; // not enough FlowFiles for this transaction. Return them all to queue. 
+ return false; // not enough FlowFiles for this transaction. Return them all to queue. } /** @@ -924,7 +962,7 @@ public class PutSQL extends AbstractProcessor { if (selectedId.equals(fragmentId)) { // fragment id's match. Find out if we have all of the necessary fragments or not. final int numFragments; - if (NUMBER_PATTERN.matcher(fragCount).matches()) { + if (fragCount != null && NUMBER_PATTERN.matcher(fragCount).matches()) { numFragments = Integer.parseInt(fragCount); } else { numFragments = Integer.MAX_VALUE; @@ -971,22 +1009,69 @@ public class PutSQL extends AbstractProcessor { } + private static class FragmentedEnclosure extends StatementFlowFileEnclosure { + + private final Map<FlowFile, StatementFlowFileEnclosure> flowFileToEnclosure = new HashMap<>(); + + public FragmentedEnclosure() { + super(null); + } + + public void addFlowFile(final FlowFile flowFile, final StatementFlowFileEnclosure enclosure) { + addFlowFile(flowFile); + flowFileToEnclosure.put(flowFile, enclosure); + } + + public StatementFlowFileEnclosure getTargetEnclosure(final FlowFile flowFile) { + return flowFileToEnclosure.get(flowFile); + } + } + /** * A simple, immutable data structure to hold a Prepared Statement and a List of FlowFiles * for which that statement should be evaluated. 
*/ - private static class StatementFlowFileEnclosure { - private final PreparedStatement statement; + private static class StatementFlowFileEnclosure implements FlowFileGroup { + private final String sql; + private PreparedStatement statement; private final List<FlowFile> flowFiles = new ArrayList<>(); - public StatementFlowFileEnclosure(final PreparedStatement statement) { - this.statement = statement; + public StatementFlowFileEnclosure(String sql) { + this.sql = sql; } - public PreparedStatement getStatement() { + public PreparedStatement getNewStatement(final Connection conn, final boolean obtainKeys) throws SQLException { + if (obtainKeys) { + // Create a new Prepared Statement, requesting that it return the generated keys. + PreparedStatement stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); + + if (stmt == null) { + // since we are passing Statement.RETURN_GENERATED_KEYS, calls to conn.prepareStatement will + // in some cases (at least for DerbyDB) return null. + // We will attempt to recompile the statement without the generated keys being returned. + stmt = conn.prepareStatement(sql); + } + + // If we need to obtain keys, then we cannot do a Batch Update. In this case, + // we don't need to store the PreparedStatement in the Map because we aren't + // doing an addBatch/executeBatch. Instead, we will use the statement once + // and close it. 
+ return stmt; + } + + return conn.prepareStatement(sql); + } + + public PreparedStatement getCachedStatement(final Connection conn) throws SQLException { + if (statement != null) { + return statement; + } + + statement = conn.prepareStatement(sql); return statement; } + @Override public List<FlowFile> getFlowFiles() { return flowFiles; } @@ -997,7 +1082,7 @@ public class PutSQL extends AbstractProcessor { @Override public int hashCode() { - return statement.hashCode(); + return sql.hashCode(); } @Override @@ -1013,7 +1098,7 @@ public class PutSQL extends AbstractProcessor { } final StatementFlowFileEnclosure other = (StatementFlowFileEnclosure) obj; - return statement.equals(other.getStatement()); + return sql.equals(other.sql); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy index bb12fb4..e99f43a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy @@ -17,6 +17,7 @@ package org.apache.nifi.processors.standard import org.apache.nifi.processor.exception.ProcessException +import org.apache.nifi.processor.util.pattern.RollbackOnFailure import org.apache.nifi.processors.standard.util.record.MockRecordParser import org.apache.nifi.reporting.InitializationException import 
org.apache.nifi.serialization.record.RecordField @@ -42,7 +43,6 @@ import java.sql.Statement import static org.junit.Assert.assertEquals import static org.junit.Assert.assertFalse -import static org.junit.Assert.assertNull import static org.junit.Assert.assertTrue import static org.junit.Assert.fail import static org.mockito.Mockito.spy @@ -53,7 +53,8 @@ import static org.mockito.Mockito.spy @RunWith(JUnit4.class) class TestPutDatabaseRecord { - private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)" + private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," + + " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))" private final static String DB_LOCATION = "target/db_pdr" TestRunner runner @@ -131,33 +132,20 @@ class TestPutDatabaseRecord { ] as PutDatabaseRecord.TableSchema + runner.setProperty(PutDatabaseRecord.TRANSLATE_FIELD_NAMES, 'false') + runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, 'false') + runner.setProperty(PutDatabaseRecord.UNMATCHED_COLUMN_BEHAVIOR, 'false') + runner.setProperty(PutDatabaseRecord.QUOTED_IDENTIFIERS, 'false') + runner.setProperty(PutDatabaseRecord.QUOTED_TABLE_IDENTIFIER, 'false') + def settings = new PutDatabaseRecord.DMLSettings(runner.getProcessContext()) + processor.with { - try { - assertNull(generateInsert(null, null, null, - false, false, false, false, - false, false).sql) - fail('Expecting ProcessException') - } catch (ProcessException ignore) { - // Expected - } - try { - assertNull(generateInsert(null, 'PERSONS', null, - false, false, false, false, - false, false).sql) - fail('Expecting ProcessException') - } catch (ProcessException ignore) { - // Expected - } assertEquals('INSERT INTO PERSONS (id, name, code) VALUES (?,?,?)', - generateInsert(schema, 'PERSONS', tableSchema, - false, false, false, false, - false, false).sql) + generateInsert(schema, 'PERSONS', 
tableSchema, settings).sql) assertEquals('DELETE FROM PERSONS WHERE id = ? AND name = ? AND code = ?', - generateDelete(schema, 'PERSONS', tableSchema, - false, false, false, false, - false, false).sql) + generateDelete(schema, 'PERSONS', tableSchema, settings).sql) } } @@ -211,6 +199,81 @@ class TestPutDatabaseRecord { } @Test + void testInsertBatchUpdateException() throws InitializationException, ProcessException, SQLException, IOException { + recreateTable("PERSONS", createPersons) + final MockRecordParser parser = new MockRecordParser() + runner.addControllerService("parser", parser) + runner.enableControllerService(parser) + + parser.addSchemaField("id", RecordFieldType.INT) + parser.addSchemaField("name", RecordFieldType.STRING) + parser.addSchemaField("code", RecordFieldType.INT) + + parser.addRecord(1, 'rec1', 101) + parser.addRecord(2, 'rec2', 102) + parser.addRecord(3, 'rec3', 1000) + parser.addRecord(4, 'rec4', 104) + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser') + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE) + runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS') + + runner.enqueue(new byte[0]) + runner.run() + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0) + runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 0) + runner.assertTransferCount(PutDatabaseRecord.REL_RETRY, 1) + final Connection conn = dbcp.getConnection() + final Statement stmt = conn.createStatement() + final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS') + // Transaction should be rolled back and table should remain empty. 
+ assertFalse(rs.next()) + + stmt.close() + conn.close() + } + + @Test + void testInsertBatchUpdateExceptionRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { + recreateTable("PERSONS", createPersons) + final MockRecordParser parser = new MockRecordParser() + runner.addControllerService("parser", parser) + runner.enableControllerService(parser) + + parser.addSchemaField("id", RecordFieldType.INT) + parser.addSchemaField("name", RecordFieldType.STRING) + parser.addSchemaField("code", RecordFieldType.INT) + + parser.addRecord(1, 'rec1', 101) + parser.addRecord(2, 'rec2', 102) + parser.addRecord(3, 'rec3', 1000) + parser.addRecord(4, 'rec4', 104) + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser') + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE) + runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS') + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, 'true') + + runner.enqueue(new byte[0]) + try { + runner.run() + fail("ProcessException is expected") + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException) + } + + final Connection conn = dbcp.getConnection() + final Statement stmt = conn.createStatement() + final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS') + // Transaction should be rolled back and table should remain empty. 
+ assertFalse(rs.next()) + + stmt.close() + conn.close() + } + + @Test void testInsertNoTable() throws InitializationException, ProcessException, SQLException, IOException { recreateTable("PERSONS", createPersons) final MockRecordParser parser = new MockRecordParser() @@ -300,6 +363,37 @@ class TestPutDatabaseRecord { } @Test + void testSqlStatementTypeNoValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { + recreateTable("PERSONS", createPersons) + final MockRecordParser parser = new MockRecordParser() + runner.addControllerService("parser", parser) + runner.enableControllerService(parser) + + parser.addSchemaField("sql", RecordFieldType.STRING) + + parser.addRecord('') + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser') + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE) + runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS') + runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, 'sql') + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, 'true') + + def attrs = [:] + attrs[PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE] = 'sql' + runner.enqueue(new byte[0], attrs) + try { + runner.run() + fail("ProcessException is expected") + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException) + } + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0) + runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 0) + } + + @Test void testUpdate() throws InitializationException, ProcessException, SQLException, IOException { recreateTable("PERSONS", createPersons) final MockRecordParser parser = new MockRecordParser() http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java index 5e9ab58..2d0491b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; @@ -39,6 +40,7 @@ import org.apache.commons.lang3.RandomUtils; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.pattern.RollbackOnFailure; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -165,6 +167,29 @@ public class TestPutSQL { runner.assertTransferCount(PutSQL.REL_SUCCESS, 3); } + @Test + public void testFailInMiddleWithBadStatementRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); + runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes()); + runner.enqueue("INSERT INTO 
PERSONS_AI".getBytes()); // intentionally wrong syntax + runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Tom', 3)".getBytes()); + runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Harry', 44)".getBytes()); + + try { + runner.run(); + fail("ProcessException should be thrown"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + runner.assertTransferCount(PutSQL.REL_FAILURE, 0); + runner.assertTransferCount(PutSQL.REL_SUCCESS, 0); + } + } + @Test public void testFailInMiddleWithBadParameterType() throws InitializationException, ProcessException, SQLException, IOException { @@ -191,8 +216,41 @@ public class TestPutSQL { runner.assertTransferCount(PutSQL.REL_FAILURE, 1); runner.assertTransferCount(PutSQL.REL_SUCCESS, 3); + } + @Test + public void testFailInMiddleWithBadParameterTypeRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); + + final Map<String, String> goodAttributes = new HashMap<>(); + goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + goodAttributes.put("sql.args.1.value", "84"); + + final Map<String, String> badAttributes = new HashMap<>(); + badAttributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR)); + badAttributes.put("sql.args.1.value", "hello"); + + final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes(); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, badAttributes); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, goodAttributes); + + try { + runner.run(); + fail("ProcessException should be thrown"); + } catch 
(AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + runner.assertTransferCount(PutSQL.REL_FAILURE, 0); + runner.assertTransferCount(PutSQL.REL_SUCCESS, 0); + } + } @Test public void testFailInMiddleWithBadParameterValue() throws InitializationException, ProcessException, SQLException, IOException { @@ -235,6 +293,48 @@ public class TestPutSQL { } } + @Test + public void testFailInMiddleWithBadParameterValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); + + recreateTable("PERSONS_AI",createPersonsAutoId); + + final Map<String, String> goodAttributes = new HashMap<>(); + goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + goodAttributes.put("sql.args.1.value", "84"); + + final Map<String, String> badAttributes = new HashMap<>(); + badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + badAttributes.put("sql.args.1.value", "9999"); + + final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes(); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, badAttributes); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, goodAttributes); + + try { + runner.run(); + fail("ProcessException should be thrown"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + runner.assertTransferCount(PutSQL.REL_FAILURE, 0); + runner.assertTransferCount(PutSQL.REL_SUCCESS, 0); + } + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM 
PERSONS_AI"); + assertFalse(rs.next()); + } + } + } + @Test public void testUsingSqlDataTypesWithNegativeValues() throws InitializationException, ProcessException, SQLException, IOException { @@ -666,6 +766,8 @@ public class TestPutSQL { runner.enableControllerService(service); runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + recreateTable("PERSONS", createPersons); + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; final Map<String, String> attributes = new HashMap<>(); @@ -695,6 +797,47 @@ public class TestPutSQL { } } + @Test + public void testMultipleStatementsWithinFlowFileRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); + + recreateTable("PERSONS", createPersons); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + try { + runner.run(); + fail("ProcessException should be thrown"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + } + + try (final Connection conn = 
service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertFalse(rs.next()); + } + } + } + @Test public void testWithNullParameter() throws InitializationException, ProcessException, SQLException, IOException { @@ -766,6 +909,47 @@ public class TestPutSQL { } } + @Test + public void testInvalidStatementRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); + + recreateTable("PERSONS", createPersons); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; "; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + try { + runner.run(); + fail("ProcessException should be thrown"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + } + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertFalse(rs.next()); + } + } + } + @Test public void testRetryableFailure() throws InitializationException, 
ProcessException, SQLException, IOException { @@ -798,6 +982,42 @@ public class TestPutSQL { runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 1); } + @Test + public void testRetryableFailureRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final DBCPService service = new SQLExceptionService(null); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + try { + runner.run(); + fail("ProcessException should be thrown"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + // Should not be routed to retry. 
+ runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 0); + } + + } @Test public void testMultipleFlowFilesSuccessfulInTransaction() throws InitializationException, ProcessException, SQLException, IOException { @@ -857,6 +1077,38 @@ public class TestPutSQL { } } + @Test + public void testMultipleFlowFilesSuccessfulInTransactionRollBackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(PutSQL.BATCH_SIZE, "1"); + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); + + recreateTable("PERSONS", createPersons); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("fragment.identifier", "1"); + attributes.put("fragment.count", "2"); + attributes.put("fragment.index", "0"); + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); + // ProcessException should not be thrown in this case, because the input FlowFiles are simply deferred. 
+ runner.run(); + + // No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0); + + } @Test public void testTransactionTimeout() throws InitializationException, ProcessException, SQLException, IOException { @@ -895,6 +1147,81 @@ public class TestPutSQL { runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); } + @Test + public void testTransactionTimeoutRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); + final Map<String, String> attributes = new HashMap<>(); + attributes.put("fragment.identifier", "1"); + attributes.put("fragment.count", "2"); + attributes.put("fragment.index", "0"); + + final MockFlowFile mff = new MockFlowFile(0L) { + @Override + public Long getLastQueueDate() { + return System.currentTimeMillis() - 10000L; // return 10 seconds ago + } + + @Override + public Map<String, String> getAttributes() { + return attributes; + } + + @Override + public String getAttribute(final String attrName) { + return attributes.get(attrName); + } + }; + + runner.enqueue(mff); + try { + runner.run(); + fail("ProcessException should be thrown"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + } + + runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0); + } + + @Test + public void testNullFragmentCountRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + runner.addControllerService("dbcp", service); + 
runner.enableControllerService(service); + + runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); + final Map<String, String> attribute1 = new HashMap<>(); + attribute1.put("fragment.identifier", "1"); + attribute1.put("fragment.count", "2"); + attribute1.put("fragment.index", "0"); + + final Map<String, String> attribute2 = new HashMap<>(); + attribute2.put("fragment.identifier", "1"); +// attribute2.put("fragment.count", null); + attribute2.put("fragment.index", "1"); + + runner.enqueue(new byte[]{}, attribute1); + runner.enqueue(new byte[]{}, attribute2); + + + try { + runner.run(); + fail("ProcessException should be thrown"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + } + + runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0); + } + /** * Simple implementation only for testing purposes */ @@ -985,4 +1312,5 @@ public class TestPutSQL { byte[] bBinary = RandomUtils.nextBytes(length); return DatatypeConverter.printBase64Binary(bBinary); } + }
