turcsanyip commented on a change in pull request #4425:
URL: https://github.com/apache/nifi/pull/4425#discussion_r466332998



##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
##########
@@ -484,6 +485,41 @@
      */
     List<Connection> findAllConnections();
 
+    /**
+     * Initiates a request to drop all FlowFiles in all connections under this 
process group (recursively).
+     * This method returns a DropFlowFileStatus that can be used to determine 
the current state of the request.
+     * Additionally, the DropFlowFileStatus provides a request identifier that 
can then be
+     * passed to the {@link #getDropAllFlowFilesStatus(String)} and {@link 
#cancelDropAllFlowFiles(String)}
+     * methods in order to obtain the status later or cancel a request
+     *
+     * @param requestIdentifier the identifier of the Drop FlowFile Request
+     * @param requestor the entity that is requesting that the FlowFiles be 
dropped; this will be
+     *            included in the Provenance Events that are generated.
+     *
+     * @return the status of the drop request, or <code>null</code> if there 
is no
+     *         connection in the process group.
+     */
+    DropFlowFileStatus dropAllFlowFiles(String requestIdentifier, String 
requestor);
+
+    /**
+     * Returns the current status of a Drop AlL FlowFiles Request that was 
initiated via the

Review comment:
       Typo: All

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
##########
@@ -484,6 +485,41 @@
      */
     List<Connection> findAllConnections();
 
+    /**
+     * Initiates a request to drop all FlowFiles in all connections under this 
process group (recursively).
+     * This method returns a DropFlowFileStatus that can be used to determine 
the current state of the request.
+     * Additionally, the DropFlowFileStatus provides a request identifier that 
can then be
+     * passed to the {@link #getDropAllFlowFilesStatus(String)} and {@link 
#cancelDropAllFlowFiles(String)}
+     * methods in order to obtain the status later or cancel a request
+     *
+     * @param requestIdentifier the identifier of the Drop FlowFile Request
+     * @param requestor the entity that is requesting that the FlowFiles be 
dropped; this will be
+     *            included in the Provenance Events that are generated.
+     *
+     * @return the status of the drop request, or <code>null</code> if there 
is no
+     *         connection in the process group.
+     */
+    DropFlowFileStatus dropAllFlowFiles(String requestIdentifier, String 
requestor);
+
+    /**
+     * Returns the current status of a Drop AlL FlowFiles Request that was 
initiated via the
+     * {@link #dropAllFlowFiles(String, String)} method with the given 
identifier
+     *
+     * @param requestIdentifier the identifier of the Drop FlowFile Request

Review comment:
       Drop All FlowFiles Request

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
##########
@@ -1260,6 +1273,87 @@ public Connection findConnection(final String id) {
         return findAllConnections(this);
     }
 
+    @Override
+    public DropFlowFileStatus dropAllFlowFiles(String requestIdentifier, 
String requestor) {
+        return handleDropAllFlowFiles(requestIdentifier, queue -> 
queue.dropFlowFiles(requestIdentifier, requestor));
+    }
+
+    @Override
+    public DropFlowFileStatus getDropAllFlowFilesStatus(String 
requestIdentifier) {
+        return handleDropAllFlowFiles(requestIdentifier, queue -> 
queue.getDropFlowFileStatus(requestIdentifier));
+    }
+
+    @Override
+    public DropFlowFileStatus cancelDropAllFlowFiles(String requestIdentifier) 
{
+        return handleDropAllFlowFiles(requestIdentifier, queue -> 
queue.cancelDropFlowFileRequest(requestIdentifier));
+    }
+
+    private DropFlowFileStatus handleDropAllFlowFiles(String dropRequestId, 
Function<FlowFileQueue, DropFlowFileStatus> function) {
+        DropFlowFileStatus resultDropFlowFileStatus;
+
+        List<Connection> connections = findAllConnections(this);
+
+        DropFlowFileRequest aggregateDropFlowFileStatus = new 
DropFlowFileRequest(dropRequestId);
+        aggregateDropFlowFileStatus.setState(null);
+
+        AtomicBoolean processedAtLeastOne = new AtomicBoolean(false);
+
+        connections.stream()
+            .map(Connection::getFlowFileQueue)
+            .map(function::apply)
+            .forEach(additionalDropFlowFileStatus -> {
+                aggregate(aggregateDropFlowFileStatus, 
additionalDropFlowFileStatus);
+                processedAtLeastOne.set(true);
+            });
+
+        if (processedAtLeastOne.get()) {
+            resultDropFlowFileStatus = aggregateDropFlowFileStatus;
+        } else {
+            resultDropFlowFileStatus = null;
+        }
+
+        return resultDropFlowFileStatus;
+    }
+
+    private void aggregate(DropFlowFileRequest aggregateDropFlowFileStatus, 
DropFlowFileStatus additionalDropFlowFileStatus) {
+        QueueSize aggregateOriginalSize = 
aggregate(aggregateDropFlowFileStatus.getOriginalSize(), 
additionalDropFlowFileStatus.getOriginalSize());
+        QueueSize aggregateDroppedSize = 
aggregate(aggregateDropFlowFileStatus.getDroppedSize(), 
additionalDropFlowFileStatus.getDroppedSize());
+        QueueSize aggregateCurrentSize = 
aggregate(aggregateDropFlowFileStatus.getCurrentSize(), 
additionalDropFlowFileStatus.getCurrentSize());
+        DropFlowFileState aggregateState = 
aggregate(aggregateDropFlowFileStatus.getState(), 
additionalDropFlowFileStatus.getState());
+
+        aggregateDropFlowFileStatus.setOriginalSize(aggregateOriginalSize);
+        aggregateDropFlowFileStatus.setDroppedSize(aggregateDroppedSize);
+        aggregateDropFlowFileStatus.setCurrentSize(aggregateCurrentSize);
+        aggregateDropFlowFileStatus.setState(aggregateState);
+    }
+
+    private QueueSize aggregate(QueueSize size1, QueueSize size2) {
+        int objectsNr = Optional.ofNullable(size1)
+            .map(size -> size1.getObjectCount() + size2.getObjectCount())

Review comment:
       `.map(size -> size.getObjectCount() + size2.getObjectCount())` would be 
more natural / readable IMO.
   Otherwise the wrapped object and the function parameter are not used.
   Also for line 1336.

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
##########
@@ -484,6 +485,41 @@
      */
     List<Connection> findAllConnections();
 
+    /**
+     * Initiates a request to drop all FlowFiles in all connections under this 
process group (recursively).
+     * This method returns a DropFlowFileStatus that can be used to determine 
the current state of the request.
+     * Additionally, the DropFlowFileStatus provides a request identifier that 
can then be
+     * passed to the {@link #getDropAllFlowFilesStatus(String)} and {@link 
#cancelDropAllFlowFiles(String)}
+     * methods in order to obtain the status later or cancel a request
+     *
+     * @param requestIdentifier the identifier of the Drop FlowFile Request

Review comment:
       Drop All FlowFiles Request




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to