markap14 commented on code in PR #10654:
URL: https://github.com/apache/nifi/pull/10654#discussion_r2627479672
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java:
##########
@@ -102,60 +103,91 @@ public Future<Void> stopConnector(final ConnectorNode
connector) {
}
@Override
- public void applyUpdate(final ConnectorNode connector) throws
FlowUpdateException {
+ public void applyUpdate(final ConnectorNode connector, final
ConnectorUpdateContext context) throws FlowUpdateException {
final ConnectorState initialDesiredState = connector.getDesiredState();
- // Perform whatever preparation is necessary for the update. Default
implementation is to stop the connector.
- connector.prepareForUpdate();
+ // Transition the connector's state to PREPARING_FOR_UPDATE before
starting the background process.
+ // This allows us to ensure that if we poll and see the state in the
same state it was in before that
+ // we know the update has already completed (successfully or
otherwise).
+ connector.transitionStateForUpdating();
- try {
- // Wait for Connector State to become UPDATING
- waitForState(connector, ConnectorState.UPDATING,
Set.of(ConnectorState.PREPARING_FOR_UPDATE));
+ // Update connector in a background thread. This will handle
transitioning the Connector state appropriately
+ // so that it's clear when the update has completed.
+ lifecycleExecutor.submit(() -> updateConnector(connector,
initialDesiredState, context));
+ }
- // Apply the update to the connector.
- connector.applyUpdate();
+ private void updateConnector(final ConnectorNode connector, final
ConnectorState initialDesiredState, final ConnectorUpdateContext context) {
+ try {
+ // Perform whatever preparation is necessary for the update.
Default implementation is to stop the connector.
+ connector.prepareForUpdate();
+
+ try {
+ // Wait for Connector State to become UPDATING
+ waitForState(connector, Set.of(ConnectorState.UPDATING),
Set.of(ConnectorState.PREPARING_FOR_UPDATE));
+
+ // Apply the update to the connector.
+ connector.applyUpdate();
+
+ // Now that the update has been applied, save the flow so that
the updated configuration is persisted.
+ context.saveFlow();
+
+ // Wait for Connector State to become UPDATED, or to revert to
the initial desired state because, depending upon timing,
+ // other nodes may have already seen the transition to UPDATED
and moved the connector back to the initial desired state.
+ final Set<ConnectorState> desirableStates = new HashSet<>();
+ desirableStates.add(initialDesiredState);
+ desirableStates.add(ConnectorState.UPDATED);
+ if (initialDesiredState == ConnectorState.RUNNING) {
+ desirableStates.add(ConnectorState.STARTING);
+ } else if (initialDesiredState == ConnectorState.STOPPED) {
+ desirableStates.add(ConnectorState.STOPPING);
+ }
+ waitForState(connector, desirableStates,
Set.of(ConnectorState.UPDATING));
- // Wait for Connector State to become UPDATED
- waitForState(connector, ConnectorState.UPDATED,
Set.of(ConnectorState.UPDATING));
+ // If the initial desired state was RUNNING, start the
connector again. Otherwise, stop it.
+ // We don't simply leave it be as the prepareForUpdate /
update may have changed the state of some components.
+ if (initialDesiredState == ConnectorState.RUNNING) {
+ connector.start(lifecycleExecutor);
+ } else {
+ connector.stop(lifecycleExecutor);
+ }
- // If the initial desired state was RUNNING, start the connector
again. Otherwise, stop it.
- // We don't simply leave it be as the prepareForUpdate / update
may have changed the state of some components.
- if (initialDesiredState == ConnectorState.RUNNING) {
- connector.start(lifecycleExecutor);
- } else {
- connector.stop(lifecycleExecutor);
+ // We've updated the state of the connector so save flow again
+ context.saveFlow();
+ } catch (final Exception e) {
+ logger.error("Failed to apply connector update for {}",
connector, e);
+ connector.abortUpdate(e);
}
} catch (final Exception e) {
- connector.abortUpdate(e);
+ logger.error("Failed to apply connector update for {}", connector,
e);
Review Comment:
No, you're right. We should abort in this case also. Will fix.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]