This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2c843bb217 NIFI-15243 Improved KubernetesConfigMapStateProvider Set
State Handling (#10619)
2c843bb217 is described below
commit 2c843bb217d63c2758a5cebfcf663c187bfdc90d
Author: Bryan Bende <[email protected]>
AuthorDate: Thu Jan 8 13:49:53 2026 -0500
NIFI-15243 Improved KubernetesConfigMapStateProvider Set State Handling
(#10619)
Signed-off-by: David Handermann <[email protected]>
---
.../provider/KubernetesConfigMapStateProvider.java | 102 +++++++++------------
.../state/provider/StandardStateMap.java | 10 +-
.../KubernetesConfigMapStateProviderTest.java | 7 +-
3 files changed, 52 insertions(+), 67 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java
index 35b5b7cb11..9bd5f0e00b 100644
---
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java
+++
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java
@@ -155,64 +155,28 @@ public class KubernetesConfigMapStateProvider extends
AbstractConfigurableCompon
@Override
public void setState(final Map<String, String> state, final String
componentId) throws IOException {
try {
- final ConfigMap configMap = createConfigMapBuilder(state,
componentId).build();
- Resource<ConfigMap> configMapResource =
kubernetesClient.configMaps().resource(configMap);
- final String configMapName = configMap.getMetadata().getName();
-
- ConfigMap configMapCreated = null;
+ final ConfigMap configMap = createConfigMapBuilder(state,
componentId, null).build();
+ ConfigMap configMapResult = null;
- // Attempt to create or update, up to 3 times. We expect that we
will update more frequently than create
- // so we first attempt to update. If we get back a 404, then we
create it.
- boolean create = false;
+ // Attempt to create or update, up to 3 times
for (int attempt = 0; attempt < MAX_UPDATE_ATTEMPTS; attempt++) {
try {
- if (create) {
- configMapCreated = configMapResource.create();
+ final ConfigMap existingConfigMap =
kubernetesClient.configMaps().resource(configMap).get();
+ if (existingConfigMap == null) {
+ configMapResult =
kubernetesClient.configMaps().resource(configMap).create();
} else {
- configMapCreated = configMapResource.update();
+ existingConfigMap.setData(configMap.getData());
+ configMapResult =
kubernetesClient.configMaps().resource(existingConfigMap).update();
}
-
break;
} catch (final KubernetesClientException e) {
final int returnCode = e.getCode();
- if (returnCode == HttpURLConnection.HTTP_NOT_FOUND) {
- // A 404 return code indicates that we need to create
the resource instead of update it.
- // Now, we will attempt to create the resource instead
of update it, so we'll reset the attempt counter.
- attempt = 0;
- create = true;
- continue;
- }
-
- if (returnCode == HttpURLConnection.HTTP_CONFLICT) {
- logger.debug("Update conflict detected when setting
state for Component ID [{}]. Attempt {} of {}.", componentId, attempt + 1,
MAX_UPDATE_ATTEMPTS);
-
- if (attempt < MAX_UPDATE_ATTEMPTS - 1) {
- final ConfigMap latestConfigMap =
kubernetesClient.configMaps()
- .inNamespace(namespace)
- .withName(configMapName)
- .get();
-
- if (latestConfigMap != null) {
- final ObjectMeta latestMetadata =
latestConfigMap.getMetadata();
- final String latestResourceVersion =
latestMetadata != null ? latestMetadata.getResourceVersion() : null;
-
- if (latestResourceVersion != null) {
-
configMap.getMetadata().setResourceVersion(latestResourceVersion);
- configMapResource =
kubernetesClient.configMaps().resource(configMap);
- logger.debug("Retrying state update for
Component ID [{}] with resource version [{}]", componentId,
latestResourceVersion);
- continue;
- }
- }
- }
-
- throw e;
- }
-
- if (returnCode >= 500) {
- // Server-side error. We should retry, up to some
number of attempts.
+ if (returnCode == HttpURLConnection.HTTP_CONFLICT ||
returnCode >= HttpURLConnection.HTTP_INTERNAL_ERROR) {
+ // Conflict or Server-side error. We should retry, up
to some number of attempts.
if (attempt == MAX_UPDATE_ATTEMPTS - 1) {
throw e;
}
+ logger.warn("Failed to update state for Component ID
[{}] on attempt {} of {}", componentId, attempt + 1, MAX_UPDATE_ATTEMPTS, e);
} else {
// There's an issue with the request. Throw the
Exception.
throw e;
@@ -227,11 +191,11 @@ public class KubernetesConfigMapStateProvider extends
AbstractConfigurableCompon
}
}
- if (configMapCreated == null) {
+ if (configMapResult == null) {
throw new IOException("Exhausted maximum number of attempts
(%s) to update state for component with ID %s but could not update
it".formatted(MAX_UPDATE_ATTEMPTS, componentId));
}
- final Optional<String> version = getVersion(configMapCreated);
+ final Optional<String> version = getVersion(configMapResult);
logger.debug("Set State Component ID [{}] Version [{}]",
componentId, version);
} catch (final KubernetesClientException e) {
if (isNotFound(e.getCode())) {
@@ -257,7 +221,8 @@ public class KubernetesConfigMapStateProvider extends
AbstractConfigurableCompon
final ConfigMap configMap = configMapResource(componentId).get();
final Map<String, String> data = configMap == null ?
Collections.emptyMap() : getDecodedMap(configMap.getData());
final Optional<String> version = configMap == null ?
Optional.empty() : getVersion(configMap);
- return new StandardStateMap(data, version);
+ final Optional<ObjectMeta> configMapMetadata = configMap == null ?
Optional.empty() : Optional.of(configMap.getMetadata());
+ return new StandardStateMap(data, version, configMapMetadata);
} catch (final RuntimeException e) {
throw new IOException(String.format("Get failed for Component ID
[%s]", componentId), e);
}
@@ -273,12 +238,19 @@ public class KubernetesConfigMapStateProvider extends
AbstractConfigurableCompon
*/
@Override
public boolean replace(final StateMap currentState, final Map<String,
String> state, final String componentId) throws IOException {
- final ConfigMapBuilder configMapBuilder =
createConfigMapBuilder(state, componentId);
- final Optional<String> stateVersion = currentState.getStateVersion();
- if (stateVersion.isPresent()) {
- final String resourceVersion = stateVersion.get();
-
configMapBuilder.editOrNewMetadata().withResourceVersion(resourceVersion).endMetadata();
+ if (currentState instanceof StandardStateMap standardStateMap) {
+ return replace(standardStateMap, state, componentId);
+ } else {
+ throw new IllegalStateException("Current state is not an instance
of StandardStateMap");
}
+ }
+
+ private boolean replace(final StandardStateMap currentState, final
Map<String, String> state, final String componentId) throws IOException {
+ final Optional<ObjectMeta> existingMetadata =
currentState.getConfigMapMetadata();
+ final ConfigMapBuilder configMapBuilder =
createConfigMapBuilder(state, componentId, existingMetadata.orElse(null));
+
+ final Optional<String> stateVersion = currentState.getStateVersion();
+ stateVersion.ifPresent(resourceVersion ->
configMapBuilder.editOrNewMetadata().withResourceVersion(resourceVersion).endMetadata());
final ConfigMap configMap = configMapBuilder.build();
try {
@@ -412,15 +384,25 @@ public class KubernetesConfigMapStateProvider extends
AbstractConfigurableCompon
return
kubernetesClient.configMaps().inNamespace(namespace).withName(name);
}
- private ConfigMapBuilder createConfigMapBuilder(final Map<String, String>
state, final String componentId) {
- final Map<String, String> encodedData = getEncodedMap(state);
+ private ConfigMapBuilder createConfigMapBuilder(final Map<String, String>
state, final String componentId, final ObjectMeta existingMetadata) {
final String name = getConfigMapName(componentId);
- return new ConfigMapBuilder()
+
+ final ConfigMapBuilder configMapBuilder;
+ if (existingMetadata == null) {
+ configMapBuilder = new ConfigMapBuilder()
.withNewMetadata()
.withNamespace(namespace)
.withName(name)
- .endMetadata()
- .withData(encodedData);
+ .endMetadata();
+ } else if (namespace.equals(existingMetadata.getNamespace()) &&
name.equals(existingMetadata.getName())) {
+ configMapBuilder = new
ConfigMapBuilder().withMetadata(existingMetadata);
+ } else {
+ throw new IllegalArgumentException("ConfigMap metadata with
namespace [%s] and name [%s], did not match expected namespace [%s] and name
[%s]"
+ .formatted(existingMetadata.getNamespace(),
existingMetadata.getName(), namespace, name));
+ }
+
+ final Map<String, String> encodedData = getEncodedMap(state);
+ return configMapBuilder.withData(encodedData);
}
private String getConfigMapName(final String componentId) {
diff --git
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java
index e81fc81975..440edc1dab 100644
---
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java
+++
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.kubernetes.state.provider;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
import org.apache.nifi.components.state.StateMap;
import java.util.Collections;
@@ -30,9 +31,12 @@ class StandardStateMap implements StateMap {
private final Optional<String> version;
- StandardStateMap(final Map<String, String> data, final Optional<String>
version) {
+ private final Optional<ObjectMeta> configMapMetadata;
+
+ StandardStateMap(final Map<String, String> data, final Optional<String>
version, final Optional<ObjectMeta> configMapMetadata) {
this.data = Collections.unmodifiableMap(data == null ?
Collections.emptyMap() : data);
this.version = version;
+ this.configMapMetadata = configMapMetadata;
}
@@ -66,4 +70,8 @@ class StandardStateMap implements StateMap {
public Map<String, String> toMap() {
return data;
}
+
+ public Optional<ObjectMeta> getConfigMapMetadata() {
+ return configMapMetadata;
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java
index 82e9a78b63..7ef2117682 100644
---
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java
+++
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java
@@ -23,7 +23,6 @@ import io.fabric8.kubernetes.api.model.StatusBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
-import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
@@ -61,7 +60,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -182,13 +180,10 @@ class KubernetesConfigMapStateProviderTest {
void testSetStateConflict() {
final KubernetesClient mockClient = mock(KubernetesClient.class);
final MixedOperation<ConfigMap, ConfigMapList, Resource<ConfigMap>>
mockConfigMaps = mock(MixedOperation.class);
- final NonNamespaceOperation<ConfigMap, ConfigMapList,
Resource<ConfigMap>> mockNamespacedConfigMaps =
mock(NonNamespaceOperation.class);
final Resource<ConfigMap> mockResource = mock(Resource.class);
when(mockClient.configMaps()).thenReturn(mockConfigMaps);
when(mockConfigMaps.resource(any(ConfigMap.class))).thenReturn(mockResource);
-
when(mockConfigMaps.inNamespace(DEFAULT_NAMESPACE)).thenReturn(mockNamespacedConfigMaps);
-
when(mockNamespacedConfigMaps.withName(anyString())).thenReturn(mockResource);
final String conflictMessageTemplate = "Operation cannot be fulfilled
on configmaps \"nifi-component-%s\": "
+ "the object has been modified; please apply your changes to
the latest version and try again";
@@ -274,7 +269,7 @@ class KubernetesConfigMapStateProviderTest {
setContext();
provider.initialize(context);
- final StateMap stateMap = new StandardStateMap(Collections.emptyMap(),
Optional.empty());
+ final StateMap stateMap = new StandardStateMap(Collections.emptyMap(),
Optional.empty(), Optional.empty());
final boolean replaced = provider.replace(stateMap,
Collections.emptyMap(), COMPONENT_ID);
assertTrue(replaced);