This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new bc4e3e850b NIFI-10246: When syncing Controller Services, wait until
any DISABLING Controller Service has fully disabled before attempting to
synchronize them. Also performed some minor refactoring/cleanup for System
Tests in order to make writing a system test for this simpler
bc4e3e850b is described below
commit bc4e3e850bd2973a59b1d4726e439aa38d0559bc
Author: Mark Payne <[email protected]>
AuthorDate: Mon Jul 18 14:29:10 2022 -0400
NIFI-10246: When syncing Controller Services, wait until any DISABLING
Controller Service has fully disabled before attempting to synchronize them.
Also performed some minor refactoring/cleanup for System Tests in order to make
writing a system test for this simpler
This closes #6219
Signed-off-by: David Handermann <[email protected]>
---
.../service/StandardControllerServiceNode.java | 15 ++++
.../service/StandardControllerServiceProvider.java | 10 +++
.../controller/service/ControllerServiceNode.java | 10 +++
.../serialization/AffectedComponentSet.java | 8 +-
.../nifi/cs/tests/system/StandardSleepService.java | 2 +-
.../apache/nifi/tests/system/NiFiClientUtil.java | 2 +-
.../org/apache/nifi/tests/system/NiFiSystemIT.java | 37 +++++++--
.../clustering/JoinClusterWithDifferentFlow.java | 13 +---
.../nifi/tests/system/clustering/OffloadIT.java | 15 +---
.../ControllerServiceEnableDisableConflictIT.java | 91 ++++++++++++++++++++++
10 files changed, 170 insertions(+), 33 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 0c250499c7..0da7c98884 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -394,6 +394,7 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
return this.active.get();
}
+ @Override
public boolean awaitEnabled(final long timePeriod, final TimeUnit
timeUnit) throws InterruptedException {
LOG.debug("Waiting up to {} {} for {} to be enabled", timePeriod,
timeUnit, this);
final boolean enabled =
stateTransition.awaitStateOrInvalid(ControllerServiceState.ENABLED, timePeriod,
timeUnit);
@@ -407,6 +408,20 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
return enabled;
}
+ @Override
+ public boolean awaitDisabled(final long timePeriod, final TimeUnit
timeUnit) throws InterruptedException {
+ LOG.debug("Waiting up to {} {} for {} to be disabled", timePeriod,
timeUnit, this);
+ final boolean disabled =
stateTransition.awaitState(ControllerServiceState.DISABLED, timePeriod,
timeUnit);
+
+ if (disabled) {
+ LOG.debug("{} is now disabled", this);
+ } else {
+ LOG.debug("After {} {}, {} is NOT disabled", timePeriod, timeUnit,
this);
+ }
+
+ return disabled;
+ }
+
@Override
public void verifyCanPerformVerification() {
if (getState() != ControllerServiceState.DISABLED) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 6a1195cc75..414d629f0d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -426,6 +426,16 @@ public class StandardControllerServiceProvider implements
ControllerServiceProvi
future.completeExceptionally(e);
}
}
+ } else {
+ boolean disabled = false;
+ while (!disabled) {
+ try {
+ disabled = serviceNode.awaitDisabled(1,
TimeUnit.SECONDS);
+ } catch (final Exception e) {
+ logger.error("Failed to disable {}", serviceNode, e);
+ future.completeExceptionally(e);
+ }
+ }
}
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index d060d16526..f02322bd6f 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -201,6 +201,16 @@ public interface ControllerServiceNode extends
ComponentNode, VersionedComponent
*/
boolean awaitEnabled(long timePeriod, TimeUnit timeUnit) throws
InterruptedException;
+
+ /**
+ * Waits up to the given amount of time for the Controller Service to
transition to a DISABLED state.
+ * @param timePeriod maximum amount of time to wait
+ * @param timeUnit the unit for the time period
+ * @return <code>true</code> if the Controller Service finished disabling,
<code>false</code> otherwise
+ * @throws InterruptedException if interrupted while waiting for the
service to complete its disabling
+ */
+ boolean awaitDisabled(long timePeriod, TimeUnit timeUnit) throws
InterruptedException;
+
/**
* Verifies that the Controller Service is in a state in which it can
verify a configuration by calling
* {@link #verifyConfiguration(ConfigurationContext, ComponentLog, Map,
ExtensionManager)}.
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
index 81515e2e59..a23ccc773f 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
@@ -45,6 +45,7 @@ import org.apache.nifi.remote.RemoteGroupPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
@@ -62,6 +63,11 @@ import java.util.stream.Collectors;
*/
public class AffectedComponentSet {
private static final Logger logger =
LoggerFactory.getLogger(AffectedComponentSet.class);
+ private static final Set<ControllerServiceState>
ACTIVE_CONTROLLER_SERVICE_STATES = new HashSet<>(Arrays.asList(
+ ControllerServiceState.ENABLED,
+ ControllerServiceState.ENABLING,
+ ControllerServiceState.DISABLING
+ ));
private final FlowController flowController;
private final FlowManager flowManager;
@@ -451,7 +457,7 @@ public class AffectedComponentSet {
processors.stream().filter(this::isActive).forEach(active::addProcessor);
reportingTasks.stream().filter(task -> task.getScheduledState() ==
ScheduledState.STARTING || task.getScheduledState() == ScheduledState.RUNNING
|| task.isRunning())
.forEach(active::addReportingTask);
- controllerServices.stream().filter(service -> service.getState() ==
ControllerServiceState.ENABLING || service.getState() ==
ControllerServiceState.ENABLED)
+ controllerServices.stream().filter(service ->
ACTIVE_CONTROLLER_SERVICE_STATES.contains(service.getState()))
.forEach(active::addControllerServiceWithoutReferences);
return active;
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardSleepService.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardSleepService.java
index 27ffca42e6..03d7d38d0c 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardSleepService.java
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardSleepService.java
@@ -55,7 +55,7 @@ public class StandardSleepService extends
AbstractControllerService implements S
.build();
public static final PropertyDescriptor ON_DISABLED_SLEEP_TIME = new
PropertyDescriptor.Builder()
.name("@OnDisabled Sleep Time")
- .description("The amount of time to sleep when disabeld")
+ .description("The amount of time to sleep when disabled")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("0 sec")
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 2da3ac5dbe..15512cc694 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -799,7 +799,7 @@ public class NiFiClientUtil {
return servicesEntity.getControllerServices().stream()
.filter(svc -> serviceIds == null || serviceIds.isEmpty() ||
serviceIds.contains(svc.getId()))
- .filter(svc ->
!desiredState.equals(svc.getStatus().getRunStatus()))
+ .filter(svc ->
!desiredState.equalsIgnoreCase(svc.getComponent().getState()))
.collect(Collectors.toList());
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index c68fc7746f..b33ef6cb2a 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -19,6 +19,7 @@ package org.apache.nifi.tests.system;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
import org.apache.nifi.toolkit.cli.impl.client.nifi.impl.JerseyNiFiClient;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
@@ -63,6 +64,8 @@ public abstract class NiFiSystemIT implements
NiFiInstanceProvider {
// Group ID | Source
Name | Dest Name | Conn Name | Queue Size |
private static final String QUEUE_SIZES_FORMAT = "| %1$-36.36s |
%2$-30.30s | %3$-30.30s | %4$-30.30s | %5$-30.30s |";
+ public static final RequestConfig DO_NOT_REPLICATE = () ->
Collections.singletonMap("X-Request-Replicated", "value");
+
public static final int CLIENT_API_PORT = 5671;
public static final int CLIENT_API_BASE_PORT = 5670;
public static final String NIFI_GROUP_ID = "org.apache.nifi";
@@ -157,12 +160,22 @@ public abstract class NiFiSystemIT implements
NiFiInstanceProvider {
@Override
public NiFiInstanceFactory getInstanceFactory() {
+ return createStandaloneInstanceFactory();
+ }
+
+ public NiFiInstanceFactory createStandaloneInstanceFactory() {
return new SpawnedStandaloneNiFiInstanceFactory(
- new InstanceConfiguration.Builder()
-
.bootstrapConfig("src/test/resources/conf/default/bootstrap.conf")
- .instanceDirectory("target/standalone-instance")
- .overrideNifiProperties(getNifiPropertiesOverrides())
- .build());
+ new InstanceConfiguration.Builder()
+
.bootstrapConfig("src/test/resources/conf/default/bootstrap.conf")
+ .instanceDirectory("target/standalone-instance")
+ .overrideNifiProperties(getNifiPropertiesOverrides())
+ .build());
+ }
+
+ public NiFiInstanceFactory createTwoNodeInstanceFactory() {
+ return new SpawnedClusterNiFiInstanceFactory(
+ "src/test/resources/conf/clustered/node1/bootstrap.conf",
+ "src/test/resources/conf/clustered/node2/bootstrap.conf");
}
protected String getTestName() {
@@ -471,4 +484,18 @@ public abstract class NiFiSystemIT implements
NiFiInstanceProvider {
final ConnectionStatusEntity statusEntity =
getConnectionStatus(connectionId);
return
statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued();
}
+
+ public NodeDTO getNodeDtoByNodeIndex(final int nodeIndex) throws
NiFiClientException, IOException {
+ return getNodeDtoByApiPort(5670 + nodeIndex);
+ }
+
+ public NodeDTO getNodeDtoByApiPort(final int apiPort) throws
NiFiClientException, IOException {
+ final ClusterEntity clusterEntity =
getNifiClient().getControllerClient().getNodes();
+ final NodeDTO node2Dto = clusterEntity.getCluster().getNodes().stream()
+ .filter(nodeDto -> nodeDto.getApiPort() == apiPort)
+ .findAny()
+ .orElseThrow(() -> new RuntimeException("Could not locate Node
2"));
+
+ return node2Dto;
+ }
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlow.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlow.java
index eb103f8473..c0819b07ef 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlow.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlow.java
@@ -38,7 +38,6 @@ import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
-import org.apache.nifi.web.api.entity.ClusterEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.NodeEntity;
@@ -114,7 +113,7 @@ public class JoinClusterWithDifferentFlow extends
NiFiSystemIT {
node2.start(true);
final File backupFile = getBackupFile(node2ConfDir);
- final NodeDTO node2Dto = getNodeDTO(5672);
+ final NodeDTO node2Dto = getNodeDtoByApiPort(5672);
verifyFlowContentsOnDisk(backupFile);
disconnectNode(node2Dto);
@@ -182,16 +181,6 @@ public class JoinClusterWithDifferentFlow extends
NiFiSystemIT {
}
- private NodeDTO getNodeDTO(final int apiPort) throws NiFiClientException,
IOException {
- final ClusterEntity clusterEntity =
getNifiClient().getControllerClient().getNodes();
- final NodeDTO node2Dto = clusterEntity.getCluster().getNodes().stream()
- .filter(nodeDto -> nodeDto.getApiPort() == apiPort)
- .findAny()
- .orElseThrow(() -> new RuntimeException("Could not locate Node
2"));
-
- return node2Dto;
- }
-
private void disconnectNode(final NodeDTO nodeDto) throws
NiFiClientException, IOException, InterruptedException {
// Disconnect Node 2 so that we can go to the node directly via the
REST API and ensure that the flow is correct.
final NodeEntity nodeEntity = new NodeEntity();
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java
index f535325469..2b7bdb0676 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java
@@ -23,7 +23,6 @@ import
org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
-import org.apache.nifi.web.api.entity.ClusterEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.jupiter.api.Test;
@@ -71,7 +70,7 @@ public class OffloadIT extends NiFiSystemIT {
waitForQueueNotEmpty(connectionEntity.getId());
- final NodeDTO node2Dto = getNodeDTO(5672);
+ final NodeDTO node2Dto = getNodeDtoByApiPort(5672);
disconnectNode(node2Dto);
waitForNodeStatus(node2Dto, "DISCONNECTED");
@@ -84,16 +83,6 @@ public class OffloadIT extends NiFiSystemIT {
waitForAllNodesConnected();
}
- private NodeDTO getNodeDTO(final int apiPort) throws NiFiClientException,
IOException {
- final ClusterEntity clusterEntity =
getNifiClient().getControllerClient().getNodes();
- final NodeDTO node2Dto = clusterEntity.getCluster().getNodes().stream()
- .filter(nodeDto -> nodeDto.getApiPort() == apiPort)
- .findAny()
- .orElseThrow(() -> new RuntimeException("Could not locate Node
2"));
-
- return node2Dto;
- }
-
private void disconnectNode(final NodeDTO nodeDto) throws
NiFiClientException, IOException, InterruptedException {
getClientUtil().disconnectNode(nodeDto.getNodeId());
@@ -101,7 +90,7 @@ public class OffloadIT extends NiFiSystemIT {
final Integer apiPort = nodeDto.getApiPort();
waitFor(() -> {
try {
- final NodeDTO dto = getNodeDTO(apiPort);
+ final NodeDTO dto = getNodeDtoByApiPort(apiPort);
final String status = dto.getStatus();
return "DISCONNECTED".equals(status);
} catch (final Exception e) {
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceEnableDisableConflictIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceEnableDisableConflictIT.java
new file mode 100644
index 0000000000..7b54ee91a4
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceEnableDisableConflictIT.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.controllerservice;
+
+import org.apache.nifi.tests.system.NiFiInstance;
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.dto.NodeDTO;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class ControllerServiceEnableDisableConflictIT extends NiFiSystemIT {
+ @Override
+ public NiFiInstanceFactory getInstanceFactory() {
+ return createTwoNodeInstanceFactory();
+ }
+
+ @Override
+ protected boolean isDestroyEnvironmentAfterEachTest() {
+ // We need to destroy the environment after each test because
otherwise, we have a situation where in the second invocation of
+ // #testJoinClusterWithEnabledServiceWhileDisabling when we restart
the node, we can have a race condition in which Node 2 is
+ // disconnected but hasn't yet acknowledged the disconnection. It's
then restarted. Meanwhile, Node 1 keeps attempting to notify
+ // Node 2 that it's been disconnected. Once Node 2 is restarted and
connects, Node 1 sends it the Disconnection request,
+ // and Node 2 then disconnects. This can cause issues during shutdown
/ cleanup.
+ // But if we tear down between tests, this won't occur.
+ return true;
+ }
+
+ @ParameterizedTest
+ @EnumSource(NodeReconnectMode.class)
+ public void testJoinClusterWithEnabledServiceWhileDisabling(final
NodeReconnectMode reconnectMode) throws NiFiClientException, IOException,
InterruptedException {
+ final ControllerServiceEntity sleepService =
getClientUtil().createControllerService("StandardSleepService");
+ getClientUtil().updateControllerServiceProperties(sleepService,
Collections.singletonMap("@OnDisabled Sleep Time", "10 secs"));
+
+ getClientUtil().enableControllerService(sleepService);
+ final NodeDTO node2Dto = getNodeDtoByNodeIndex(2);
+ final String node2Id = node2Dto.getNodeId();
+
+ getClientUtil().disconnectNode(node2Id);
+ waitForNodeStatus(node2Dto, "DISCONNECTED");
+
+ switchClientToNode(2);
+
+ getClientUtil().disableControllerService(sleepService);
+ switchClientToNode(1);
+
+ switch (reconnectMode) {
+ case RECONNECT_DIRECTLY:
+ getClientUtil().connectNode(node2Id);
+ break;
+ case RESTART_NODE:
+ final NiFiInstance node2Instance =
getNiFiInstance().getNodeInstance(2);
+ node2Instance.stop();
+ node2Instance.start(true);
+ break;
+ }
+
+ waitForAllNodesConnected();
+
+ waitFor(() -> {
+ final ControllerServiceEntity serviceEntity =
getNifiClient().getControllerServicesClient().getControllerService(sleepService.getId());
+ return
"ENABLED".equalsIgnoreCase(serviceEntity.getComponent().getState());
+ });
+ }
+
+
+ private enum NodeReconnectMode {
+ RECONNECT_DIRECTLY,
+ RESTART_NODE;
+ }
+}