markap14 commented on code in PR #11079:
URL: https://github.com/apache/nifi/pull/11079#discussion_r3023935120
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java:
##########
@@ -47,4 +49,14 @@ default ConnectorConfigurationProvider
getConnectorConfigurationProvider() {
return null;
}
+ /**
+ * Returns the maximum time to wait for a connector in a transient state
(STARTING, STOPPING, PURGING)
+ * to reach a stable state during flow synchronization.
+ *
+ * @return the sync timeout duration, defaults to 15 minutes
+ */
+ default Duration getConnectorSyncTimeout() {
+ return Duration.ofMinutes(15);
Review Comment:
This feels like a rather excessive default, no?
##########
nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorSyncDirective.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+import org.apache.nifi.flow.ScheduledState;
+
+/**
+ * Directive returned by {@link
ConnectorConfigurationProvider#verifySyncable(String, ScheduledState)}
+ * indicating how the connector repository should handle synchronization for a
connector during
+ * flow inheritance.
+ */
+public class ConnectorSyncDirective {
+
+ /**
+ * The action the connector repository should take for this connector
during flow sync.
+ */
+ public enum Action {
+ /**
+ * Proceed with synchronization. The directive may optionally include a
+ * {@link ScheduledState} override and/or a {@link
ConnectorWorkingConfiguration}
+ * containing the provider's working config and name.
+ */
+ ALLOW,
+
+ /**
+ * Do not synchronize this connector. The connector should be created
locally
+ * (if not already present) and marked invalid so that a background
repair
+ * process can attempt synchronization later when conditions improve.
+ */
+ REJECT,
+
+ /**
+ * This connector should not exist on this node. If it exists locally,
remove
+ * it from the repository. If it does not exist, do not create it.
This is used
+ * when the external system indicates the connector is being deleted
or has been deleted.
+ */
+ REMOVE
+ }
+
+ private static final ConnectorSyncDirective ALLOW_DEFAULT = new
ConnectorSyncDirective(Action.ALLOW, null, null);
+ private static final ConnectorSyncDirective REJECT_DIRECTIVE = new
ConnectorSyncDirective(Action.REJECT, null, null);
+ private static final ConnectorSyncDirective REMOVE_DIRECTIVE = new
ConnectorSyncDirective(Action.REMOVE, null, null);
+
+ private final Action action;
+ private final ScheduledState scheduledStateOverride;
+ private final ConnectorWorkingConfiguration workingConfiguration;
+
+ private ConnectorSyncDirective(final Action action, final ScheduledState
scheduledStateOverride,
+ final ConnectorWorkingConfiguration
workingConfiguration) {
+ this.action = action;
+ this.scheduledStateOverride = scheduledStateOverride;
+ this.workingConfiguration = workingConfiguration;
+ }
+
+ /**
+ * Returns an ALLOW directive with no overrides. The connector repository
will use the
+ * versioned flow's name, working config, and ScheduledState as-is. This
is the default
+ * behavior when no {@link ConnectorConfigurationProvider} is configured
(Apache NiFi).
+ */
+ public static ConnectorSyncDirective allow() {
+ return ALLOW_DEFAULT;
+ }
+
+ /**
+ * Returns an ALLOW directive with the provider's working configuration
(name + working
+ * config steps) and no ScheduledState override.
+ *
+ * @param workingConfiguration the provider's working configuration
including name
+ */
+ public static ConnectorSyncDirective allow(final
ConnectorWorkingConfiguration workingConfiguration) {
+ return new ConnectorSyncDirective(Action.ALLOW, null,
workingConfiguration);
+ }
+
+ /**
+ * Returns an ALLOW directive with the provider's working configuration
and a
+ * ScheduledState override. The override replaces the versioned flow's
ScheduledState,
+ * which may be stale due to in-flight DPS tasks.
+ *
+ * @param workingConfiguration the provider's working configuration
including name
+ * @param scheduledStateOverride the ScheduledState to use instead of the
versioned flow's value
+ */
+ public static ConnectorSyncDirective allow(final
ConnectorWorkingConfiguration workingConfiguration,
+ final ScheduledState
scheduledStateOverride) {
+ return new ConnectorSyncDirective(Action.ALLOW,
scheduledStateOverride, workingConfiguration);
+ }
+
+ /**
+ * Returns a REJECT directive. The connector will be created locally (if
not already
+ * present) and marked invalid for background repair.
+ */
+ public static ConnectorSyncDirective reject() {
+ return REJECT_DIRECTIVE;
+ }
+
+ /**
+ * Returns a REMOVE directive. The connector should not exist on this node.
+ */
+ public static ConnectorSyncDirective remove() {
+ return REMOVE_DIRECTIVE;
+ }
+
+ public Action getAction() {
+ return action;
+ }
+
+ /**
+ * Returns the ScheduledState override, or {@code null} if the versioned
flow's
+ * ScheduledState should be used.
+ */
+ public ScheduledState getScheduledStateOverride() {
+ return scheduledStateOverride;
+ }
+
+ /**
+ * Returns the provider's working configuration (name + working config
steps),
+ * or {@code null} if the versioned flow's values should be used.
+ */
+ public ConnectorWorkingConfiguration getWorkingConfiguration() {
+ return workingConfiguration;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new
StringBuilder("ConnectorSyncDirective[action=").append(action);
+ if (scheduledStateOverride != null) {
+ sb.append(",
scheduledStateOverride=").append(scheduledStateOverride);
+ }
+ if (workingConfiguration != null) {
+ sb.append(", hasWorkingConfig=true");
+ }
+ sb.append(']');
+ return sb.toString();
Review Comment:
No need for a StringBuilder here
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java:
##########
@@ -91,6 +107,288 @@ public void restoreConnector(final ConnectorNode
connector) {
logger.debug("Successfully restored {}", connector);
}
+ @Override
+ public ConnectorSyncResult syncConnector(final VersionedConnector
versionedConnector) {
+ final String connectorId = versionedConnector.getInstanceIdentifier();
+ final ScheduledState proposedScheduledState =
versionedConnector.getScheduledState();
+ logger.debug("syncConnector called for connector [{}]", connectorId);
+
+ // Consult the provider for external state checks and working config
+ final ConnectorSyncDirective directive;
+ if (configurationProvider != null) {
+ try {
+ directive = configurationProvider.verifySyncable(connectorId,
proposedScheduledState);
+ } catch (final Exception e) {
+ logger.error("Configuration provider threw exception during
verifySyncable for connector [{}]: {}", connectorId, e.getMessage(), e);
+ final ConnectorNode existingNode =
ensureConnectorNodeExists(versionedConnector);
+ existingNode.markInvalid("Flow Synchronization Failure",
+ "Configuration provider error during synchronization:
" + e.getMessage());
+ return ConnectorSyncResult.FAILED;
+ }
+ } else {
+ directive = ConnectorSyncDirective.allow();
+ }
+
+ logger.debug("Connector [{}] sync directive: {}", connectorId,
directive);
+
+ // Handle REMOVE: connector should not exist on this node
+ if (directive.getAction() == ConnectorSyncDirective.Action.REMOVE) {
+ final ConnectorNode existingNode = connectors.remove(connectorId);
Review Comment:
I don't think the removal logic here is sufficient. We need to ensure that
the Connector gets stopped and then purge its data, cleanup Class Loaders, call
`@OnRemoved` if necessary, etc.
##########
nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java:
##########
@@ -99,6 +101,35 @@ public interface ConnectorConfigurationProvider {
*/
void verifyCreate(String connectorId);
+ /**
+ * Determines how the connector repository should handle synchronization
for the given
+ * connector during flow inheritance (cluster join). The provider examines
the external
+ * state of the connector and returns a {@link ConnectorSyncDirective}
indicating whether
+ * to allow, reject, or remove the connector.
+ *
+ * <p>When the directive action is {@link
ConnectorSyncDirective.Action#ALLOW}, the
+ * directive may optionally include:</p>
+ * <ul>
+ * <li>A {@link ConnectorWorkingConfiguration} with the provider's
working config and name
+ * (overriding the potentially stale values from the versioned
flow)</li>
+ * <li>A {@link ScheduledState} override (correcting stale run intent
from the versioned flow)</li>
+ * </ul>
+ *
+ * <p>This method combines the verify and load operations into a single
call to avoid
+ * redundant round-trips to the external store.</p>
+ *
+ * <p>The default implementation returns {@link
ConnectorSyncDirective#allow()} with no
+ * overrides, meaning the versioned flow's values are used for everything.
This is the
+ * behavior for Apache NiFi when no provider is configured.</p>
+ *
+ * @param connectorId the identifier of the connector to check
+ * @param proposedScheduledState the ScheduledState from the versioned flow
+ * @return a directive indicating how to handle this connector during sync
+ */
+ default ConnectorSyncDirective verifySyncable(final String connectorId,
final ScheduledState proposedScheduledState) {
Review Comment:
Not a huge deal, but `verify*` methods in the framework generally are named
`verifyCan*`. E.g., `verifyCanSync` - and also typically are `void` method that
throw an `IllegalStateException` if not allowed. Perhaps consider another
naming convention, such as `determineSyncStatus`, `determineSyncEligibility`,
`getSyncDirective`?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java:
##########
@@ -39,34 +45,44 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.stream.Collectors;
public class StandardConnectorRepository implements ConnectorRepository {
private static final Logger logger =
LoggerFactory.getLogger(StandardConnectorRepository.class);
+ private static final Duration DEFAULT_SYNC_TIMEOUT =
Duration.ofMinutes(15);
+ private static final Duration SYNC_POLL_INTERVAL = Duration.ofSeconds(2);
private final Map<String, ConnectorNode> connectors = new
ConcurrentHashMap<>();
private final FlowEngine lifecycleExecutor = new FlowEngine(8, "NiFi
Connector Lifecycle");
+ private volatile FlowManager flowManager;
private volatile ExtensionManager extensionManager;
private volatile ConnectorRequestReplicator requestReplicator;
private volatile SecretsManager secretsManager;
private volatile AssetManager assetManager;
private volatile ConnectorConfigurationProvider configurationProvider;
+ private volatile Duration syncTimeout = DEFAULT_SYNC_TIMEOUT;
Review Comment:
Seems unnecessary to initialize this to a default value (and to even define
the default value) if the value will always be set in the initialization, no?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorSyncResult.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+/**
+ * Result of a connector synchronization attempt during flow inheritance.
+ */
+public enum ConnectorSyncResult {
+
+ /**
+ * Configuration was applied and the connector's run state was updated
+ * to match the proposed ScheduledState.
+ */
+ SYNCED,
+
+ /**
+ * Configuration was already up to date; the connector's run state was
+ * updated if it differed from the proposed ScheduledState.
+ */
+ SYNCED_NO_CHANGES,
Review Comment:
Seems weird to refer to this as `SYNCED_NO_CHANGES` if the Connector's Run
State is updated. Perhaps `SYNCED_CONFIG_UNCHANGED` makes more sense?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]