This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new bc4e3e850b NIFI-10246: When syncing Controller Services, wait until 
any DISABLING Controller Service has fully disabled before attempting to 
synchronize them. Also performed some minor refactoring/cleanup for System 
Tests in order to make writing a system test for this simpler
bc4e3e850b is described below

commit bc4e3e850bd2973a59b1d4726e439aa38d0559bc
Author: Mark Payne <[email protected]>
AuthorDate: Mon Jul 18 14:29:10 2022 -0400

    NIFI-10246: When syncing Controller Services, wait until any DISABLING 
Controller Service has fully disabled before attempting to synchronize them. 
Also performed some minor refactoring/cleanup for System Tests in order to make 
writing a system test for this simpler
    
    This closes #6219
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../service/StandardControllerServiceNode.java     | 15 ++++
 .../service/StandardControllerServiceProvider.java | 10 +++
 .../controller/service/ControllerServiceNode.java  | 10 +++
 .../serialization/AffectedComponentSet.java        |  8 +-
 .../nifi/cs/tests/system/StandardSleepService.java |  2 +-
 .../apache/nifi/tests/system/NiFiClientUtil.java   |  2 +-
 .../org/apache/nifi/tests/system/NiFiSystemIT.java | 37 +++++++--
 .../clustering/JoinClusterWithDifferentFlow.java   | 13 +---
 .../nifi/tests/system/clustering/OffloadIT.java    | 15 +---
 .../ControllerServiceEnableDisableConflictIT.java  | 91 ++++++++++++++++++++++
 10 files changed, 170 insertions(+), 33 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 0c250499c7..0da7c98884 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -394,6 +394,7 @@ public class StandardControllerServiceNode extends 
AbstractComponentNode impleme
         return this.active.get();
     }
 
+    @Override
     public boolean awaitEnabled(final long timePeriod, final TimeUnit 
timeUnit) throws InterruptedException {
         LOG.debug("Waiting up to {} {} for {} to be enabled", timePeriod, 
timeUnit, this);
         final boolean enabled = 
stateTransition.awaitStateOrInvalid(ControllerServiceState.ENABLED, timePeriod, 
timeUnit);
@@ -407,6 +408,20 @@ public class StandardControllerServiceNode extends 
AbstractComponentNode impleme
         return enabled;
     }
 
+    @Override
+    public boolean awaitDisabled(final long timePeriod, final TimeUnit 
timeUnit) throws InterruptedException {
+        LOG.debug("Waiting up to {} {} for {} to be disabled", timePeriod, 
timeUnit, this);
+        final boolean disabled = 
stateTransition.awaitState(ControllerServiceState.DISABLED, timePeriod, 
timeUnit);
+
+        if (disabled) {
+            LOG.debug("{} is now disabled", this);
+        } else {
+            LOG.debug("After {} {}, {} is NOT disabled", timePeriod, timeUnit, 
this);
+        }
+
+        return disabled;
+    }
+
     @Override
     public void verifyCanPerformVerification() {
         if (getState() != ControllerServiceState.DISABLED) {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 6a1195cc75..414d629f0d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -426,6 +426,16 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
                         future.completeExceptionally(e);
                     }
                 }
+            } else {
+                boolean disabled = false;
+                while (!disabled) {
+                    try {
+                        disabled = serviceNode.awaitDisabled(1, 
TimeUnit.SECONDS);
+                    } catch (final Exception e) {
+                        logger.error("Failed to disable {}", serviceNode, e);
+                        future.completeExceptionally(e);
+                    }
+                }
             }
         }
     }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index d060d16526..f02322bd6f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -201,6 +201,16 @@ public interface ControllerServiceNode extends 
ComponentNode, VersionedComponent
      */
     boolean awaitEnabled(long timePeriod, TimeUnit timeUnit) throws 
InterruptedException;
 
