mcgilman commented on code in PR #10748:
URL: https://github.com/apache/nifi/pull/10748#discussion_r2677571037


##########
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java:
##########
@@ -84,5 +84,11 @@ public CompletableFuture<Void> stopProcessors() {
         authContext.authorizeWrite();
         return delegate.stopProcessors();
     }
+
+    @Override
+    public int getActiveThreadCount() {

Review Comment:
   This requires a change to `ProcessGroupLifecycle` in `nifi-api`. I don't 
currently see an open PR for it.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorAction.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+import java.util.Objects;
+
+public class StandardConnectorAction implements ConnectorAction {
+
+    private final String name;
+    private final String description;
+    private final boolean allowed;
+    private final String reasonNotAllowed;
+
+    public StandardConnectorAction(final String name, final String 
description, final boolean allowed, final String reasonNotAllowed) {
+        this.name = Objects.requireNonNull(name, "name is required");
+        this.description = Objects.requireNonNull(description, "description is 
required");
+        this.allowed = Objects.requireNonNull(allowed, "allowed flag is 
required");

Review Comment:
   `requireNonNull` is not needed since `allowed` is a primitive boolean; it is autoboxed at the call, so the check can never fail.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java:
##########
@@ -840,6 +841,167 @@ public void discardWorkingConfiguration() {
         recreateWorkingFlowContext();
     }
 
+    @Override
+    public List<ConnectorAction> getAvailableActions() {
+        final List<ConnectorAction> actions = new ArrayList<>();
+        final ConnectorState currentState = getCurrentState();
+        final boolean dataQueued = 
activeFlowContext.getManagedProcessGroup().isDataQueued();
+        final boolean stopped = currentState == ConnectorState.STOPPED || 
currentState == ConnectorState.DISABLED
+            || currentState == ConnectorState.UPDATED || currentState == 
ConnectorState.UPDATE_FAILED;
+
+        actions.add(createStartAction(currentState, stopped));
+        actions.add(createStopAction(currentState));
+        actions.add(createConfigureAction());
+        actions.add(createDiscardWorkingConfigAction());
+        actions.add(createPurgeFlowFilesAction(stopped, dataQueued));
+        actions.add(createDrainFlowFilesAction(stopped, dataQueued));
+        actions.add(createApplyUpdatesAction(currentState));
+        actions.add(createDeleteAction(stopped, dataQueued));
+
+        return actions;
+    }
+
+    private ConnectorAction createStartAction(final ConnectorState 
currentState, final boolean stopped) {
+        final boolean allowed;
+        final String reason;
+
+        if (currentState == ConnectorState.DISABLED) {
+            allowed = false;
+            reason = "Connector is disabled";
+        } else if (!stopped) {
+            allowed = false;
+            reason = "Connector must be stopped";
+        } else {
+            allowed = true;
+            reason = null;
+        }
+
+        return new StandardConnectorAction("START", "Start the connector", 
allowed, reason);
+    }
+
+    private ConnectorAction createStopAction(final ConnectorState 
currentState) {
+        final boolean allowed;
+        if (currentState == ConnectorState.RUNNING || currentState == 
ConnectorState.STARTING) {
+            allowed = true;
+        } else if (currentState == ConnectorState.UPDATED || currentState == 
ConnectorState.UPDATE_FAILED) {
+            allowed = 
hasActiveThread(activeFlowContext.getManagedProcessGroup());
+        } else {
+            allowed = false;
+        }
+
+        final String reason = allowed ? null : "Connector is not running";
+        return new StandardConnectorAction("STOP", "Stop the connector", 
allowed, reason);
+    }
+
+    private ConnectorAction createConfigureAction() {
+        return new StandardConnectorAction("CONFIGURE", "Configure the 
connector", true, null);
+    }
+
+    private ConnectorAction createDiscardWorkingConfigAction() {
+        final boolean allowed = hasWorkingConfigurationChanges();
+        final String reason = allowed ? null : "No pending changes to discard";
+
+        return new StandardConnectorAction("DISCARD_WORKING_CONFIGURATION", 
"Discard any changes made to the working configuration", allowed, reason);
+    }
+
+    private boolean hasActiveThread(final ProcessGroup group) {
+        for (final ProcessorNode processor : group.getProcessors()) {
+            if (processor.getActiveThreadCount() > 0) {
+                return true;
+            }
+        }
+
+        for (final ProcessGroup childGroup : group.getProcessGroups()) {
+            if (hasActiveThread(childGroup)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private ConnectorAction createPurgeFlowFilesAction(final boolean stopped, 
final boolean dataQueued) {
+        final boolean allowed;
+        final String reason;
+
+        if (!stopped) {
+            allowed = false;
+            reason = "Connector must be stopped";
+        } else if (!dataQueued) {
+            allowed = false;
+            reason = "No data is queued";
+        } else {
+            allowed = true;
+            reason = null;
+        }
+
+        return new StandardConnectorAction("PURGE_FLOWFILES", "Purge all 
FlowFiles from the connector, dropping all data without processing it", 
allowed, reason);
+    }
+
+    private ConnectorAction createDrainFlowFilesAction(final boolean stopped, 
final boolean dataQueued) {
+        final boolean allowed;
+        final String reason;
+
+        if (!stopped) {
+            allowed = false;
+            reason = "Connector must be stopped";
+        } else if (!dataQueued) {
+            allowed = false;
+            reason = "No data is queued";
+        } else {
+            allowed = true;
+            reason = null;
+        }
+
+        return new StandardConnectorAction("DRAIN_FLOWFILES", "Process data 
that is currently in the flow but do not ingest any additional data", allowed, 
reason);
+    }
+
+    private ConnectorAction createApplyUpdatesAction(final ConnectorState 
currentState) {
+        final boolean allowed;
+        final String reason;
+
+        if (currentState == ConnectorState.PREPARING_FOR_UPDATE || 
currentState == ConnectorState.UPDATING) {
+            allowed = false;
+            reason = "Connector is updating";
+        } else if (!hasWorkingConfigurationChanges()) {
+            allowed = false;
+            reason = "No pending changes";
+        } else {
+            allowed = true;
+            reason = null;
+        }
+
+        return new StandardConnectorAction("APPLY_UPDATES", "Apply the working 
configuration to the active configuration", allowed, reason);
+    }
+
+    private ConnectorAction createDeleteAction(final boolean stopped, final 
boolean dataQueued) {
+        final boolean allowed;
+        final String reason;
+
+        if (!stopped) {
+            allowed = false;
+            reason = "Connector must be stopped";
+        } else if (dataQueued) {
+            allowed = false;
+            reason = "Data is queued";
+        } else {
+            allowed = true;
+            reason = null;
+        }
+
+        return new StandardConnectorAction("DELETE", "Delete the connector", 
allowed, reason);
+    }
+
+    private boolean hasWorkingConfigurationChanges() {
+        if (workingFlowContext == null) {
+            return false;
+        }
+
+        final ConnectorConfiguration activeConfig = 
activeFlowContext.getConfigurationContext().toConnectorConfiguration();
+        final ConnectorConfiguration workingConfig = 
workingFlowContext.getConfigurationContext().toConnectorConfiguration();
+        return !Objects.equals(activeConfig, workingConfig);

Review Comment:
   Should `workingFlowContext` be stored in a local variable at the beginning 
of this method? The presence of the null check suggests the field could be 
`null`. If the field is set to `null` after the check, we'd get a 
`NullPointerException` on line 1001. Reading it once into a local variable 
should help prevent that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to