This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 0497907829 NIFI-10262: Ensure that when an Exception is thrown from a
Controller Service's @OnEnabled method that we properly handle that Exception
and continue enabling the other services in the given collection of services
0497907829 is described below
commit 0497907829ca25523622c021073b90c1aa9a3094
Author: Mark Payne <[email protected]>
AuthorDate: Fri Jul 22 16:13:15 2022 -0400
NIFI-10262: Ensure that when an Exception is thrown from a Controller
Service's @OnEnabled method that we properly handle that Exception and continue
enabling the other services in the given collection of services
This closes #6236
Signed-off-by: David Handermann <[email protected]>
---
.../service/StandardControllerServiceNode.java | 8 +-
.../service/StandardControllerServiceProvider.java | 126 +++++++++++++--------
.../cs/tests/system/LifecycleFailureService.java | 78 +++++++++++++
.../org.apache.nifi.controller.ControllerService | 1 +
.../apache/nifi/tests/system/NiFiClientUtil.java | 18 +++
.../system/clustering/FlowSynchronizationIT.java | 1 -
.../ControllerServiceLifecycleIT.java | 57 ++++++++++
7 files changed, 236 insertions(+), 53 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 0da7c98884..02189a1579 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -324,7 +324,7 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
@Override
public void verifyCanDisable(final Set<ControllerServiceNode>
ignoreReferences) {
if (!this.isActive()) {
- throw new IllegalStateException("Cannot disable " +
getControllerServiceImplementation().getIdentifier() + " because it is not
enabled");
+ return;
}
final ControllerServiceReference references = getReferences();
@@ -608,16 +608,12 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
final Throwable cause = e instanceof
InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new
SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
- componentLog.error("Failed to invoke @OnEnabled method
due to {}", cause);
- LOG.error("Failed to invoke @OnEnabled method of {}
due to {}", getControllerServiceImplementation(), cause.toString());
+ componentLog.error("Failed to invoke @OnEnabled
method", cause);
invokeDisable(configContext);
if (isActive()) {
scheduler.schedule(this,
administrativeYieldMillis, TimeUnit.MILLISECONDS);
} else {
- try (final NarCloseable nc =
NarCloseable.withComponentNarLoader(getExtensionManager(),
getControllerServiceImplementation().getClass(), getIdentifier())) {
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class,
getControllerServiceImplementation(), configContext);
- }
stateTransition.disable();
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 414d629f0d..735414d376 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -48,9 +48,11 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
@@ -224,12 +226,10 @@ public class StandardControllerServiceProvider implements
ControllerServiceProvi
if (shouldStart) {
for (ControllerServiceNode controllerServiceNode : serviceNodes) {
try {
- if (!controllerServiceNode.isActive()) {
- final Future<Void> future =
enableControllerServiceAndDependencies(controllerServiceNode);
+ final Future<Void> future =
enableControllerServiceAndDependencies(controllerServiceNode);
- future.get(30, TimeUnit.SECONDS);
- logger.debug("Successfully enabled {}; service state =
{}", controllerServiceNode, controllerServiceNode.getState());
- }
+ future.get(30, TimeUnit.SECONDS);
+ logger.debug("Successfully enabled {}; service state =
{}", controllerServiceNode, controllerServiceNode.getState());
} catch (final ControllerServiceNotValidException csnve) {
logger.warn("Failed to enable service {} because it is not
currently valid", controllerServiceNode);
} catch (Exception e) {
@@ -247,14 +247,20 @@ public class StandardControllerServiceProvider implements
ControllerServiceProvi
public Future<Void> enableControllerServicesAsync(final
Collection<ControllerServiceNode> serviceNodes) {
final CompletableFuture<Void> future = new CompletableFuture<>();
processScheduler.submitFrameworkTask(() -> {
- enableControllerServices(serviceNodes, future);
- future.complete(null);
+ try {
+ enableControllerServices(serviceNodes, future);
+ future.complete(null);
+ } catch (final Exception e) {
+ future.completeExceptionally(e);
+ }
});
return future;
}
- private void enableControllerServices(final
Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void>
completableFuture) {
+ private void enableControllerServices(final
Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void>
completableFuture) throws Exception {
+ Exception firstFailure = null;
+
// validate that we are able to start all of the services.
for (final ControllerServiceNode controllerServiceNode : serviceNodes)
{
if (completableFuture.isCancelled()) {
@@ -262,29 +268,37 @@ public class StandardControllerServiceProvider implements
ControllerServiceProvi
}
try {
- if (!controllerServiceNode.isActive()) {
- final Future<Void> future =
enableControllerServiceAndDependencies(controllerServiceNode);
+ // If service is already active, just move on to the next
+ if (controllerServiceNode.isActive()) {
+ continue;
+ }
- while (true) {
- try {
- future.get(1, TimeUnit.SECONDS);
- logger.debug("Successfully enabled {}; service
state = {}", controllerServiceNode, controllerServiceNode.getState());
- break;
- } catch (final TimeoutException e) {
- if (completableFuture.isCancelled()) {
- return;
- }
- } catch (final Exception e) {
- logger.warn("Failed to enable service {}",
controllerServiceNode, e);
- completableFuture.completeExceptionally(e);
-
- if (this.bulletinRepo != null) {
-
this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller
Service",
- Severity.ERROR.name(), "Could not enable "
+ controllerServiceNode + " due to " + e));
- }
+ final Future<Void> future =
enableControllerServiceAndDependencies(controllerServiceNode);
+ // Wait for the future to complete. But if the
completableFuture ever is canceled, we want to stop waiting and return.
+ while (true) {
+ try {
+ future.get(1, TimeUnit.SECONDS);
+ logger.debug("Successfully enabled {}; service state =
{}", controllerServiceNode, controllerServiceNode.getState());
+ break;
+ } catch (final TimeoutException e) {
+ if (completableFuture.isCancelled()) {
return;
}
+ } catch (final Exception e) {
+ logger.warn("Failed to enable service {}",
controllerServiceNode, e);
+ if (firstFailure == null) {
+ firstFailure = e;
+ } else {
+ firstFailure.addSuppressed(e);
+ }
+
+ if (this.bulletinRepo != null) {
+
this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller
Service",
+ Severity.ERROR.name(), "Could not enable " +
controllerServiceNode + " due to " + e));
+ }
+
+ break;
}
}
} catch (Exception e) {
@@ -295,6 +309,10 @@ public class StandardControllerServiceProvider implements
ControllerServiceProvi
}
}
}
+
+ if (firstFailure != null) {
+ throw firstFailure;
+ }
}
@Override
@@ -382,14 +400,18 @@ public class StandardControllerServiceProvider implements
ControllerServiceProvi
public Future<Void> disableControllerServicesAsync(final
Collection<ControllerServiceNode> serviceNodes) {
final CompletableFuture<Void> future = new CompletableFuture<>();
processScheduler.submitFrameworkTask(() -> {
- disableControllerServices(serviceNodes, future);
- future.complete(null);
+ try {
+ disableControllerServices(serviceNodes, future);
+ future.complete(null);
+ } catch (final Exception e) {
+ future.completeExceptionally(e);
+ }
});
return future;
}
- private void disableControllerServices(final
Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void>
future) {
+ private void disableControllerServices(final
Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void>
future) throws Exception {
final Set<ControllerServiceNode> serviceNodeSet = new
HashSet<>(serviceNodes);
// Verify that for each Controller Service given, any service that
references it is either disabled or is also in the given collection
@@ -406,24 +428,16 @@ public class StandardControllerServiceProvider implements
ControllerServiceProvi
}
}
+ Exception firstFailure = null;
for (final ControllerServiceNode serviceNode : serviceNodes) {
if (serviceNode.isActive()) {
- disableReferencingServices(serviceNode);
- final CompletableFuture<?> serviceFuture =
disableControllerService(serviceNode);
-
- while (true) {
- try {
- serviceFuture.get(1, TimeUnit.SECONDS);
- break;
- } catch (final TimeoutException e) {
- if (future.isCancelled()) {
- return;
- }
-
- continue;
- } catch (final Exception e) {
- logger.error("Failed to disable {}", serviceNode, e);
- future.completeExceptionally(e);
+ try {
+
disableControllerServiceAndReferencingServices(serviceNode,
future::isCancelled);
+ } catch (final Exception e) {
+ if (firstFailure == null) {
+ firstFailure = e;
+ } else {
+ firstFailure.addSuppressed(e);
}
}
} else {
@@ -438,6 +452,26 @@ public class StandardControllerServiceProvider implements
ControllerServiceProvi
}
}
}
+
+ if (firstFailure != null) {
+ throw firstFailure;
+ }
+ }
+
+ private void disableControllerServiceAndReferencingServices(final
ControllerServiceNode serviceNode, final BooleanSupplier cancelSupplier) throws
ExecutionException, InterruptedException {
+ disableReferencingServices(serviceNode);
+ final CompletableFuture<?> serviceFuture =
disableControllerService(serviceNode);
+
+ while (true) {
+ try {
+ serviceFuture.get(1, TimeUnit.SECONDS);
+ break;
+ } catch (final TimeoutException e) {
+ if (cancelSupplier.getAsBoolean()) {
+ return;
+ }
+ }
+ }
}
@Override
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/LifecycleFailureService.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/LifecycleFailureService.java
new file mode 100644
index 0000000000..59df1c404f
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/LifecycleFailureService.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cs.tests.system;
+
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class LifecycleFailureService extends AbstractControllerService {
+ static final PropertyDescriptor ENABLE_FAILURE_COUNT = new
PropertyDescriptor.Builder()
+ .name("Enable Failure Count")
+ .description("How many times the CS should fail to enable before
succeeding")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .defaultValue("0")
+ .build();
+
+ static final PropertyDescriptor FAIL_ON_DISABLE = new
PropertyDescriptor.Builder()
+ .name("Fail on Disable")
+ .displayName("Fail on Disable")
+ .description("Whether or not hte Controller Service should fail when
disabled")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
+ private final AtomicInteger invocationCount = new AtomicInteger(0);
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return Arrays.asList(ENABLE_FAILURE_COUNT, FAIL_ON_DISABLE);
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ final int maxFailureCount =
context.getProperty(ENABLE_FAILURE_COUNT).asInteger();
+ final int currentInvocationCount = invocationCount.getAndIncrement();
+ if (currentInvocationCount >= maxFailureCount) {
+ getLogger().info("Enabling successfully because invocation count
is {}", currentInvocationCount);
+ return;
+ }
+
+ getLogger().info("Will fail to enable because invocation count is {}",
currentInvocationCount);
+ throw new RuntimeException("Failing to enable because configured to
fail " + maxFailureCount + " times and current failure count is only " +
currentInvocationCount);
+ }
+
+ @OnDisabled
+ public void onDisabled(final ConfigurationContext context) {
+ if (context.getProperty(FAIL_ON_DISABLE).asBoolean()) {
+ getLogger().info("Throwing Exception in onDisabled as configured");
+ throw new RuntimeException("Failing to disable because configured
to fail on disable");
+ }
+
+ getLogger().info("Completing onDisabled successfully as configured");
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 10f6db7935..01e184617a 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -15,6 +15,7 @@
org.apache.nifi.cs.tests.system.EnsureControllerServiceConfigurationCorrect
org.apache.nifi.cs.tests.system.FakeControllerService1
+org.apache.nifi.cs.tests.system.LifecycleFailureService
org.apache.nifi.cs.tests.system.SensitiveDynamicPropertiesService
org.apache.nifi.cs.tests.system.StandardCountService
org.apache.nifi.cs.tests.system.StandardSleepService
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 15512cc694..38e8a83fce 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -550,6 +550,20 @@ public class NiFiClientUtil {
return
nifiClient.getControllerServicesClient().updateControllerService(entity);
}
+ public ActivateControllerServicesEntity enableControllerServices(final
String groupId, final boolean waitForEnabled) throws NiFiClientException,
IOException {
+ final ActivateControllerServicesEntity
activateControllerServicesEntity = new ActivateControllerServicesEntity();
+ activateControllerServicesEntity.setId(groupId);
+
activateControllerServicesEntity.setState(ActivateControllerServicesEntity.STATE_ENABLED);
+ activateControllerServicesEntity.setDisconnectedNodeAcknowledged(true);
+
+ final ActivateControllerServicesEntity activateControllerServices =
nifiClient.getFlowClient().activateControllerServices(activateControllerServicesEntity);
+ if (waitForEnabled) {
+ waitForControllerSerivcesEnabled(groupId);
+ }
+
+ return activateControllerServices;
+ }
+
public ControllerServiceEntity enableControllerService(final
ControllerServiceEntity entity) throws NiFiClientException, IOException {
final ControllerServiceRunStatusEntity runStatusEntity = new
ControllerServiceRunStatusEntity();
runStatusEntity.setState("ENABLED");
@@ -732,6 +746,10 @@ public class NiFiClientUtil {
waitForControllerServiceState(groupId, "ENABLED",
Arrays.asList(serviceIdsOfInterest));
}
+ public void waitForControllerSerivcesEnabled(final String groupId, final
List<String> serviceIdsOfInterest) throws NiFiClientException, IOException {
+ waitForControllerServiceState(groupId, "ENABLED",
serviceIdsOfInterest);
+ }
+
public void waitForControllerServiceState(final String groupId, final
String desiredState, final Collection<String> serviceIdsOfInterest) throws
NiFiClientException, IOException {
while (true) {
final List<ControllerServiceEntity> nonDisabledServices =
getControllerServicesNotInState(groupId, desiredState, serviceIdsOfInterest);
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index faf1409b9f..ba8a606a6f 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -157,7 +157,6 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
final FlowFileEntity flowFile =
getClientUtil().getQueueFlowFile(connection.getId(), i);
assertEquals("updated",
flowFile.getFlowFile().getAttributes().get("attr"));
}
-
}
@Test
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceLifecycleIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceLifecycleIT.java
new file mode 100644
index 0000000000..6295f6ee24
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceLifecycleIT.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.controllerservice;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class ControllerServiceLifecycleIT extends NiFiSystemIT {
+ @Test
+ public void testControllerServiceFailingToEnableAllowsOthersToEnable()
throws NiFiClientException, IOException {
+ for (int i=0; i < 12; i++) {
+ ControllerServiceEntity failureService =
getClientUtil().createControllerService("LifecycleFailureService");
+ getClientUtil().updateControllerServiceProperties(failureService,
Collections.singletonMap("Enable Failure Count", "1000"));
+ }
+
+ final List<String> countServiceIds = new ArrayList<>();
+ for (int i=0; i < 12; i++) {
+ ControllerServiceEntity countService =
getClientUtil().createControllerService("StandardCountService");
+ countServiceIds.add(countService.getId());
+ }
+
+ getClientUtil().enableControllerServices("root", false);
+ getClientUtil().waitForControllerSerivcesEnabled("root",
countServiceIds);
+ }
+
+ @Test
+ public void testControllerServiceEnableFailureCausesRetry() throws
NiFiClientException, IOException {
+ ControllerServiceEntity service =
getClientUtil().createControllerService("LifecycleFailureService");
+ getClientUtil().updateControllerServiceProperties(service,
Collections.singletonMap("Enable Failure Count", "1"));
+
+ getClientUtil().enableControllerServices("root", false);
+ getClientUtil().waitForControllerSerivcesEnabled("root");
+ }
+
+}