This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 8311461ec2 NIFI-15170 Enabled Cluster State Provider on Flow Controller creation (#10487)
8311461ec2 is described below

commit 8311461ec2aab009deba67152cfe3be1559e157d
Author: Mark Payne <[email protected]>
AuthorDate: Sat Nov 1 17:31:11 2025 -0400

    NIFI-15170 Enabled Cluster State Provider on Flow Controller creation (#10487)
    
    - Updated FlowController to set the State Provider's cluster enabled flag to true in the FlowController constructor instead of waiting for the connection to the cluster to complete
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../org/apache/nifi/controller/FlowController.java |   7 +-
 .../nifi/cs/tests/system/SetStateService.java      |  29 ++++++
 .../system/VerifyLocalClusterStateService.java     |  78 ++++++++++++++++
 .../org.apache.nifi.controller.ControllerService   |   3 +-
 .../nifi/processors/tests/system/SetState.java     |  74 +++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../ControllerServiceStateIT.java                  | 103 +++++++++++++++++++++
 7 files changed, 291 insertions(+), 4 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 8607578257..e564d29407 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -506,6 +506,10 @@ public class FlowController implements 
ReportingTaskProvider, FlowAnalysisRulePr
         this.statusHistoryRepository = statusHistoryRepository;
         this.stateManagerProvider = stateManagerProvider;
 
+        if (configuredForClustering) {
+            stateManagerProvider.enableClusterProvider();
+        }
+
         timerDrivenEngineRef = new AtomicReference<>(new 
FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
 
         final FlowFileRepository flowFileRepo = 
createFlowFileRepository(nifiProperties, extensionManager, 
resourceClaimManager);
@@ -2750,13 +2754,10 @@ public class FlowController implements 
ReportingTaskProvider, FlowAnalysisRulePr
                 if (clustered) {
                     onClusterConnect();
                     leaderElectionManager.start();
-                    stateManagerProvider.enableClusterProvider();
-
                     loadBalanceClientRegistry.start();
 
                     heartbeat();
                 } else {
-                    stateManagerProvider.disableClusterProvider();
                     setPrimary(false);
                 }
 
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services-api/src/main/java/org/apache/nifi/cs/tests/system/SetStateService.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services-api/src/main/java/org/apache/nifi/cs/tests/system/SetStateService.java
new file mode 100644
index 0000000000..d48661a7af
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services-api/src/main/java/org/apache/nifi/cs/tests/system/SetStateService.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cs.tests.system;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.controller.ControllerService;
+
+import java.io.IOException;
+
+public interface SetStateService extends ControllerService {
+
+    void setState(String key, String value, Scope scope) throws IOException;
+
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/java/org/apache/nifi/cs/tests/system/VerifyLocalClusterStateService.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/java/org/apache/nifi/cs/tests/system/VerifyLocalClusterStateService.java
new file mode 100644
index 0000000000..3780bb61aa
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/java/org/apache/nifi/cs/tests/system/VerifyLocalClusterStateService.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cs.tests.system;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class VerifyLocalClusterStateService extends AbstractControllerService 
implements SetStateService {
+
+    static final PropertyDescriptor FILE_STORAGE_LOCATION = new 
PropertyDescriptor.Builder()
+        .name("File Storage Location")
+        .description("The file system location where state files will be 
written.")
+        .addValidator(StandardValidators.createDirectoryExistsValidator(false, 
true))
+        .required(true)
+        .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return List.of(FILE_STORAGE_LOCATION);
+    }
+
+    @OnEnabled
+    public void onScheduled(final ConfigurationContext context) throws 
IOException {
+        final String nodeNumber = System.getProperty("nodeNumber");
+        final StateManager stateManager = getStateManager();
+        final Map<String, String> clusterState = 
stateManager.getState(Scope.CLUSTER).toMap();
+        final Map<String, String> localState = 
stateManager.getState(Scope.LOCAL).toMap();
+
+        final File directory = new 
File(context.getProperty(FILE_STORAGE_LOCATION).getValue());
+
+        writeToPropertiesFile(clusterState, new File(directory, 
"cluster-state-" + nodeNumber + ".properties"));
+        writeToPropertiesFile(localState, new File(directory, "local-state-" + 
nodeNumber + ".properties"));
+    }
+
+    private void writeToPropertiesFile(final Map<String, String> stateMap, 
final File file) throws IOException {
+        final Properties properties = new Properties();
+        stateMap.forEach(properties::setProperty);
+        try (final OutputStream out = new FileOutputStream(file)) {
+            properties.store(out, "State Map");
+        }
+    }
+
+    @Override
+    public void setState(final String key, final String value, final Scope 
scope) throws IOException {
+        final Map<String, String> state = new 
HashMap<>(getStateManager().getState(scope).toMap());
+        state.put(key, value);
+        getStateManager().setState(state, scope);
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 1186787f7b..2983552663 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -22,4 +22,5 @@ org.apache.nifi.cs.tests.system.StandardCountService
 org.apache.nifi.cs.tests.system.StandardSleepService
 org.apache.nifi.cs.tests.system.MigrationService
 org.apache.nifi.cs.tests.system.MockCSVReader
-org.apache.nifi.cs.tests.system.MockCSVWriter
\ No newline at end of file
+org.apache.nifi.cs.tests.system.MockCSVWriter
+org.apache.nifi.cs.tests.system.VerifyLocalClusterStateService
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SetState.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SetState.java
new file mode 100644
index 0000000000..e9356266e4
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SetState.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.cs.tests.system.SetStateService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+import java.util.Set;
+
+public class SetState extends AbstractProcessor {
+
+    static final PropertyDescriptor STATE_SERVICE = new Builder()
+        .name("State Service")
+        .identifiesControllerService(SetStateService.class)
+        .required(true)
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .build();
+
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return List.of(STATE_SERVICE);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Set.of(REL_SUCCESS);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final SetStateService service = 
context.getProperty(STATE_SERVICE).asControllerService(SetStateService.class);
+        try {
+            service.setState("local", "local", Scope.LOCAL);
+            service.setState("cluster", "cluster", Scope.CLUSTER);
+        } catch (final Exception e) {
+            throw new ProcessException(e);
+        }
+
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 9b6ba1cbd5..d0a01b38b5 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -45,6 +45,7 @@ org.apache.nifi.processors.tests.system.ReverseContents
 org.apache.nifi.processors.tests.system.RoundRobinFlowFiles
 org.apache.nifi.processors.tests.system.SensitiveDynamicPropertiesProcessor
 org.apache.nifi.processors.tests.system.SetAttribute
+org.apache.nifi.processors.tests.system.SetState
 org.apache.nifi.processors.tests.system.Sleep
 org.apache.nifi.processors.tests.system.SplitByLine
 org.apache.nifi.processors.tests.system.SplitTextByLine
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceStateIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceStateIT.java
new file mode 100644
index 0000000000..ac00efcbe8
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceStateIT.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.controllerservice;
+
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ControllerServiceStateIT extends NiFiSystemIT {
+
+    @Override
+    public NiFiInstanceFactory getInstanceFactory() {
+        return createTwoNodeInstanceFactory();
+    }
+
+    @Test
+    public void testLocalClusterState() throws NiFiClientException, 
IOException, InterruptedException {
+        final String tempDir = System.getProperty("java.io.tmpdir");
+
+        final File storageDir = new File(tempDir);
+        final File localStateFile1 = new File(storageDir, 
"local-state-1.properties");
+        final File clusterStateFile1 = new File(storageDir, 
"cluster-state-1.properties");
+        final File localStateFile2 = new File(storageDir, 
"local-state-2.properties");
+        final File clusterStateFile2 = new File(storageDir, 
"cluster-state-2.properties");
+        localStateFile1.delete();
+        clusterStateFile1.delete();
+        localStateFile2.delete();
+        clusterStateFile2.delete();
+
+        final ControllerServiceEntity failureService = 
getClientUtil().createControllerService("VerifyLocalClusterStateService");
+        getClientUtil().updateControllerServiceProperties(failureService, 
Map.of("File Storage Location", tempDir));
+        getClientUtil().enableControllerService(failureService);
+
+        final ProcessorEntity setState = 
getClientUtil().createProcessor("SetState");
+        getClientUtil().updateProcessorProperties(setState, Map.of("State 
Service", failureService.getId()));
+        getClientUtil().setAutoTerminatedRelationships(setState, "success");
+        getClientUtil().waitForValidProcessor(setState.getId());
+
+        final ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateFlowFile");
+        final ConnectionEntity connection = 
getClientUtil().createConnection(generate, setState, "success");
+
+        getClientUtil().startProcessor(generate);
+        waitForQueueCount(connection, getNumberOfNodes());
+
+        getClientUtil().startProcessor(setState);
+        waitForQueueCount(connection, 0);
+
+        getNiFiInstance().stop();
+        getNiFiInstance().start(true);
+
+        waitFor(() -> localStateFile1.exists() && clusterStateFile1.exists());
+
+        final Properties localProperties1 = 
loadPropertiesFromFile(localStateFile1);
+        final Properties clusterProperties1 = 
loadPropertiesFromFile(clusterStateFile1);
+        final Properties localProperties2 = 
loadPropertiesFromFile(localStateFile2);
+        final Properties clusterProperties2 = 
loadPropertiesFromFile(clusterStateFile2);
+
+        final Properties expectedLocalProperties = new Properties();
+        expectedLocalProperties.setProperty("local", "local");
+        final Properties expectedClusterProperties = new Properties();
+        expectedClusterProperties.setProperty("cluster", "cluster");
+
+        assertEquals(expectedLocalProperties, localProperties1);
+        assertEquals(expectedLocalProperties, localProperties2);
+        assertEquals(expectedClusterProperties, clusterProperties1);
+        assertEquals(expectedClusterProperties, clusterProperties2);
+    }
+
+    private Properties loadPropertiesFromFile(final File file) throws 
IOException {
+        final Properties properties = new Properties();
+        try (final FileInputStream in = new FileInputStream(file)) {
+            properties.load(in);
+        }
+        return properties;
+    }
+}

Reply via email to