This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 8311461ec2 NIFI-15170 Enabled Cluster State Provider on Flow
Controller creation (#10487)
8311461ec2 is described below
commit 8311461ec2aab009deba67152cfe3be1559e157d
Author: Mark Payne <[email protected]>
AuthorDate: Sat Nov 1 17:31:11 2025 -0400
NIFI-15170 Enabled Cluster State Provider on Flow Controller creation
(#10487)
- Updated FlowController to set the State Provider's cluster enabled flag
to true in the FlowController constructor instead of waiting for the connection
to cluster to complete
Signed-off-by: David Handermann <[email protected]>
---
.../org/apache/nifi/controller/FlowController.java | 7 +-
.../nifi/cs/tests/system/SetStateService.java | 29 ++++++
.../system/VerifyLocalClusterStateService.java | 78 ++++++++++++++++
.../org.apache.nifi.controller.ControllerService | 3 +-
.../nifi/processors/tests/system/SetState.java | 74 +++++++++++++++
.../services/org.apache.nifi.processor.Processor | 1 +
.../ControllerServiceStateIT.java | 103 +++++++++++++++++++++
7 files changed, 291 insertions(+), 4 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 8607578257..e564d29407 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -506,6 +506,10 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
this.statusHistoryRepository = statusHistoryRepository;
this.stateManagerProvider = stateManagerProvider;
+ if (configuredForClustering) {
+ stateManagerProvider.enableClusterProvider();
+ }
+
timerDrivenEngineRef = new AtomicReference<>(new
FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
final FlowFileRepository flowFileRepo =
createFlowFileRepository(nifiProperties, extensionManager,
resourceClaimManager);
@@ -2750,13 +2754,10 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
if (clustered) {
onClusterConnect();
leaderElectionManager.start();
- stateManagerProvider.enableClusterProvider();
-
loadBalanceClientRegistry.start();
heartbeat();
} else {
- stateManagerProvider.disableClusterProvider();
setPrimary(false);
}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services-api/src/main/java/org/apache/nifi/cs/tests/system/SetStateService.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services-api/src/main/java/org/apache/nifi/cs/tests/system/SetStateService.java
new file mode 100644
index 0000000000..d48661a7af
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services-api/src/main/java/org/apache/nifi/cs/tests/system/SetStateService.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cs.tests.system;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.controller.ControllerService;
+
+import java.io.IOException;
+
+public interface SetStateService extends ControllerService {
+
+ void setState(String key, String value, Scope scope) throws IOException;
+
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/java/org/apache/nifi/cs/tests/system/VerifyLocalClusterStateService.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/java/org/apache/nifi/cs/tests/system/VerifyLocalClusterStateService.java
new file mode 100644
index 0000000000..3780bb61aa
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/java/org/apache/nifi/cs/tests/system/VerifyLocalClusterStateService.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cs.tests.system;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class VerifyLocalClusterStateService extends AbstractControllerService
implements SetStateService {
+
+ static final PropertyDescriptor FILE_STORAGE_LOCATION = new
PropertyDescriptor.Builder()
+ .name("File Storage Location")
+ .description("The file system location where state files will be
written.")
+ .addValidator(StandardValidators.createDirectoryExistsValidator(false,
true))
+ .required(true)
+ .build();
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return List.of(FILE_STORAGE_LOCATION);
+ }
+
+ @OnEnabled
+ public void onScheduled(final ConfigurationContext context) throws
IOException {
+ final String nodeNumber = System.getProperty("nodeNumber");
+ final StateManager stateManager = getStateManager();
+ final Map<String, String> clusterState =
stateManager.getState(Scope.CLUSTER).toMap();
+ final Map<String, String> localState =
stateManager.getState(Scope.LOCAL).toMap();
+
+ final File directory = new
File(context.getProperty(FILE_STORAGE_LOCATION).getValue());
+
+ writeToPropertiesFile(clusterState, new File(directory,
"cluster-state-" + nodeNumber + ".properties"));
+ writeToPropertiesFile(localState, new File(directory, "local-state-" +
nodeNumber + ".properties"));
+ }
+
+ private void writeToPropertiesFile(final Map<String, String> stateMap,
final File file) throws IOException {
+ final Properties properties = new Properties();
+ stateMap.forEach(properties::setProperty);
+ try (final OutputStream out = new FileOutputStream(file)) {
+ properties.store(out, "State Map");
+ }
+ }
+
+ @Override
+ public void setState(final String key, final String value, final Scope
scope) throws IOException {
+ final Map<String, String> state = new
HashMap<>(getStateManager().getState(scope).toMap());
+ state.put(key, value);
+ getStateManager().setState(state, scope);
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 1186787f7b..2983552663 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -22,4 +22,5 @@ org.apache.nifi.cs.tests.system.StandardCountService
org.apache.nifi.cs.tests.system.StandardSleepService
org.apache.nifi.cs.tests.system.MigrationService
org.apache.nifi.cs.tests.system.MockCSVReader
-org.apache.nifi.cs.tests.system.MockCSVWriter
\ No newline at end of file
+org.apache.nifi.cs.tests.system.MockCSVWriter
+org.apache.nifi.cs.tests.system.VerifyLocalClusterStateService
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SetState.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SetState.java
new file mode 100644
index 0000000000..e9356266e4
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SetState.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.cs.tests.system.SetStateService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+import java.util.Set;
+
+public class SetState extends AbstractProcessor {
+
+ static final PropertyDescriptor STATE_SERVICE = new Builder()
+ .name("State Service")
+ .identifiesControllerService(SetStateService.class)
+ .required(true)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .build();
+
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return List.of(STATE_SERVICE);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Set.of(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ final FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final SetStateService service =
context.getProperty(STATE_SERVICE).asControllerService(SetStateService.class);
+ try {
+ service.setState("local", "local", Scope.LOCAL);
+ service.setState("cluster", "cluster", Scope.CLUSTER);
+ } catch (final Exception e) {
+ throw new ProcessException(e);
+ }
+
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 9b6ba1cbd5..d0a01b38b5 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -45,6 +45,7 @@ org.apache.nifi.processors.tests.system.ReverseContents
org.apache.nifi.processors.tests.system.RoundRobinFlowFiles
org.apache.nifi.processors.tests.system.SensitiveDynamicPropertiesProcessor
org.apache.nifi.processors.tests.system.SetAttribute
+org.apache.nifi.processors.tests.system.SetState
org.apache.nifi.processors.tests.system.Sleep
org.apache.nifi.processors.tests.system.SplitByLine
org.apache.nifi.processors.tests.system.SplitTextByLine
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceStateIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceStateIT.java
new file mode 100644
index 0000000000..ac00efcbe8
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceStateIT.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.controllerservice;
+
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ControllerServiceStateIT extends NiFiSystemIT {
+
+ @Override
+ public NiFiInstanceFactory getInstanceFactory() {
+ return createTwoNodeInstanceFactory();
+ }
+
+ @Test
+ public void testLocalClusterState() throws NiFiClientException,
IOException, InterruptedException {
+ final String tempDir = System.getProperty("java.io.tmpdir");
+
+ final File storageDir = new File(tempDir);
+ final File localStateFile1 = new File(storageDir,
"local-state-1.properties");
+ final File clusterStateFile1 = new File(storageDir,
"cluster-state-1.properties");
+ final File localStateFile2 = new File(storageDir,
"local-state-2.properties");
+ final File clusterStateFile2 = new File(storageDir,
"cluster-state-2.properties");
+ localStateFile1.delete();
+ clusterStateFile1.delete();
+ localStateFile2.delete();
+ clusterStateFile2.delete();
+
+ final ControllerServiceEntity failureService =
getClientUtil().createControllerService("VerifyLocalClusterStateService");
+ getClientUtil().updateControllerServiceProperties(failureService,
Map.of("File Storage Location", tempDir));
+ getClientUtil().enableControllerService(failureService);
+
+ final ProcessorEntity setState =
getClientUtil().createProcessor("SetState");
+ getClientUtil().updateProcessorProperties(setState, Map.of("State
Service", failureService.getId()));
+ getClientUtil().setAutoTerminatedRelationships(setState, "success");
+ getClientUtil().waitForValidProcessor(setState.getId());
+
+ final ProcessorEntity generate =
getClientUtil().createProcessor("GenerateFlowFile");
+ final ConnectionEntity connection =
getClientUtil().createConnection(generate, setState, "success");
+
+ getClientUtil().startProcessor(generate);
+ waitForQueueCount(connection, getNumberOfNodes());
+
+ getClientUtil().startProcessor(setState);
+ waitForQueueCount(connection, 0);
+
+ getNiFiInstance().stop();
+ getNiFiInstance().start(true);
+
+ waitFor(() -> localStateFile1.exists() && clusterStateFile1.exists());
+
+ final Properties localProperties1 =
loadPropertiesFromFile(localStateFile1);
+ final Properties clusterProperties1 =
loadPropertiesFromFile(clusterStateFile1);
+ final Properties localProperties2 =
loadPropertiesFromFile(localStateFile2);
+ final Properties clusterProperties2 =
loadPropertiesFromFile(clusterStateFile2);
+
+ final Properties expectedLocalProperties = new Properties();
+ expectedLocalProperties.setProperty("local", "local");
+ final Properties expectedClusterProperties = new Properties();
+ expectedClusterProperties.setProperty("cluster", "cluster");
+
+ assertEquals(expectedLocalProperties, localProperties1);
+ assertEquals(expectedLocalProperties, localProperties2);
+ assertEquals(expectedClusterProperties, clusterProperties1);
+ assertEquals(expectedClusterProperties, clusterProperties2);
+ }
+
+ private Properties loadPropertiesFromFile(final File file) throws
IOException {
+ final Properties properties = new Properties();
+ try (final FileInputStream in = new FileInputStream(file)) {
+ properties.load(in);
+ }
+ return properties;
+ }
+}