gresockj commented on a change in pull request #5042:
URL: https://github.com/apache/nifi/pull/5042#discussion_r625318172



##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -340,7 +413,7 @@ public synchronized void commit() {
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
-    protected void commit(final Checkpoint checkpoint) {
+    protected void commit(final Checkpoint checkpoint, final boolean asynchronous) {

Review comment:
      If we're going to pass `asynchronous` as a parameter that's only available for use by child classes, we should add a method comment here indicating the responsibility of the overriding method.  As it is, it's a bit confusing to have the boolean passed in but not used.
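
For illustration, a hypothetical sketch of the kind of method comment being asked for. The types here are minimal stand-ins, not the actual StandardProcessSession code:

```java
// Minimal stand-ins for the real types; illustrative only.
class Checkpoint {
}

class BaseSession {
    boolean committed = false;

    /**
     * Commits the given checkpoint.
     *
     * @param checkpoint   the checkpoint whose operations should be committed
     * @param asynchronous ignored by this base implementation, which always commits
     *                     synchronously; subclasses that support asynchronous commits
     *                     are responsible for honoring this flag when overriding
     */
    protected void commit(final Checkpoint checkpoint, final boolean asynchronous) {
        committed = true;
    }
}
```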

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
##########
@@ -260,18 +260,10 @@ public InvocationResult invoke() {
         } finally {
             try {
                 if (batch) {
-                    try {
-                        rawSession.commit();
-                    } catch (final Throwable t) {
+                    rawSession.commitAsync(null, t -> {
                        final ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());
                        procLog.error("Failed to commit session {} due to {}; rolling back", new Object[] { rawSession, t.toString() }, t);
-
-                        try {
-                            rawSession.rollback(true);

Review comment:
      Since this previously penalized the FlowFile during rollback, would removing the rollback(true) call change the expected functionality?  Or did you decide that it shouldn't need to penalize during rollback here?

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
##########
@@ -375,7 +375,7 @@ protected int commitTransferTransaction(Peer peer, FlowFileTransaction transacti
             throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
         }
 
-        session.commit();
+        session.commitAsync();

Review comment:
       It looks like there's another commit() call below here that was perhaps 
missed (line 532).

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java
##########
@@ -110,6 +118,7 @@
             .failurePortNames(failurePortNames)
             .parameterContexts(parameterContextDefinitions)
             .reportingTasks(reportingTaskDefinitions)
+            .transactionThresholds( transactionThresholds)

Review comment:
       Extra space

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java
##########
@@ -207,6 +221,73 @@
         return new ArrayList<>(contextDefinitions.values());
     }
 
+    private TransactionThresholds getTransactionThresholds(final Map<String, String> properties) {
+        final Long flowfileThreshold = getLongProperty(properties, TRANSACTION_THRESHOLD_FLOWFILES);
+        final Double dataSizeThreshold = getDataSizeProperty(properties, TRANSACTION_THRESHOLD_DATA_SIZE, DataUnit.B);
+        final Double timeThreshold = getTimePeriodProperty(properties, TRANSACTION_THRESHOLD_TIME, TimeUnit.NANOSECONDS);
+
+        final OptionalLong maxFlowFiles = flowfileThreshold == null ? OptionalLong.empty() : OptionalLong.of(flowfileThreshold);
+        final OptionalLong maxBytes = dataSizeThreshold == null ? OptionalLong.empty() : OptionalLong.of(dataSizeThreshold.longValue());
+        final OptionalLong maxNanos = timeThreshold == null ? OptionalLong.empty() : OptionalLong.of(timeThreshold.longValue());
+
+        return new TransactionThresholds() {
+            @Override
+            public OptionalLong getMaxFlowFiles() {
+                return maxFlowFiles;
+            }
+
+            @Override
+            public OptionalLong getMaxContentSize(final DataUnit dataUnit) {
+                return maxBytes;
+            }
+
+            @Override
+            public OptionalLong getMaxTime(final TimeUnit timeUnit) {
+                return maxNanos;
+            }
+        };
+    }
+

Review comment:
      These property methods seem useful. Can we pull them into something like a PropertiesUtils in nifi-utils?
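
A rough sketch of what such a shared utility might look like; the class name `PropertiesUtils`, its location, and the static method shapes are all hypothetical suggestions, not existing NiFi API:

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical shared utility; name and location (nifi-utils) are a suggestion only.
final class PropertiesUtils {

    private PropertiesUtils() {
    }

    // Returns the trimmed property value, or empty if the property is absent or blank.
    static Optional<String> getTrimmedProperty(final Map<String, String> properties, final String propertyName) {
        final String value = properties.get(propertyName);
        return (value == null || value.trim().isEmpty()) ? Optional.empty() : Optional.of(value.trim());
    }

    // Parses the property as a long, returning null when the property is not set.
    static Long getLongProperty(final Map<String, String> properties, final String propertyName) {
        final Optional<String> value = getTrimmedProperty(properties, propertyName);
        if (!value.isPresent()) {
            return null;
        }

        try {
            return Long.parseLong(value.get());
        } catch (final NumberFormatException nfe) {
            throw new IllegalArgumentException("Configured property <" + propertyName + "> has a value that is not a valid 64-bit integer");
        }
    }
}
```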

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java
##########
@@ -207,6 +221,73 @@
         return new ArrayList<>(contextDefinitions.values());
     }
 
+    private TransactionThresholds getTransactionThresholds(final Map<String, String> properties) {
+        final Long flowfileThreshold = getLongProperty(properties, TRANSACTION_THRESHOLD_FLOWFILES);
+        final Double dataSizeThreshold = getDataSizeProperty(properties, TRANSACTION_THRESHOLD_DATA_SIZE, DataUnit.B);
+        final Double timeThreshold = getTimePeriodProperty(properties, TRANSACTION_THRESHOLD_TIME, TimeUnit.NANOSECONDS);
+
+        final OptionalLong maxFlowFiles = flowfileThreshold == null ? OptionalLong.empty() : OptionalLong.of(flowfileThreshold);
+        final OptionalLong maxBytes = dataSizeThreshold == null ? OptionalLong.empty() : OptionalLong.of(dataSizeThreshold.longValue());
+        final OptionalLong maxNanos = timeThreshold == null ? OptionalLong.empty() : OptionalLong.of(timeThreshold.longValue());
+
+        return new TransactionThresholds() {
+            @Override
+            public OptionalLong getMaxFlowFiles() {
+                return maxFlowFiles;
+            }
+
+            @Override
+            public OptionalLong getMaxContentSize(final DataUnit dataUnit) {
+                return maxBytes;
+            }
+
+            @Override
+            public OptionalLong getMaxTime(final TimeUnit timeUnit) {
+                return maxNanos;
+            }
+        };
+    }
+
+    private Long getLongProperty(final Map<String, String> properties, final String propertyName) {
+        final String propertyValue = properties.get(propertyName);
+        if (propertyValue == null || propertyValue.trim().isEmpty()) {
+            return null;
+        }
+
+        try {
+            return Long.parseLong(propertyValue.trim());
+        } catch (final NumberFormatException nfe) {
+            throw new IllegalArgumentException("Configured property <" + propertyName + "> has a value that is not a valid 64-bit integer");
+        }
+    }

Review comment:
       Some potential for code reuse here:
   ```suggestion
    private Optional<String> getTrimmedProperty(final Map<String, String> properties, final String propertyName) {
        final String propertyValue = properties.get(propertyName);
        return (propertyValue == null || propertyValue.trim().isEmpty()) ? Optional.empty() : Optional.of(propertyValue.trim());
    }

    private Long getLongProperty(final Map<String, String> properties, final String propertyName) {
        final Optional<String> propertyValue = getTrimmedProperty(properties, propertyName);
        try {
            return propertyValue.isPresent() ? Long.parseLong(propertyValue.get()) : null;
        } catch (final NumberFormatException nfe) {
            throw new IllegalArgumentException("Configured property <" + propertyName + "> has a value that is not a valid 64-bit integer");
        }
    }
   ```

##########
File path: nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
##########
@@ -101,6 +102,105 @@
      */
     void commit();
 
+    /**
+     * <p>

Review comment:
      I've gone back and forth on this several times, but I'm leaning toward renaming `commit()` to `commitSynchronous()` and letting the others just be `commit()`.  I think `commitAsync()` sounds too much like the commit is definitely asynchronous, whereas it really just has the potential to be asynchronous (in StandardProcessSession, it's not asynchronous unless you use the Runnable overloads).

##########
File path: nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
##########
@@ -101,6 +102,105 @@
      */
     void commit();
 
+    /**
+     * <p>
+     * Commits the current session ensuring all operations against FlowFiles
+     * within this session are atomically persisted. All FlowFiles operated on
+     * within this session must be accounted for by transfer or removal or the
+     * commit will fail.
+     * </p>
+     *
+     * <p>
+     * Unlike the {@link #commit()} method, the persistence of data to the repositories is not
+     * guaranteed to have occurred by the time that this method returns. Therefore, if any follow-on actions
+     * are necessary after the data has been persisted to the repository (for example, acknowledging receipt from
+     * a source system, removing a source file, etc.) that logic should be performed only by invoking {@link #commitAsync(Runnable, Consumer)}
+     * and implementing that action in the provided callback.
+     * </p>
+     *
+     * <p>
+     * If the session cannot be committed, an error will be logged and the session will be rolled back instead.
+     * </p>
+     *
+     * @throws IllegalStateException if called from within a read or write callback (See {@link #write(FlowFile, StreamCallback)}, {@link #write(FlowFile, OutputStreamCallback)},
+     * {@link #read(FlowFile, InputStreamCallback)}).
+     *
+     * @throws FlowFileHandlingException if any FlowFile is not appropriately accounted for by transferring it to a Relationship (see {@link #transfer(FlowFile, Relationship)})
+     * or removed (see {@link #remove(FlowFile)}.
+     */
+    void commitAsync();
+
+    /**
+     * <p>
+     * Commits the current session ensuring all operations against FlowFiles
+     * within this session are atomically persisted. All FlowFiles operated on
+     * within this session must be accounted for by transfer or removal or the
+     * commit will fail.
+     * </p>
+     *
+     * <p>
+     * Unlike the {@link #commit()} method, the persistence of data to the repositories is not

Review comment:
      Just to be clear, you're saying the session should not be reused once `commitAsync(Runnable)` or `commitAsync(Runnable, Consumer)` is called, but it could be reused if just `commitAsync()` is called?  I think this makes sense because there are no follow-on tasks in the latter case.
   
   In this case, I think we should make this warning like a red flashing light at the top of the comment block so developers immediately understand the implications of the differences between the methods.  It almost seems to warrant naming the methods differently, like `commitIntermediate()` vs. `commitTerminal(Runnable)`/`commitTerminal(Runnable, Consumer)`.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
##########
@@ -193,32 +193,33 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                     stopWatch.stop();
                     final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                     final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
-                    flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", hostname);
-                    flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), parentRelativePathString);
-                    flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), relativeFile.getName());
-                    flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
-                    Map<String, String> attributes = getAttributesFromFile(file);
-                    if (attributes.size() > 0) {
-                        flowFile = session.putAllAttributes(flowFile, attributes);
-                    }
 
-                    if (deleteOriginal) {
-                        try {
-                            transfer.deleteFile(flowFile, null, file.getFullPathFileName());
-                        } catch (final IOException e) {
-                            logger.error("Failed to remove remote file {} due to {}; deleting local copy",
-                                    new Object[]{file.getFullPathFileName(), e});
-                            session.remove(flowFile);
-                            return;
-                        }
-                    }
+                    final Map<String, String> attributes = getAttributesFromFile(file);
+                    attributes.put(this.getClass().getSimpleName().toLowerCase() + ".remote.source", hostname);
+                    attributes.put(CoreAttributes.PATH.key(), parentRelativePathString);
+                    attributes.put(CoreAttributes.FILENAME.key(), relativeFile.getName());
+                    attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
+
+                    flowFile = session.putAllAttributes(flowFile, attributes);
 
                     session.getProvenanceReporter().receive(flowFile, transfer.getProtocolName() + "://" + hostname + "/" + file.getFullPathFileName(), millis);
                     session.transfer(flowFile, REL_SUCCESS);
                     logger.info("Successfully retrieved {} from {} in {} milliseconds at a rate of {} and transferred to success",
-                            new Object[]{flowFile, hostname, millis, dataRate});
-
-                    session.commit();
+                        new Object[]{flowFile, hostname, millis, dataRate});
+
+                    final FlowFile receivedFlowFile = flowFile;
+                    final FileTransfer fileTransfer = transfer;
+                    session.commitAsync(() -> {
+                        if (deleteOriginal) {
+                            try {
+                                fileTransfer.deleteFile(receivedFlowFile, null, file.getFullPathFileName());

Review comment:
       I think fileTransfer could be closed in the finally block before it's 
used here.

##########
File path: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
##########
@@ -448,17 +449,18 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         }
 
         final Set<String> updatedKeys = new HashSet<>();
-        if (latestListedTimestampInThisCycle <= currentTimestamp) {
-            updatedKeys.addAll(currentKeys);
+        if (latestListedTimestampInThisCycle <= currentTimestamp.get()) {
+            updatedKeys.addAll(currentKeys.get());
         }
         updatedKeys.addAll(listedKeys);
 
         persistState(session, latestListedTimestampInThisCycle, updatedKeys);
-        session.commit();
 
-        // Update currentKeys.
-        currentKeys = updatedKeys;
-        currentTimestamp = latestListedTimestampInThisCycle;
+        final long latestListed = latestListedTimestampInThisCycle;
+        session.commitAsync(() -> {
+            this.currentKeys.set(updatedKeys);

Review comment:
       Could this lead to a race condition with using currentKeys above in a 
subsequent call to ListS3?

##########
File path: 
nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/IngestFile.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+
+public class IngestFile extends AbstractProcessor {
+    static final PropertyDescriptor FILENAME = new Builder()
+        .name("Filename")
+        .displayName("Filename")
+        .description("Fully qualified path to the file that should be ingested")
+        .required(true)
+        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+        .expressionLanguageSupported(NONE)
+        .build();
+    static final PropertyDescriptor COMMIT_MODE = new Builder()
+        .name("Commit Mode")
+        .displayName("Commit Mode")
+        .description("How to commit the process session")
+        .allowableValues("synchronous", "asynchronous")
+        .defaultValue("asynchronous")
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Arrays.asList(FILENAME, COMMIT_MODE);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Collections.singleton(REL_SUCCESS);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final String filename = context.getProperty(FILENAME).getValue();
+        final File file = new File(filename);
+
+        FlowFile flowFile = session.create();
+        flowFile = session.importFrom(file.toPath(), true, flowFile);
+        session.transfer(flowFile, REL_SUCCESS);
+        session.getProvenanceReporter().receive(flowFile, file.toURI().toString());
+
+        final String commitMode = context.getProperty(COMMIT_MODE).getValue();
+        if ("synchronous".equalsIgnoreCase(commitMode)) {

Review comment:
       Constant?
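
For example, something along these lines (the class and constant names are just a suggestion), so that the allowable values and the comparison in onTrigger reference the same definition:

```java
// Hypothetical constants for the two commit modes; names are illustrative.
final class CommitModes {
    static final String SYNCHRONOUS = "synchronous";
    static final String ASYNCHRONOUS = "asynchronous";

    private CommitModes() {
    }
}
```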

##########
File path: 
nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java
##########
@@ -340,16 +339,10 @@ private int processQueue(ProcessSession session) {
             flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), APPLICATION_XML);
             session.getProvenanceReporter().receive(flowFile, provenanceUri);
             session.transfer(flowFile, REL_SUCCESS);
-            session.commit();
             flowFileCount++;
-            if (!renderedXMLs.remove(xml) && getLogger().isWarnEnabled()) {
-                getLogger().warn(new StringBuilder("Event ")
-                        .append(xml)
-                        .append(" had already been removed from queue, FlowFile ")
-                        .append(flowFile.getAttribute(CoreAttributes.UUID.key()))
-                        .append(" possible duplication of data")
-                        .toString());
-            }
+
+            final String xmlMessage = xml;
+            session.commitAsync(() -> renderedXMLs.remove(xmlMessage));

Review comment:
       Could this be a race condition?

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
##########
@@ -412,41 +419,130 @@ public TriggerResult getResult() throws InterruptedException {
             }
         };
 
-        final Future<?> future = runDataflowExecutor.submit(() -> executeDataflow(resultQueue, executionProgress));
+        final Future<?> future = runDataflowExecutor.submit(() -> executeDataflow(resultQueue, executionProgress, tracker));
         processFuture.set(future);
 
         return trigger;
     }
 
 
-    private void executeDataflow(final BlockingQueue<TriggerResult> resultQueue, final ExecutionProgress executionProgress) {
-        try {
-            for (final Connectable connectable : rootConnectables) {
-                final ProcessContext processContext = processContextFactory.createProcessContext(connectable);
-
-                final StatelessProcessSessionFactory sessionFactory = new StatelessProcessSessionFactory(connectable, repositoryContextFactory,
-                    processContextFactory, executionProgress);
+    private void executeDataflow(final BlockingQueue<TriggerResult> resultQueue, final ExecutionProgress executionProgress, final AsynchronousCommitTracker tracker) {
+        final long startNanos = System.nanoTime();
+        transactionThresholdMeter.reset();
 
-                final long start = System.nanoTime();
-                final long processingNanos;
+        try {
+            Connectable currentComponent = null;
 
-                // If there is no incoming connection, trigger once.
-                logger.debug("Triggering {}", connectable);
-                connectable.onTrigger(processContext, sessionFactory);
+            try {
+                boolean completionReached = false;
+                while (!completionReached) {

Review comment:
      Nice!  While I'm impressed that you were able to come up with a quintuply-nested loop, I can't help but wonder if this could be refactored a bit to at least put some of the loops into separate methods.
   
   Also, could this become:
   ```suggestion
                for (boolean completionReached = false; !completionReached; completionReached = !tracker.isAnyReady()) {
   ```

##########
File path: 
nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/flow/ProcessSessionWrap.java
##########
@@ -96,7 +97,7 @@ public ProcessSessionWrap(ProcessSession s, boolean toFailureOnError) {
      */
     public abstract SessionFile wrap(FlowFile f);
 
-    public List<FlowFile> wrap(List ff) {
+    public List<FlowFile> wrap(List<FlowFile> ff) {

Review comment:
       Could make the arguments final.

##########
File path: 
nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/flow/ProcessSessionWrap.java
##########
@@ -70,15 +71,15 @@ on get(),create(),write(),... we will store here last version of file by id
     */
     private Map<String, FlowFile> toDrop = new HashMap<>();
 
-    public ProcessSessionWrap(ProcessSession s, boolean toFailureOnError) {
-        if (s instanceof ProcessSessionWrap) {
+    public ProcessSessionWrap(ProcessSession session, boolean toFailureOnError) {

Review comment:
       Could make the arguments final.

##########
File path: 
nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/flow/ProcessSessionWrap.java
##########
@@ -116,11 +117,11 @@ public FlowFile unwrap(FlowFile f) {
         return f;
     }
 
-    public List<FlowFile> unwrap(Collection<FlowFile> _ff) {
-        if (_ff == null) {
+    public List<FlowFile> unwrap(Collection<FlowFile> flowFiles) {

Review comment:
       Could make the arguments final.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
##########
@@ -245,6 +245,23 @@ public int completeFullEnoughBins() throws IOException {
         return handleCompletedBins(RecordBin::isFullEnough, "Bin is full enough");
     }
 
+    public boolean isAnyBinFullEnough() {

Review comment:
       Do you plan to use this somewhere?  I didn't detect any uses.

##########
File path: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
##########
@@ -370,18 +371,17 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 rowsPulledHolder.set(++rowsPulled);
 
                 if (++rowsPulled % getBatchSize() == 0) {
-                    session.commit();
+                    session.commitAsync();
                 }
             });
 
             final ScanResult scanResults = new ScanResult(latestTimestampHolder.get(), cellsMatchingTimestamp);
 
-            if (lastResult == null || scanResults.getTimestamp() > lastResult.getTimestamp()) {
+            final ScanResult latestResult = lastResult.get();
+            if (latestResult == null || scanResults.getTimestamp() > latestResult.getTimestamp()) {
                 session.setState(scanResults.toFlatMap(), Scope.CLUSTER);
-                session.commit();
-
-                lastResult = scanResults;
-            } else if (scanResults.getTimestamp() == lastResult.getTimestamp()) {
+                session.commitAsync(() -> lastResult.set(scanResults));

Review comment:
       Could this introduce a race condition?

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
##########
@@ -306,37 +306,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             // it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where
             // we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would
             // result in data loss! If we commit the session first, we are safe.
-            session.commit();
-
-            final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue();
-            if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
-                try {
-                    transfer.deleteFile(flowFile, null, filename);
-                } catch (final FileNotFoundException e) {
-                    // file doesn't exist -- effectively the same as removing it. Move on.
-                } catch (final IOException ioe) {
-                    getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}",
-                            new Object[]{flowFile, host, port, filename, ioe}, ioe);
-                }
-            } else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
-                final String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
-                final String simpleFilename = StringUtils.substringAfterLast(filename, "/");
-
-                try {
-                    final String absoluteTargetDirPath = transfer.getAbsolutePath(flowFile, targetDir);
-                    final File targetFile = new File(absoluteTargetDirPath, simpleFilename);
-                    if (context.getProperty(MOVE_CREATE_DIRECTORY).asBoolean()) {
-                        // Create the target directory if necessary.
-                        transfer.ensureDirectoryExists(flowFile, targetFile.getParentFile());
-                    }
-
-                    transfer.rename(flowFile, filename, targetFile.getAbsolutePath());
-
-                } catch (final IOException ioe) {
-                    getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
-                            new Object[]{flowFile, host, port, filename, ioe}, ioe);
-                }
-            }
+            final FlowFile flowFileReceived = flowFile;
+            session.commitAsync(() -> performCompletionStrategy(transfer, context, flowFileReceived, filename, host, port));

Review comment:
       It looks like transfer is closed in the finally block on line 316.  
Couldn't the thread try to use transfer after it's closed?
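
A toy model of the lifetime concern (all types and the inline commitAsync are illustrative, not NiFi API): one possible shape of a fix is to move the close into the commit callback so the completion work always runs against an open transfer.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for the remote transfer resource.
class Transfer implements AutoCloseable {
    final AtomicBoolean closed = new AtomicBoolean(false);

    void rename() {
        if (closed.get()) {
            throw new IllegalStateException("transfer already closed");
        }
    }

    @Override
    public void close() {
        closed.set(true);
    }
}

class SessionSketch {
    // In this sketch the callback runs inline; in NiFi it may run later, which is
    // exactly why closing the transfer in the caller's finally block is risky.
    static void commitAsync(final Runnable onSuccess) {
        onSuccess.run();
    }

    static void fetchAndComplete(final Transfer transfer) {
        commitAsync(() -> {
            try {
                transfer.rename();   // safe: the transfer is still open here
            } finally {
                transfer.close();    // close only after the follow-on work finishes
            }
        });
    }
}
```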

##########
File path: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
##########
@@ -574,9 +572,10 @@ public void handle(final byte[] rowKey, final ResultCell[] resultCells) {
                 // we could potentially have a huge number of rows. If we get to batchSize, go ahead and commit the
                 // session so that we can avoid buffering tons of FlowFiles without ever sending any out.
                 if (getBatchSize()>0 && ffUncommittedCount*bulkSize > getBatchSize()) {
-                    session.commit();
-                    ffCountHolder.set(0L);
-                }else{
+                    session.commitAsync(() -> {
+                        ffCountHolder.set(0L);

Review comment:
      This handler is referenced on line 392, and the session is used below on lines 407-425, meaning session.commitAsync(Runnable) would be invoked before the session is reused.  Would this be a problem?

##########
File path: 
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
##########
@@ -159,8 +158,8 @@ protected void processResource(final Connection connection, final AMQPConsumer c
         }
 
         if (lastReceived != null) {
-            session.commit();
-            consumer.acknowledge(lastReceived);
+            final GetResponse finalGetResponse = lastReceived;
+            session.commitAsync(() -> consumer.acknowledge(finalGetResponse), null);

Review comment:
       Any reason you didn't just use commitAsync(Runnable) here?
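
To illustrate, a toy model of the overload relationship (only the shape mirrors the ProcessSession API; the implementation is a sketch): passing null as the failure callback should be equivalent to the single-argument form, which reads more clearly.

```java
import java.util.function.Consumer;

// Sketch of the two-overload pattern; not the actual StandardProcessSession code.
class CommitSketch {

    // Single-argument form: delegates with a null failure callback.
    static void commitAsync(final Runnable onSuccess) {
        commitAsync(onSuccess, null);
    }

    static void commitAsync(final Runnable onSuccess, final Consumer<Throwable> onFailure) {
        try {
            onSuccess.run();
        } catch (final Throwable t) {
            if (onFailure != null) {
                onFailure.accept(t);
            }
        }
    }
}
```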

##########
File path: nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
##########
@@ -101,6 +102,105 @@
      */
     void commit();
 
+    /**
+     * <p>
+     * Commits the current session ensuring all operations against FlowFiles
+     * within this session are atomically persisted. All FlowFiles operated on
+     * within this session must be accounted for by transfer or removal or the
+     * commit will fail.
+     * </p>
+     *
+     * <p>
+     * Unlike the {@link #commit()} method, the persistence of data to the repositories is not
+     * guaranteed to have occurred by the time that this method returns. Therefore, if any follow-on actions
+     * are necessary after the data has been persisted to the repository (for example, acknowledging receipt from
+     * a source system, removing a source file, etc.) that logic should be performed only by invoking {@link #commitAsync(Runnable, Consumer)}
+     * and implementing that action in the provided callback.
+     * </p>
+     *
+     * <p>
+     * If the session cannot be committed, an error will be logged and the session will be rolled back instead.
+     * </p>
+     *
+     * @throws IllegalStateException if called from within a read or write callback (See {@link #write(FlowFile, StreamCallback)}, {@link #write(FlowFile, OutputStreamCallback)},
+     * {@link #read(FlowFile, InputStreamCallback)}).
+     *
+     * @throws FlowFileHandlingException if any FlowFile is not appropriately accounted for by transferring it to a Relationship (see {@link #transfer(FlowFile, Relationship)})
+     * or removed (see {@link #remove(FlowFile)}.
+     */
+    void commitAsync();
+
+    /**
+     * <p>
+     * Commits the current session ensuring all operations against FlowFiles
+     * within this session are atomically persisted. All FlowFiles operated on
+     * within this session must be accounted for by transfer or removal or the
+     * commit will fail.
+     * </p>
+     *
+     * <p>
+     * Unlike the {@link #commit()} method, the persistence of data to the 
repositories is not
+     * guaranteed to have occurred by the time that this method returns. 
Therefore, the session
+     * should NOT be reused.
+     * </p>
+     *
+     * <p>
+     * If the session is successfully committed, the given 
<code>onSuccess</code> {@link Runnable} will be called.
+     * At the point that the session commit is completed, the session will 
have already been committed, so any calls
+     * to {@link #rollback()} / {@link #rollback(boolean)} will not undo that 
session commit but instead roll back any chances

Review comment:
       changes*

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
##########
@@ -412,41 +419,130 @@ public TriggerResult getResult() throws 
InterruptedException {
             }
         };
 
-        final Future<?> future = runDataflowExecutor.submit(() -> 
executeDataflow(resultQueue, executionProgress));
+        final Future<?> future = runDataflowExecutor.submit(() -> 
executeDataflow(resultQueue, executionProgress, tracker));
         processFuture.set(future);
 
         return trigger;
     }
 
 
-    private void executeDataflow(final BlockingQueue<TriggerResult> 
resultQueue, final ExecutionProgress executionProgress) {
-        try {
-            for (final Connectable connectable : rootConnectables) {
-                final ProcessContext processContext = 
processContextFactory.createProcessContext(connectable);
-
-                final StatelessProcessSessionFactory sessionFactory = new 
StatelessProcessSessionFactory(connectable, repositoryContextFactory,
-                    processContextFactory, executionProgress);
+    private void executeDataflow(final BlockingQueue<TriggerResult> 
resultQueue, final ExecutionProgress executionProgress, final 
AsynchronousCommitTracker tracker) {
+        final long startNanos = System.nanoTime();
+        transactionThresholdMeter.reset();
 
-                final long start = System.nanoTime();
-                final long processingNanos;
+        try {
+            Connectable currentComponent = null;
 
-                // If there is no incoming connection, trigger once.
-                logger.debug("Triggering {}", connectable);
-                connectable.onTrigger(processContext, sessionFactory);
+            try {
+                boolean completionReached = false;
+                while (!completionReached) {
+                    for (final Connectable connectable : rootConnectables) {
+                        currentComponent = connectable;
+
+                        // Reset progress and trigger the component. This 
allows us to track whether or not any progress was made by the given connectable
+                        // during this invocation of its onTrigger method.
+                        tracker.resetProgress();
+                        trigger(connectable, executionProgress, tracker);
+
+                        // Keep track of the output of the source component so 
that we can determine whether or not we've reached our transaction threshold.
+                        
transactionThresholdMeter.incrementFlowFiles(tracker.getFlowFilesProduced());
+                        
transactionThresholdMeter.incrementBytes(tracker.getBytesProduced());
+                    }
+
+                    readyLoop: while (tracker.isAnyReady()) {

Review comment:
       Wow, first time I've seen a loop label used in Java. 👍 
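       For anyone else who hasn't run across the construct: a label lets `break` (or `continue`) target an outer loop directly instead of only the innermost one. A minimal standalone example (names are illustrative, not from the PR):

```java
public class LabeledLoopDemo {
    // Search a 2D grid for a target value. The "search" label marks the
    // outer loop, so `break search` exits both loops at once; a plain
    // break would only leave the inner loop.
    static int[] find(int[][] grid, int target) {
        int[] result = null;
        search:
        for (int r = 0; r < grid.length; r++) {
            for (int c = 0; c < grid[r].length; c++) {
                if (grid[r][c] == target) {
                    result = new int[] {r, c};
                    break search; // exits both loops immediately
                }
            }
        }
        return result; // null when the target is absent
    }

    public static void main(String[] args) {
        int[][] grid = { {1, 2}, {3, 4} };
        int[] pos = find(grid, 3);
        System.out.println(pos[0] + "," + pos[1]); // 1,0
    }
}
```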

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/AsynchronousCommitTracker.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.session;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.groups.ProcessGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+import java.util.function.Consumer;
+
+/**
+ * Simple component used to track which Connectables are ready to be triggered
+ */
+public class AsynchronousCommitTracker {
+    private static final Logger logger = 
LoggerFactory.getLogger(AsynchronousCommitTracker.class);
+
+    private final Set<Connectable> ready = new LinkedHashSet<>();
+    private final Stack<CommitCallbacks> commitCallbacks = new Stack<>();
+    private int flowFilesProduced = 0;
+    private long bytesProduced = 0L;
+    private boolean progressMade = false;
+
+    public void addConnectable(final Connectable connectable) {
+        // this.ready is a LinkedHashSet that is responsible for ensuring that 
when a Connectable is added,
+        // it will be the first to be triggered. What we really want is to 
insert the new Connectable at the front
+        // of the collection, regardless of whether it's currently present or 
not. However, using a List or a Queue
+        // is not ideal because checking for the existence of the Connectable 
in a List or Queue is generally quite expensive,
+        // even though the insertion is cheap. To achieve the desired 
behavior, we call remove() and then add(), which ensures
+    // that the given Connectable goes to the END of the list. When 
getReady() is called, the LinkedHashSet is then
+        // copied into a List and reversed. There is almost certainly a much 
more efficient way to achieve this, but that

Review comment:
       I appreciate a humble codebase.
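       The remove()-then-add() reordering trick described in the comment above can be sketched with plain collections (a standalone illustration with made-up names, not the actual AsynchronousCommitTracker code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ReorderDemo {
    private final Set<String> ready = new LinkedHashSet<>();

    // remove() followed by add() moves an existing element to the END of
    // the LinkedHashSet's iteration order (or appends it if absent), at
    // O(1) cost per call.
    void markReady(String name) {
        ready.remove(name);
        ready.add(name);
    }

    // Copying into a List and reversing yields most-recently-marked first,
    // which is the "insert at the front" behavior the comment describes.
    List<String> getReady() {
        List<String> list = new ArrayList<>(ready);
        Collections.reverse(list);
        return list;
    }

    public static void main(String[] args) {
        ReorderDemo d = new ReorderDemo();
        d.markReady("a");
        d.markReady("b");
        d.markReady("a"); // re-marking "a" moves it to the end of the set
        System.out.println(d.getReady()); // [a, b]
    }
}
```

       The upshot is that membership checks stay O(1) (unlike a List or ArrayDeque), while the reverse-on-read preserves the desired most-recent-first ordering.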

##########
File path: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
##########
@@ -440,15 +440,7 @@ public void process(final OutputStream out) throws 
IOException {
 
             session.getProvenanceReporter().receive(messageFlowfile, 
getTransitUri(mqttMessage.getTopic()));
             session.transfer(messageFlowfile, REL_MESSAGE);
-            session.commit();
-            if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {

Review comment:
       No more warn message?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

