This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c843bb217 NIFI-15243 Improved KubernetesConfigMapStateProvider Set 
State Handling (#10619)
2c843bb217 is described below

commit 2c843bb217d63c2758a5cebfcf663c187bfdc90d
Author: Bryan Bende <[email protected]>
AuthorDate: Thu Jan 8 13:49:53 2026 -0500

    NIFI-15243 Improved KubernetesConfigMapStateProvider Set State Handling 
(#10619)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../provider/KubernetesConfigMapStateProvider.java | 102 +++++++++------------
 .../state/provider/StandardStateMap.java           |  10 +-
 .../KubernetesConfigMapStateProviderTest.java      |   7 +-
 3 files changed, 52 insertions(+), 67 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java
 
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java
index 35b5b7cb11..9bd5f0e00b 100644
--- 
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java
+++ 
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java
@@ -155,64 +155,28 @@ public class KubernetesConfigMapStateProvider extends 
AbstractConfigurableCompon
     @Override
     public void setState(final Map<String, String> state, final String 
componentId) throws IOException {
         try {
-            final ConfigMap configMap = createConfigMapBuilder(state, 
componentId).build();
-            Resource<ConfigMap> configMapResource = 
kubernetesClient.configMaps().resource(configMap);
-            final String configMapName = configMap.getMetadata().getName();
-
-            ConfigMap configMapCreated = null;
+            final ConfigMap configMap = createConfigMapBuilder(state, 
componentId, null).build();
+            ConfigMap configMapResult = null;
 
-            // Attempt to create or update, up to 3 times. We expect that we 
will update more frequently than create
-            // so we first attempt to update. If we get back a 404, then we 
create it.
-            boolean create = false;
+            // Attempt to create or update, up to 3 times
             for (int attempt = 0; attempt < MAX_UPDATE_ATTEMPTS; attempt++) {
                 try {
-                    if (create) {
-                        configMapCreated = configMapResource.create();
+                    final ConfigMap existingConfigMap = 
kubernetesClient.configMaps().resource(configMap).get();
+                    if (existingConfigMap == null) {
+                        configMapResult = 
kubernetesClient.configMaps().resource(configMap).create();
                     } else {
-                        configMapCreated = configMapResource.update();
+                        existingConfigMap.setData(configMap.getData());
+                        configMapResult = 
kubernetesClient.configMaps().resource(existingConfigMap).update();
                     }
-
                     break;
                 } catch (final KubernetesClientException e) {
                     final int returnCode = e.getCode();
-                    if (returnCode == HttpURLConnection.HTTP_NOT_FOUND) {
-                        // A 404 return code indicates that we need to create 
the resource instead of update it.
-                        // Now, we will attempt to create the resource instead 
of update it, so we'll reset the attempt counter.
-                        attempt = 0;
-                        create = true;
-                        continue;
-                    }
-
-                    if (returnCode == HttpURLConnection.HTTP_CONFLICT) {
-                        logger.debug("Update conflict detected when setting 
state for Component ID [{}]. Attempt {} of {}.", componentId, attempt + 1, 
MAX_UPDATE_ATTEMPTS);
-
-                        if (attempt < MAX_UPDATE_ATTEMPTS - 1) {
-                            final ConfigMap latestConfigMap = 
kubernetesClient.configMaps()
-                                    .inNamespace(namespace)
-                                    .withName(configMapName)
-                                    .get();
-
-                            if (latestConfigMap != null) {
-                                final ObjectMeta latestMetadata = 
latestConfigMap.getMetadata();
-                                final String latestResourceVersion = 
latestMetadata != null ? latestMetadata.getResourceVersion() : null;
-
-                                if (latestResourceVersion != null) {
-                                    
configMap.getMetadata().setResourceVersion(latestResourceVersion);
-                                    configMapResource = 
kubernetesClient.configMaps().resource(configMap);
-                                    logger.debug("Retrying state update for 
Component ID [{}] with resource version [{}]", componentId, 
latestResourceVersion);
-                                    continue;
-                                }
-                            }
-                        }
-
-                        throw e;
-                    }
-
-                    if (returnCode >= 500) {
-                        // Server-side error. We should retry, up to some 
number of attempts.
+                    if (returnCode == HttpURLConnection.HTTP_CONFLICT || 
returnCode >= HttpURLConnection.HTTP_INTERNAL_ERROR) {
+                        // Conflict or Server-side error. We should retry, up 
to some number of attempts.
                         if (attempt == MAX_UPDATE_ATTEMPTS - 1) {
                             throw e;
                         }
+                        logger.warn("Failed to update state for Component ID 
[{}] on attempt {} of {}", componentId, attempt + 1, MAX_UPDATE_ATTEMPTS, e);
                     } else {
                         // There's an issue with the request. Throw the 
Exception.
                         throw e;
@@ -227,11 +191,11 @@ public class KubernetesConfigMapStateProvider extends 
AbstractConfigurableCompon
                 }
             }
 
-            if (configMapCreated == null) {
+            if (configMapResult == null) {
                 throw new IOException("Exhausted maximum number of attempts 
(%s) to update state for component with ID %s but could not update 
it".formatted(MAX_UPDATE_ATTEMPTS, componentId));
             }
 
-            final Optional<String> version = getVersion(configMapCreated);
+            final Optional<String> version = getVersion(configMapResult);
             logger.debug("Set State Component ID [{}] Version [{}]", 
componentId, version);
         } catch (final KubernetesClientException e) {
             if (isNotFound(e.getCode())) {
@@ -257,7 +221,8 @@ public class KubernetesConfigMapStateProvider extends 
AbstractConfigurableCompon
             final ConfigMap configMap = configMapResource(componentId).get();
             final Map<String, String> data = configMap == null ? 
Collections.emptyMap() : getDecodedMap(configMap.getData());
             final Optional<String> version = configMap == null ? 
Optional.empty() : getVersion(configMap);
-            return new StandardStateMap(data, version);
+            final Optional<ObjectMeta> configMapMetadata = configMap == null ? 
Optional.empty() : Optional.of(configMap.getMetadata());
+            return new StandardStateMap(data, version, configMapMetadata);
         } catch (final RuntimeException e) {
             throw new IOException(String.format("Get failed for Component ID 
[%s]", componentId), e);
         }
@@ -273,12 +238,19 @@ public class KubernetesConfigMapStateProvider extends 
AbstractConfigurableCompon
      */
     @Override
     public boolean replace(final StateMap currentState, final Map<String, 
String> state, final String componentId) throws IOException {
-        final ConfigMapBuilder configMapBuilder = 
createConfigMapBuilder(state, componentId);
-        final Optional<String> stateVersion = currentState.getStateVersion();
-        if (stateVersion.isPresent()) {
-            final String resourceVersion = stateVersion.get();
-            
configMapBuilder.editOrNewMetadata().withResourceVersion(resourceVersion).endMetadata();
+        if (currentState instanceof StandardStateMap standardStateMap) {
+            return replace(standardStateMap, state, componentId);
+        } else {
+            throw new IllegalStateException("Current state is not an instance 
of StandardStateMap");
         }
+    }
+
+    private boolean replace(final StandardStateMap currentState, final 
Map<String, String> state, final String componentId) throws IOException {
+        final Optional<ObjectMeta> existingMetadata = 
currentState.getConfigMapMetadata();
+        final ConfigMapBuilder configMapBuilder = 
createConfigMapBuilder(state, componentId, existingMetadata.orElse(null));
+
+        final Optional<String> stateVersion = currentState.getStateVersion();
+        stateVersion.ifPresent(resourceVersion -> 
configMapBuilder.editOrNewMetadata().withResourceVersion(resourceVersion).endMetadata());
         final ConfigMap configMap = configMapBuilder.build();
 
         try {
@@ -412,15 +384,25 @@ public class KubernetesConfigMapStateProvider extends 
AbstractConfigurableCompon
         return 
kubernetesClient.configMaps().inNamespace(namespace).withName(name);
     }
 
-    private ConfigMapBuilder createConfigMapBuilder(final Map<String, String> 
state, final String componentId) {
-        final Map<String, String> encodedData = getEncodedMap(state);
+    private ConfigMapBuilder createConfigMapBuilder(final Map<String, String> 
state, final String componentId, final ObjectMeta existingMetadata) {
         final String name = getConfigMapName(componentId);
-        return new ConfigMapBuilder()
+
+        final ConfigMapBuilder configMapBuilder;
+        if (existingMetadata == null) {
+            configMapBuilder = new ConfigMapBuilder()
                 .withNewMetadata()
                 .withNamespace(namespace)
                 .withName(name)
-                .endMetadata()
-                .withData(encodedData);
+                .endMetadata();
+        } else if (namespace.equals(existingMetadata.getNamespace()) && 
name.equals(existingMetadata.getName())) {
+            configMapBuilder = new 
ConfigMapBuilder().withMetadata(existingMetadata);
+        } else {
+            throw new IllegalArgumentException("ConfigMap metadata with 
namespace [%s] and name [%s], did not match expected namespace [%s] and name 
[%s]"
+                .formatted(existingMetadata.getNamespace(), 
existingMetadata.getName(), namespace, name));
+        }
+
+        final Map<String, String> encodedData = getEncodedMap(state);
+        return configMapBuilder.withData(encodedData);
     }
 
     private String getConfigMapName(final String componentId) {
diff --git 
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java
 
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java
index e81fc81975..440edc1dab 100644
--- 
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java
+++ 
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.kubernetes.state.provider;
 
+import io.fabric8.kubernetes.api.model.ObjectMeta;
 import org.apache.nifi.components.state.StateMap;
 
 import java.util.Collections;
@@ -30,9 +31,12 @@ class StandardStateMap implements StateMap {
 
     private final Optional<String> version;
 
-    StandardStateMap(final Map<String, String> data, final Optional<String> 
version) {
+    private final Optional<ObjectMeta> configMapMetadata;
+
+    StandardStateMap(final Map<String, String> data, final Optional<String> 
version, final Optional<ObjectMeta> configMapMetadata) {
         this.data = Collections.unmodifiableMap(data == null ? 
Collections.emptyMap() : data);
         this.version = version;
+        this.configMapMetadata = configMapMetadata;
     }
 
 
@@ -66,4 +70,8 @@ class StandardStateMap implements StateMap {
     public Map<String, String> toMap() {
         return data;
     }
+
+    public Optional<ObjectMeta> getConfigMapMetadata() {
+        return configMapMetadata;
+    }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java
 
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java
index 82e9a78b63..7ef2117682 100644
--- 
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java
+++ 
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java
@@ -23,7 +23,6 @@ import io.fabric8.kubernetes.api.model.StatusBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClientException;
 import io.fabric8.kubernetes.client.dsl.MixedOperation;
-import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
 import io.fabric8.kubernetes.client.dsl.Resource;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
@@ -61,7 +60,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -182,13 +180,10 @@ class KubernetesConfigMapStateProviderTest {
     void testSetStateConflict() {
         final KubernetesClient mockClient = mock(KubernetesClient.class);
         final MixedOperation<ConfigMap, ConfigMapList, Resource<ConfigMap>> 
mockConfigMaps = mock(MixedOperation.class);
-        final NonNamespaceOperation<ConfigMap, ConfigMapList, 
Resource<ConfigMap>> mockNamespacedConfigMaps = 
mock(NonNamespaceOperation.class);
         final Resource<ConfigMap> mockResource = mock(Resource.class);
 
         when(mockClient.configMaps()).thenReturn(mockConfigMaps);
         
when(mockConfigMaps.resource(any(ConfigMap.class))).thenReturn(mockResource);
-        
when(mockConfigMaps.inNamespace(DEFAULT_NAMESPACE)).thenReturn(mockNamespacedConfigMaps);
-        
when(mockNamespacedConfigMaps.withName(anyString())).thenReturn(mockResource);
 
         final String conflictMessageTemplate = "Operation cannot be fulfilled 
on configmaps \"nifi-component-%s\": "
                 + "the object has been modified; please apply your changes to 
the latest version and try again";
@@ -274,7 +269,7 @@ class KubernetesConfigMapStateProviderTest {
         setContext();
         provider.initialize(context);
 
-        final StateMap stateMap = new StandardStateMap(Collections.emptyMap(), 
Optional.empty());
+        final StateMap stateMap = new StandardStateMap(Collections.emptyMap(), 
Optional.empty(), Optional.empty());
         final boolean replaced = provider.replace(stateMap, 
Collections.emptyMap(), COMPONENT_ID);
 
         assertTrue(replaced);

Reply via email to