Repository: nifi Updated Branches: refs/heads/master d5bce9197 -> 63f55d05b
NIFI-5724: Make database connection autocommit configurable. This makes the database session autocommit value a configurable property; adds custom validation to the PutSQL processor so as to disallow 'supports transaction' and 'rollback on failure' being true when the autocommit value has been set to true; and fixes some style issues to conform to standards. This closes #3113. Signed-off-by: Koji Kawamura <ijokaruma...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/63f55d05 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/63f55d05 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/63f55d05 Branch: refs/heads/master Commit: 63f55d05b4f3f83b9e9f2206f4129ae7a4ade569 Parents: d5bce91 Author: Vish Uma <visw...@gmail.com> Authored: Fri Oct 26 15:32:46 2018 -0400 Committer: Koji Kawamura <ijokaruma...@apache.org> Committed: Fri Nov 9 17:07:27 2018 +0900 ---------------------------------------------------------------------- .../apache/nifi/processors/standard/PutSQL.java | 53 ++++++++++++++++++-- 1 file changed, 49 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/63f55d05/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index 9957c2e..38134c2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -28,6 +28,8 @@ import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -64,6 +66,7 @@ import java.sql.SQLNonTransientException; import java.sql.Statement; import java.util.ArrayList; import java.util.BitSet; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -74,6 +77,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; +import static java.lang.String.format; import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError; @SupportsBatching @@ -134,6 +138,14 @@ public class PutSQL extends AbstractSessionFactoryProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); + static final PropertyDescriptor AUTO_COMMIT = new PropertyDescriptor.Builder() + .name("database-session-autocommit") + .displayName("Database Session AutoCommit") + .description("The autocommit mode to set on the database connection being used.") + .allowableValues("true", "false") + .defaultValue("false") + .build(); + static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder() .name("Support Fragmented Transactions") .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. 
" @@ -189,6 +201,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor { properties.add(CONNECTION_POOL); properties.add(SQL_STATEMENT); properties.add(SUPPORT_TRANSACTIONS); + properties.add(AUTO_COMMIT); properties.add(TRANSACTION_TIMEOUT); properties.add(BATCH_SIZE); properties.add(OBTAIN_GENERATED_KEYS); @@ -197,6 +210,34 @@ public class PutSQL extends AbstractSessionFactoryProcessor { } @Override + protected final Collection<ValidationResult> customValidate(ValidationContext context) { + final Collection<ValidationResult> results = new ArrayList<>(); + final String support_transactions = context.getProperty(SUPPORT_TRANSACTIONS).getValue(); + final String rollback_on_failure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).getValue(); + final String auto_commit = context.getProperty(AUTO_COMMIT).getValue(); + + if(auto_commit.equalsIgnoreCase("true")) { + if(support_transactions.equalsIgnoreCase("true")) { + results.add(new ValidationResult.Builder() + .subject(SUPPORT_TRANSACTIONS.getDisplayName()) + .explanation(format("'%s' cannot be set to 'true' when '%s' is also set to 'true'." + + "Transactions for batch updates cannot be supported when auto commit is set to 'true'", + SUPPORT_TRANSACTIONS.getDisplayName(), AUTO_COMMIT.getDisplayName())) + .build()); + } + if(rollback_on_failure.equalsIgnoreCase("true")) { + results.add(new ValidationResult.Builder() + .subject(RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName()) + .explanation(format("'%s' cannot be set to 'true' when '%s' is also set to 'true'." 
+ + "Transaction rollbacks for batch updates cannot be supported when auto commit is set to 'true'", + RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName(), AUTO_COMMIT.getDisplayName())) + .build()); + } + } + return results; + } + + @Override public Set<Relationship> getRelationships() { final Set<Relationship> rels = new HashSet<>(); rels.add(REL_SUCCESS); @@ -239,7 +280,10 @@ public class PutSQL extends AbstractSessionFactoryProcessor { .getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes()); try { fc.originalAutoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); + final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean(); + if(fc.originalAutoCommit != autocommit) { + connection.setAutoCommit(autocommit); + } } catch (SQLException e) { throw new ProcessException("Failed to disable auto commit due to " + e, e); } @@ -521,9 +565,10 @@ public class PutSQL extends AbstractSessionFactoryProcessor { process.cleanup((c, s, fc, conn) -> { // make sure that we try to set the auto commit back to whatever it was. - if (fc.originalAutoCommit) { + final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean(); + if (fc.originalAutoCommit != autocommit) { try { - conn.setAutoCommit(true); + conn.setAutoCommit(fc.originalAutoCommit); } catch (final SQLException se) { getLogger().warn("Failed to reset autocommit due to {}", new Object[]{se}); } @@ -670,7 +715,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor { int selectedNumFragments = 0; final BitSet bitSet = new BitSet(); - BiFunction<String, Object[], IllegalArgumentException> illegal = (s, objects) -> new IllegalArgumentException(String.format(s, objects)); + BiFunction<String, Object[], IllegalArgumentException> illegal = (s, objects) -> new IllegalArgumentException(format(s, objects)); for (final FlowFile flowFile : flowFiles) { final String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR);