turcsanyip commented on a change in pull request #4425:
URL: https://github.com/apache/nifi/pull/4425#discussion_r466332998
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
##########
@@ -484,6 +485,41 @@
*/
List<Connection> findAllConnections();
+ /**
+ * Initiates a request to drop all FlowFiles in all connections under this
process group (recursively).
+ * This method returns a DropFlowFileStatus that can be used to determine
the current state of the request.
+ * Additionally, the DropFlowFileStatus provides a request identifier that
can then be
+ * passed to the {@link #getDropAllFlowFilesStatus(String)} and {@link
#cancelDropAllFlowFiles(String)}
+ * methods in order to obtain the status later or cancel a request
+ *
+ * @param requestIdentifier the identifier of the Drop FlowFile Request
+ * @param requestor the entity that is requesting that the FlowFiles be
dropped; this will be
+ * included in the Provenance Events that are generated.
+ *
+ * @return the status of the drop request, or <code>null</code> if there
is no
+ * connection in the process group.
+ */
+ DropFlowFileStatus dropAllFlowFiles(String requestIdentifier, String
requestor);
+
+ /**
+ * Returns the current status of a Drop AlL FlowFiles Request that was
initiated via the
Review comment:
Typo: "AlL" should be "All".
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
##########
@@ -484,6 +485,41 @@
*/
List<Connection> findAllConnections();
+ /**
+ * Initiates a request to drop all FlowFiles in all connections under this
process group (recursively).
+ * This method returns a DropFlowFileStatus that can be used to determine
the current state of the request.
+ * Additionally, the DropFlowFileStatus provides a request identifier that
can then be
+ * passed to the {@link #getDropAllFlowFilesStatus(String)} and {@link
#cancelDropAllFlowFiles(String)}
+ * methods in order to obtain the status later or cancel a request
+ *
+ * @param requestIdentifier the identifier of the Drop FlowFile Request
+ * @param requestor the entity that is requesting that the FlowFiles be
dropped; this will be
+ * included in the Provenance Events that are generated.
+ *
+ * @return the status of the drop request, or <code>null</code> if there
is no
+ * connection in the process group.
+ */
+ DropFlowFileStatus dropAllFlowFiles(String requestIdentifier, String
requestor);
+
+ /**
+ * Returns the current status of a Drop AlL FlowFiles Request that was
initiated via the
+ * {@link #dropAllFlowFiles(String, String)} method with the given
identifier
+ *
+ * @param requestIdentifier the identifier of the Drop FlowFile Request
Review comment:
This should read "Drop All FlowFiles Request" rather than "Drop FlowFile Request".
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
##########
@@ -1260,6 +1273,87 @@ public Connection findConnection(final String id) {
return findAllConnections(this);
}
+ @Override
+ public DropFlowFileStatus dropAllFlowFiles(String requestIdentifier,
String requestor) {
+ return handleDropAllFlowFiles(requestIdentifier, queue ->
queue.dropFlowFiles(requestIdentifier, requestor));
+ }
+
+ @Override
+ public DropFlowFileStatus getDropAllFlowFilesStatus(String
requestIdentifier) {
+ return handleDropAllFlowFiles(requestIdentifier, queue ->
queue.getDropFlowFileStatus(requestIdentifier));
+ }
+
+ @Override
+ public DropFlowFileStatus cancelDropAllFlowFiles(String requestIdentifier)
{
+ return handleDropAllFlowFiles(requestIdentifier, queue ->
queue.cancelDropFlowFileRequest(requestIdentifier));
+ }
+
+ private DropFlowFileStatus handleDropAllFlowFiles(String dropRequestId,
Function<FlowFileQueue, DropFlowFileStatus> function) {
+ DropFlowFileStatus resultDropFlowFileStatus;
+
+ List<Connection> connections = findAllConnections(this);
+
+ DropFlowFileRequest aggregateDropFlowFileStatus = new
DropFlowFileRequest(dropRequestId);
+ aggregateDropFlowFileStatus.setState(null);
+
+ AtomicBoolean processedAtLeastOne = new AtomicBoolean(false);
+
+ connections.stream()
+ .map(Connection::getFlowFileQueue)
+ .map(function::apply)
+ .forEach(additionalDropFlowFileStatus -> {
+ aggregate(aggregateDropFlowFileStatus,
additionalDropFlowFileStatus);
+ processedAtLeastOne.set(true);
+ });
+
+ if (processedAtLeastOne.get()) {
+ resultDropFlowFileStatus = aggregateDropFlowFileStatus;
+ } else {
+ resultDropFlowFileStatus = null;
+ }
+
+ return resultDropFlowFileStatus;
+ }
+
+ private void aggregate(DropFlowFileRequest aggregateDropFlowFileStatus,
DropFlowFileStatus additionalDropFlowFileStatus) {
+ QueueSize aggregateOriginalSize =
aggregate(aggregateDropFlowFileStatus.getOriginalSize(),
additionalDropFlowFileStatus.getOriginalSize());
+ QueueSize aggregateDroppedSize =
aggregate(aggregateDropFlowFileStatus.getDroppedSize(),
additionalDropFlowFileStatus.getDroppedSize());
+ QueueSize aggregateCurrentSize =
aggregate(aggregateDropFlowFileStatus.getCurrentSize(),
additionalDropFlowFileStatus.getCurrentSize());
+ DropFlowFileState aggregateState =
aggregate(aggregateDropFlowFileStatus.getState(),
additionalDropFlowFileStatus.getState());
+
+ aggregateDropFlowFileStatus.setOriginalSize(aggregateOriginalSize);
+ aggregateDropFlowFileStatus.setDroppedSize(aggregateDroppedSize);
+ aggregateDropFlowFileStatus.setCurrentSize(aggregateCurrentSize);
+ aggregateDropFlowFileStatus.setState(aggregateState);
+ }
+
+ private QueueSize aggregate(QueueSize size1, QueueSize size2) {
+ int objectsNr = Optional.ofNullable(size1)
+ .map(size -> size1.getObjectCount() + size2.getObjectCount())
Review comment:
`.map(size -> size.getObjectCount() + size2.getObjectCount())` would be
more natural / readable IMO.
As written, the lambda ignores its own parameter `size` (the value wrapped by
the Optional) and reads `size1` directly instead.
The same applies to line 1336.
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
##########
@@ -484,6 +485,41 @@
*/
List<Connection> findAllConnections();
+ /**
+ * Initiates a request to drop all FlowFiles in all connections under this
process group (recursively).
+ * This method returns a DropFlowFileStatus that can be used to determine
the current state of the request.
+ * Additionally, the DropFlowFileStatus provides a request identifier that
can then be
+ * passed to the {@link #getDropAllFlowFilesStatus(String)} and {@link
#cancelDropAllFlowFiles(String)}
+ * methods in order to obtain the status later or cancel a request
+ *
+ * @param requestIdentifier the identifier of the Drop FlowFile Request
Review comment:
This should read "Drop All FlowFiles Request" rather than "Drop FlowFile Request".
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org