kevdoran commented on code in PR #11079:
URL: https://github.com/apache/nifi/pull/11079#discussion_r3025758999


##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java:
##########
@@ -1042,36 +1042,54 @@ private Set<String> getAssetIds(final Parameter 
parameter) {
     }
 
     private void inheritConnectors(final FlowController flowController, final 
VersionedDataflow dataflow) {
-        // TODO: We need to delete any Connectors that are no longer part of 
the flow.
-        //       This means we need to drain the Connector first, then stop 
it, then delete it. If unable to drain, we must fail...
-        //              perhaps we need a DRAINING state? Or do we just delete 
it and drop the data?
         final ConnectorRepository connectorRepository = 
flowController.getConnectorRepository();
 
         final Set<String> proposedConnectorIds = new HashSet<>();
         if (dataflow.getConnectors() != null) {
             for (final VersionedConnector versionedConnector : 
dataflow.getConnectors()) {
                 
proposedConnectorIds.add(versionedConnector.getInstanceIdentifier());
 
-                final ConnectorNode existingConnector = 
connectorRepository.getConnector(versionedConnector.getInstanceIdentifier());
-                if (existingConnector == null) {
-                    logger.info("Connector {} of type {} with name {} is not 
in the current flow. Will add Connector.",
-                        versionedConnector.getInstanceIdentifier(), 
versionedConnector.getType(), versionedConnector.getName());
+                // Ensure the connector exists locally before any provider 
interaction so that
+                // if verifySyncable or any subsequent step fails, we have a 
local node to mark invalid.
+                ConnectorNode connectorNode = 
connectorRepository.getConnector(versionedConnector.getInstanceIdentifier());
+                final boolean isNewConnector = connectorNode == null;
+                if (isNewConnector) {
+                    final BundleCoordinate coordinate = 
createBundleCoordinate(extensionManager, versionedConnector.getBundle(), 
versionedConnector.getType());
+                    connectorNode = 
flowController.getFlowManager().createConnector(
+                        versionedConnector.getType(), 
versionedConnector.getInstanceIdentifier(), coordinate, false, true);
+                    connectorRepository.restoreConnector(connectorNode);
+                }
 
-                    addConnector(versionedConnector, connectorRepository, 
flowController.getFlowManager());
-                } else if (isConnectorConfigurationUpdated(existingConnector, 
versionedConnector)) {
-                    logger.info("{} configuration has changed, updating 
configuration", existingConnector);
-                    updateConnector(versionedConnector, connectorRepository);
-                } else {
-                    logger.debug("{} configuration is up to date, no update 
necessary", existingConnector);
+                try {
+                    
connectorRepository.verifySyncable(versionedConnector.getInstanceIdentifier());

Review Comment:
   OBE (overtaken by events)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to