pvillard31 commented on code in PR #11201:
URL: https://github.com/apache/nifi/pull/11201#discussion_r3187535096
##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -745,17 +745,43 @@ private void synchronizeControllerServices(final
ProcessGroup group, final Versi
updateControllerService(addedService, proposedService,
topLevelGroup);
}
- // Update all of the Controller Services to match the
VersionedControllerService
+ // Update all Controller Services to match the
VersionedControllerService.
+ // Services may be ENABLED here if the outer "affected components"
pass did not
+ // disable them (e.g. COMPONENT_ADDED diffs are skipped by
AffectedComponentSet).
+ // We must disable before calling updateControllerService, which calls
setProperties
+ // which calls verifyModifiable and throws IllegalStateException on
ENABLED services.
+ final long stopTimeout = System.currentTimeMillis() +
syncOptions.getComponentStopTimeout().toMillis();
Review Comment:
Should this deadline be recomputed per service inside the loop, like
`processorStopDeadline` in `synchronizeProcessors`, so later iterations don't
inherit an exhausted budget?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -745,17 +745,43 @@ private void synchronizeControllerServices(final
ProcessGroup group, final Versi
updateControllerService(addedService, proposedService,
topLevelGroup);
}
- // Update all of the Controller Services to match the
VersionedControllerService
+ // Update all Controller Services to match the
VersionedControllerService.
+ // Services may be ENABLED here if the outer "affected components"
pass did not
+ // disable them (e.g. COMPONENT_ADDED diffs are skipped by
AffectedComponentSet).
+ // We must disable before calling updateControllerService, which calls
setProperties
+ // which calls verifyModifiable and throws IllegalStateException on
ENABLED services.
+ final long stopTimeout = System.currentTimeMillis() +
syncOptions.getComponentStopTimeout().toMillis();
for (final Map.Entry<ControllerServiceNode,
VersionedControllerService> entry : services.entrySet()) {
final ControllerServiceNode service = entry.getKey();
final VersionedControllerService proposedService =
entry.getValue();
if
(updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
- updateControllerService(service, proposedService,
topLevelGroup);
- // Any existing component that is modified during
synchronization may have its properties reverted to a pre-migration state,
- // so we then add it to the set to allow migrateProperties to
be called again to get it back to the migrated state
- createdAndModifiedExtensions.add(new
CreatedOrModifiedExtension(service, getPropertyValues(service)));
- LOG.info("Updated {}", service);
+ final Set<ComponentNode> referencesToRestart = new HashSet<>();
+ final Set<ControllerServiceNode> servicesToRestart = new
HashSet<>();
+
+ try {
+ try {
+ stopControllerService(service, proposedService,
stopTimeout,
+ syncOptions.getComponentStopTimeoutAction(),
+ referencesToRestart, servicesToRestart,
syncOptions);
+ } catch (final TimeoutException e) {
+ throw new FlowSynchronizationException("Failed to stop
Controller Service " + service + " in preparation for update", e);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new FlowSynchronizationException("Interrupted
while stopping Controller Service " + service, e);
+ }
+ updateControllerService(service, proposedService,
topLevelGroup);
+ createdAndModifiedExtensions.add(new
CreatedOrModifiedExtension(service, getPropertyValues(service)));
+ LOG.info("Updated {}", service);
+ } finally {
+ // Re-enable services and restart components that were
stopped for the update,
+ // restoring the controller to its pre-update running
state.
+ if (proposedService.getScheduledState() !=
org.apache.nifi.flow.ScheduledState.DISABLED) {
+
context.getControllerServiceProvider().enableControllerServicesAsync(servicesToRestart);
+
context.getControllerServiceProvider().scheduleReferencingComponents(
Review Comment:
Should we also call `notifyScheduledStateChange` for `servicesToRestart`
(ENABLED) and `referencesToRestart` (RUNNING) here, to match the per-service
`synchronize(ControllerServiceNode, …)` overload?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -745,17 +745,43 @@ private void synchronizeControllerServices(final
ProcessGroup group, final Versi
updateControllerService(addedService, proposedService,
topLevelGroup);
}
- // Update all of the Controller Services to match the
VersionedControllerService
+ // Update all Controller Services to match the
VersionedControllerService.
+ // Services may be ENABLED here if the outer "affected components"
pass did not
+ // disable them (e.g. COMPONENT_ADDED diffs are skipped by
AffectedComponentSet).
Review Comment:
Should this reference `FlowDifferenceFilters.isComponentUpdateRequired`
instead of `AffectedComponentSet`, since that's the filter actually populating
`updatedVersionedComponentIds`?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -745,17 +745,43 @@ private void synchronizeControllerServices(final
ProcessGroup group, final Versi
updateControllerService(addedService, proposedService,
topLevelGroup);
}
- // Update all of the Controller Services to match the
VersionedControllerService
+ // Update all Controller Services to match the
VersionedControllerService.
+ // Services may be ENABLED here if the outer "affected components"
pass did not
+ // disable them (e.g. COMPONENT_ADDED diffs are skipped by
AffectedComponentSet).
+ // We must disable before calling updateControllerService, which calls
setProperties
+ // which calls verifyModifiable and throws IllegalStateException on
ENABLED services.
+ final long stopTimeout = System.currentTimeMillis() +
syncOptions.getComponentStopTimeout().toMillis();
for (final Map.Entry<ControllerServiceNode,
VersionedControllerService> entry : services.entrySet()) {
final ControllerServiceNode service = entry.getKey();
final VersionedControllerService proposedService =
entry.getValue();
if
(updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
- updateControllerService(service, proposedService,
topLevelGroup);
- // Any existing component that is modified during
synchronization may have its properties reverted to a pre-migration state,
- // so we then add it to the set to allow migrateProperties to
be called again to get it back to the migrated state
- createdAndModifiedExtensions.add(new
CreatedOrModifiedExtension(service, getPropertyValues(service)));
- LOG.info("Updated {}", service);
+ final Set<ComponentNode> referencesToRestart = new HashSet<>();
+ final Set<ControllerServiceNode> servicesToRestart = new
HashSet<>();
+
+ try {
+ try {
+ stopControllerService(service, proposedService,
stopTimeout,
+ syncOptions.getComponentStopTimeoutAction(),
+ referencesToRestart, servicesToRestart,
syncOptions);
+ } catch (final TimeoutException e) {
+ throw new FlowSynchronizationException("Failed to stop
Controller Service " + service + " in preparation for update", e);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new FlowSynchronizationException("Interrupted
while stopping Controller Service " + service, e);
+ }
+ updateControllerService(service, proposedService,
topLevelGroup);
+ createdAndModifiedExtensions.add(new
CreatedOrModifiedExtension(service, getPropertyValues(service)));
+ LOG.info("Updated {}", service);
+ } finally {
+ // Re-enable services and restart components that were
stopped for the update,
+ // restoring the controller to its pre-update running
state.
+ if (proposedService.getScheduledState() !=
org.apache.nifi.flow.ScheduledState.DISABLED) {
+
context.getControllerServiceProvider().enableControllerServicesAsync(servicesToRestart);
Review Comment:
Does enabling via `controllerServiceProvider` here race with the subsequent
`componentScheduler.enableControllerServicesAsync` loop, and should both paths
use the same scheduler?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]