+
+    /**
+     * Waits up to the given amount of time for the Controller Service to 
transition to a DISABLED state.
+     * @param timePeriod maximum amount of time to wait
+     * @param timeUnit the unit for the time period
+     * @return <code>true</code> if the Controller Service finished disabling, 
<code>false</code> otherwise
+     * @throws InterruptedException if interrupted while waiting for the 
service complete its enabling
+     */
+    boolean awaitDisabled(long timePeriod, TimeUnit timeUnit) throws 
InterruptedException;
+
     /**
      * Verifies that the Controller Service is in a state in which it can 
verify a configuration by calling
      * {@link #verifyConfiguration(ConfigurationContext, ComponentLog, Map, 
ExtensionManager)}.
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
index 81515e2e59..a23ccc773f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
@@ -45,6 +45,7 @@ import org.apache.nifi.remote.RemoteGroupPort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -62,6 +63,11 @@ import java.util.stream.Collectors;
  */
 public class AffectedComponentSet {
     private static final Logger logger = 
LoggerFactory.getLogger(AffectedComponentSet.class);
+    private static final Set<ControllerServiceState> 
ACTIVE_CONTROLLER_SERVICE_STATES = new HashSet<>(Arrays.asList(
+        ControllerServiceState.ENABLED,
+        ControllerServiceState.ENABLING,
+        ControllerServiceState.DISABLING
+    ));
     private final FlowController flowController;
     private final FlowManager flowManager;
 
@@ -451,7 +457,7 @@ public class AffectedComponentSet {
         
processors.stream().filter(this::isActive).forEach(active::addProcessor);
         reportingTasks.stream().filter(task -> task.getScheduledState() == 
ScheduledState.STARTING || task.getScheduledState() == ScheduledState.RUNNING 
|| task.isRunning())
             .forEach(active::addReportingTask);
-        controllerServices.stream().filter(service -> service.getState() == 
ControllerServiceState.ENABLING || service.getState() == 
ControllerServiceState.ENABLED)
+        controllerServices.stream().filter(service -> 
ACTIVE_CONTROLLER_SERVICE_STATES.contains(service.getState()))
             .forEach(active::addControllerServiceWithoutReferences);
 
         return active;
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardSleepService.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardSleepService.java
index 27ffca42e6..03d7d38d0c 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardSleepService.java
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardSleepService.java
@@ -55,7 +55,7 @@ public class StandardSleepService extends 
AbstractControllerService implements S
         .build();
     public static final PropertyDescriptor ON_DISABLED_SLEEP_TIME = new 
PropertyDescriptor.Builder()
         .name("@OnDisabled Sleep Time")
-        .description("The amount of time to sleep when disabeld")
+        .description("The amount of time to sleep when disabled")
         .required(false)
         .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
         .defaultValue("0 sec")
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 2da3ac5dbe..15512cc694 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -799,7 +799,7 @@ public class NiFiClientUtil {
 
         return servicesEntity.getControllerServices().stream()
             .filter(svc -> serviceIds == null || serviceIds.isEmpty() || 
serviceIds.contains(svc.getId()))
-            .filter(svc -> 
!desiredState.equals(svc.getStatus().getRunStatus()))
+            .filter(svc -> 
!desiredState.equalsIgnoreCase(svc.getComponent().getState()))
             .collect(Collectors.toList());
     }
 
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index c68fc7746f..b33ef6cb2a 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -19,6 +19,7 @@ package org.apache.nifi.tests.system;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.impl.JerseyNiFiClient;
 import org.apache.nifi.web.api.dto.NodeDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
@@ -63,6 +64,8 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
     //                                                   Group ID  | Source 
Name | Dest Name | Conn Name  | Queue Size |
     private static final String QUEUE_SIZES_FORMAT = "| %1$-36.36s | 
%2$-30.30s | %3$-30.30s | %4$-30.30s | %5$-30.30s |";
 
+    public static final RequestConfig DO_NOT_REPLICATE = () -> 
Collections.singletonMap("X-Request-Replicated", "value");
+
     public static final int CLIENT_API_PORT = 5671;
     public static final int CLIENT_API_BASE_PORT = 5670;
     public static final String NIFI_GROUP_ID = "org.apache.nifi";
@@ -157,12 +160,22 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
 
     @Override
     public NiFiInstanceFactory getInstanceFactory() {
+        return createStandaloneInstanceFactory();
+    }
+
+    public NiFiInstanceFactory createStandaloneInstanceFactory() {
         return new SpawnedStandaloneNiFiInstanceFactory(
-                new InstanceConfiguration.Builder()
-                        
.bootstrapConfig("src/test/resources/conf/default/bootstrap.conf")
-                        .instanceDirectory("target/standalone-instance")
-                        .overrideNifiProperties(getNifiPropertiesOverrides())
-                        .build());
+            new InstanceConfiguration.Builder()
+                
.bootstrapConfig("src/test/resources/conf/default/bootstrap.conf")
+                .instanceDirectory("target/standalone-instance")
+                .overrideNifiProperties(getNifiPropertiesOverrides())
+                .build());
+    }
+
+    public NiFiInstanceFactory createTwoNodeInstanceFactory() {
+        return new SpawnedClusterNiFiInstanceFactory(
+            "src/test/resources/conf/clustered/node1/bootstrap.conf",
+            "src/test/resources/conf/clustered/node2/bootstrap.conf");
     }
 
     protected String getTestName() {
@@ -471,4 +484,18 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
         final ConnectionStatusEntity statusEntity = 
getConnectionStatus(connectionId);
         return 
statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued();
     }
+
+    public NodeDTO getNodeDtoByNodeIndex(final int nodeIndex) throws 
NiFiClientException, IOException {
+        return getNodeDtoByApiPort(5670 + nodeIndex);
+    }
+
+    public NodeDTO getNodeDtoByApiPort(final int apiPort) throws 
NiFiClientException, IOException {
+        final ClusterEntity clusterEntity = 
getNifiClient().getControllerClient().getNodes();
+        final NodeDTO node2Dto = clusterEntity.getCluster().getNodes().stream()
+            .filter(nodeDto -> nodeDto.getApiPort() == apiPort)
+            .findAny()
+            .orElseThrow(() -> new RuntimeException("Could not locate Node 
2"));
+
+        return node2Dto;
+    }
 }
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlow.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlow.java
index eb103f8473..c0819b07ef 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlow.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlow.java
@@ -38,7 +38,6 @@ import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.flow.FlowDTO;
 import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
 import org.apache.nifi.web.api.entity.AffectedComponentEntity;
-import org.apache.nifi.web.api.entity.ClusterEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
 import org.apache.nifi.web.api.entity.NodeEntity;
@@ -114,7 +113,7 @@ public class JoinClusterWithDifferentFlow extends 
NiFiSystemIT {
         node2.start(true);
 
         final File backupFile = getBackupFile(node2ConfDir);
-        final NodeDTO node2Dto = getNodeDTO(5672);
+        final NodeDTO node2Dto = getNodeDtoByApiPort(5672);
 
         verifyFlowContentsOnDisk(backupFile);
         disconnectNode(node2Dto);
@@ -182,16 +181,6 @@ public class JoinClusterWithDifferentFlow extends 
NiFiSystemIT {
     }
 
 
-    private NodeDTO getNodeDTO(final int apiPort) throws NiFiClientException, 
IOException {
-        final ClusterEntity clusterEntity = 
getNifiClient().getControllerClient().getNodes();
-        final NodeDTO node2Dto = clusterEntity.getCluster().getNodes().stream()
-            .filter(nodeDto -> nodeDto.getApiPort() == apiPort)
-            .findAny()
-            .orElseThrow(() -> new RuntimeException("Could not locate Node 
2"));
-
-        return node2Dto;
-    }
-
     private void disconnectNode(final NodeDTO nodeDto) throws 
NiFiClientException, IOException, InterruptedException {
         // Disconnect Node 2 so that we can go to the node directly via the 
REST API and ensure that the flow is correct.
         final NodeEntity nodeEntity = new NodeEntity();
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java
index f535325469..2b7bdb0676 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java
@@ -23,7 +23,6 @@ import 
org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
 import org.apache.nifi.web.api.dto.NodeDTO;
 import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
-import org.apache.nifi.web.api.entity.ClusterEntity;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.junit.jupiter.api.Test;
@@ -71,7 +70,7 @@ public class OffloadIT extends NiFiSystemIT {
 
         waitForQueueNotEmpty(connectionEntity.getId());
 
-        final NodeDTO node2Dto = getNodeDTO(5672);
+        final NodeDTO node2Dto = getNodeDtoByApiPort(5672);
 
         disconnectNode(node2Dto);
         waitForNodeStatus(node2Dto, "DISCONNECTED");
@@ -84,16 +83,6 @@ public class OffloadIT extends NiFiSystemIT {
         waitForAllNodesConnected();
     }
 
-    private NodeDTO getNodeDTO(final int apiPort) throws NiFiClientException, 
IOException {
-        final ClusterEntity clusterEntity = 
getNifiClient().getControllerClient().getNodes();
-        final NodeDTO node2Dto = clusterEntity.getCluster().getNodes().stream()
-            .filter(nodeDto -> nodeDto.getApiPort() == apiPort)
-            .findAny()
-            .orElseThrow(() -> new RuntimeException("Could not locate Node 
2"));
-
-        return node2Dto;
-    }
-
 
     private void disconnectNode(final NodeDTO nodeDto) throws 
NiFiClientException, IOException, InterruptedException {
         getClientUtil().disconnectNode(nodeDto.getNodeId());
@@ -101,7 +90,7 @@ public class OffloadIT extends NiFiSystemIT {
         final Integer apiPort = nodeDto.getApiPort();
         waitFor(() -> {
             try {
-                final NodeDTO dto = getNodeDTO(apiPort);
+                final NodeDTO dto = getNodeDtoByApiPort(apiPort);
                 final String status = dto.getStatus();
                 return "DISCONNECTED".equals(status);
             } catch (final Exception e) {
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceEnableDisableConflictIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceEnableDisableConflictIT.java
new file mode 100644
index 0000000000..7b54ee91a4
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceEnableDisableConflictIT.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.controllerservice;
+
+import org.apache.nifi.tests.system.NiFiInstance;
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.dto.NodeDTO;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class ControllerServiceEnableDisableConflictIT extends NiFiSystemIT {
+    @Override
+    public NiFiInstanceFactory getInstanceFactory() {
+        return createTwoNodeInstanceFactory();
+    }
+
+    @Override
+    protected boolean isDestroyEnvironmentAfterEachTest() {
+        // We need to destroy the environment after each test because 
otherwise, we have a situation where in the second invocation of
+        // #testJoinClusterWithEnabledServiceWhileDisabling when we restart 
the node, we can have a race condition in which Node 2 is
+        // disconnected but hasn't yet acknowledged the disconnection. It's 
then restarted. Meanwhile, Node 1 keeps attempting to notify
+        // Node 2 that it's been disconnected. Once Node 2 is restarted and 
connects, Node 1 sends it the Disconnection request,
+        // and Node 2 then disconnects. This can cause issues during shutdown 
/ cleanup.
+        // But if we tear down between tests, this won't occur.
+        return true;
+    }
+
+    @ParameterizedTest
+    @EnumSource(NodeReconnectMode.class)
+    public void testJoinClusterWithEnabledServiceWhileDisabling(final 
NodeReconnectMode reconnectMode) throws NiFiClientException, IOException, 
InterruptedException {
+        final ControllerServiceEntity sleepService = 
getClientUtil().createControllerService("StandardSleepService");
+        getClientUtil().updateControllerServiceProperties(sleepService, 
Collections.singletonMap("@OnDisabled Sleep Time", "10 secs"));
+
+        getClientUtil().enableControllerService(sleepService);
+        final NodeDTO node2Dto = getNodeDtoByNodeIndex(2);
+        final String node2Id = node2Dto.getNodeId();
+
+        getClientUtil().disconnectNode(node2Id);
+        waitForNodeStatus(node2Dto, "DISCONNECTED");
+
+        switchClientToNode(2);
+
+        getClientUtil().disableControllerService(sleepService);
+        switchClientToNode(1);
+
+        switch (reconnectMode) {
+            case RECONNECT_DIRECTLY:
+                getClientUtil().connectNode(node2Id);
+                break;
+            case RESTART_NODE:
+                final NiFiInstance node2Instance = 
getNiFiInstance().getNodeInstance(2);
+                node2Instance.stop();
+                node2Instance.start(true);
+                break;
+        }
+
+        waitForAllNodesConnected();
+
+        waitFor(() -> {
+            final ControllerServiceEntity serviceEntity = 
getNifiClient().getControllerServicesClient().getControllerService(sleepService.getId());
+            return 
"ENABLED".equalsIgnoreCase(serviceEntity.getComponent().getState());
+        });
+    }
+
+
+    private enum NodeReconnectMode {
+        RECONNECT_DIRECTLY,
+        RESTART_NODE;
+    }
+}

Reply via email to