This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 1fba88e9e6bc32ef4155a20a4713d5623a709356
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Nov 10 18:02:43 2022 +0100

    [FLINK-29959] Use optimistic locking when updating the status to avoid potential race conditions
---
 .../operator/api/utils/BaseTestUtils.java          |  2 +
 .../exception/StatusConflictException.java         | 28 +++++++
 .../kubernetes/operator/utils/StatusRecorder.java  | 98 +++++++++++++++++-----
 .../kubernetes/operator/FlinkOperatorITCase.java   | 30 +++++--
 .../kubernetes/operator/TestingStatusRecorder.java |  3 +-
 .../metrics/KubernetesClientMetricsTest.java       |  2 +-
 6 files changed, 133 insertions(+), 30 deletions(-)

diff --git 
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
 
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
index a8ccdbe7..7dde1a8e 100644
--- 
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
+++ 
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
@@ -76,6 +76,7 @@ public class BaseTestUtils {
                         .withNamespace(namespace)
                         .withCreationTimestamp(Instant.now().toString())
                         .withUid(UUID.randomUUID().toString())
+                        .withResourceVersion("1")
                         .build());
         deployment.setSpec(getTestFlinkDeploymentSpec(version));
         return deployment;
@@ -119,6 +120,7 @@ public class BaseTestUtils {
                         .withCreationTimestamp(Instant.now().toString())
                         .withUid(UUID.randomUUID().toString())
                         .withGeneration(1L)
+                        .withResourceVersion("1")
                         .build());
 
         Map<String, String> conf = new HashMap<>();
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/StatusConflictException.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/StatusConflictException.java
new file mode 100644
index 00000000..614b45af
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/StatusConflictException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.exception;
+
+/** Thrown when a status update conflicts with a status that was modified externally. */
+public class StatusConflictException extends RuntimeException {
+
+    private static final long serialVersionUID = 2260638990044248181L;
+
+    public StatusConflictException(String msg) {
+        super(msg);
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index 8ff79e22..b1bec2f7 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
@@ -26,13 +25,16 @@ import 
org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener;
 import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
+import org.apache.flink.kubernetes.operator.exception.StatusConflictException;
 import org.apache.flink.kubernetes.operator.listener.AuditUtils;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
 import lombok.SneakyThrows;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +52,7 @@ public class StatusRecorder<
 
     protected final ObjectMapper objectMapper = new ObjectMapper();
 
-    protected final ConcurrentHashMap<Tuple2<String, String>, ObjectNode> 
statusCache =
+    protected final ConcurrentHashMap<ResourceID, ObjectNode> statusCache =
             new ConcurrentHashMap<>();
 
     private final KubernetesClient client;
@@ -76,13 +78,10 @@ public class StatusRecorder<
      */
     @SneakyThrows
     public void patchAndCacheStatus(CR resource) {
-        // This is necessary so the client wouldn't fail of the underlying 
resource spec was updated
-        // in the meantime
-        resource.getMetadata().setResourceVersion(null);
-
         ObjectNode newStatusNode =
                 objectMapper.convertValue(resource.getStatus(), 
ObjectNode.class);
-        ObjectNode previousStatusNode = statusCache.put(getKey(resource), 
newStatusNode);
+        var resourceId = ResourceID.fromResource(resource);
+        ObjectNode previousStatusNode = statusCache.get(resourceId);
 
         if (newStatusNode.equals(previousStatusNode)) {
             LOG.debug("No status change.");
@@ -97,20 +96,79 @@ public class StatusRecorder<
 
         Exception err = null;
         for (int i = 0; i < 3; i++) {
-            // In any case we retry the status update 3 times to avoid some 
intermittent
-            // connectivity errors if any
+            // We retry the status update 3 times to avoid some intermittent 
connectivity errors
             try {
-                client.resource(resource).patchStatus();
-                statusUpdateListener.accept(resource, prevStatus);
-                metricManager.onUpdate(resource);
-                return;
-            } catch (Exception e) {
+                replaceStatus(resource, prevStatus);
+                err = null;
+            } catch (KubernetesClientException e) {
                 LOG.error("Error while patching status, retrying {}/3...", (i 
+ 1), e);
                 Thread.sleep(1000);
                 err = e;
             }
         }
-        throw err;
+
+        if (err != null) {
+            throw err;
+        }
+
+        statusCache.put(resourceId, newStatusNode);
+        statusUpdateListener.accept(resource, prevStatus);
+        metricManager.onUpdate(resource);
+    }
+
+    private void replaceStatus(CR resource, STATUS prevStatus) throws 
JsonProcessingException {
+        int retries = 0;
+        while (true) {
+            try {
+                var updated = 
client.resource(resource).lockResourceVersion().replaceStatus();
+
+                // If we successfully replaced the status, update the resource 
version so we know
+                // what to lock next in the same reconciliation loop
+                resource.getMetadata()
+                        
.setResourceVersion(updated.getMetadata().getResourceVersion());
+                return;
+            } catch (KubernetesClientException kce) {
+                // 409 is the error code for conflicts resulting from the 
locking
+                if (kce.getCode() == 409) {
+                    var currentVersion = 
resource.getMetadata().getResourceVersion();
+                    LOG.debug(
+                            "Could not apply status update for resource 
version {}",
+                            currentVersion);
+
+                    var latest = client.resource(resource).fromServer().get();
+                    var latestVersion = 
latest.getMetadata().getResourceVersion();
+
+                    if (latestVersion.equals(currentVersion)) {
+                        // This should not happen as long as the client works 
consistently
+                        LOG.error("Unable to fetch latest resource version");
+                        throw kce;
+                    }
+
+                    if (latest.getStatus().equals(prevStatus)) {
+                        if (retries++ < 3) {
+                            LOG.debug(
+                                    "Retrying status update for latest version 
{}", latestVersion);
+                            
resource.getMetadata().setResourceVersion(latestVersion);
+                        } else {
+                            // If we cannot get the latest version in 3 tries 
we throw the error to
+                            // retry with delay
+                            throw kce;
+                        }
+                    } else {
+                        throw new StatusConflictException(
+                                "Status have been modified externally in 
version "
+                                        + latestVersion
+                                        + " Previous: "
+                                        + 
objectMapper.writeValueAsString(prevStatus)
+                                        + " Latest: "
+                                        + 
objectMapper.writeValueAsString(latest.getStatus()));
+                    }
+                } else {
+                    // We simply throw non conflict errors, to trigger retry 
with delay
+                    throw kce;
+                }
+            }
+        }
     }
 
     /**
@@ -124,7 +182,7 @@ public class StatusRecorder<
      * @param resource Resource for which the status should be updated from 
the cache
      */
     public void updateStatusFromCache(CR resource) {
-        var key = getKey(resource);
+        var key = ResourceID.fromResource(resource);
         var cachedStatus = statusCache.get(key);
         if (cachedStatus != null) {
             resource.setStatus(
@@ -147,14 +205,10 @@ public class StatusRecorder<
      * @param resource Flink resource.
      */
     public void removeCachedStatus(CR resource) {
-        statusCache.remove(getKey(resource));
+        statusCache.remove(ResourceID.fromResource(resource));
         metricManager.onRemove(resource);
     }
 
-    protected static Tuple2<String, String> getKey(HasMetadata resource) {
-        return Tuple2.of(resource.getMetadata().getNamespace(), 
resource.getMetadata().getName());
-    }
-
     public static <S extends CommonStatus<?>, CR extends 
AbstractFlinkResource<?, S>>
             StatusRecorder<CR, S> create(
                     KubernetesClient kubernetesClient,
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
index 8aa50359..1a0b0ac1 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
@@ -23,6 +23,10 @@ import 
org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec;
 import org.apache.flink.kubernetes.operator.api.spec.Resource;
 import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec;
+import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.exception.StatusConflictException;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import io.fabric8.kubernetes.api.model.Namespace;
 import io.fabric8.kubernetes.api.model.NamespaceBuilder;
@@ -43,6 +47,7 @@ import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /** Flink Operator integration test. */
 public class FlinkOperatorITCase {
@@ -50,10 +55,11 @@ public class FlinkOperatorITCase {
     private static final String TEST_NAMESPACE = "flink-operator-test";
     private static final String SERVICE_ACCOUNT = "flink-operator";
     private static final String CLUSTER_ROLE_BINDING = 
"flink-operator-role-binding";
-    private static final String FLINK_VERSION = "1.15.1";
+    private static final String FLINK_VERSION = "1.15";
     private static final String IMAGE = String.format("flink:%s", 
FLINK_VERSION);
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkOperatorITCase.class);
-    private KubernetesClient client;
+    public static final String SESSION_NAME = "test-session-cluster";
+    private static KubernetesClient client;
 
     @BeforeEach
     public void setup() {
@@ -83,7 +89,7 @@ public class FlinkOperatorITCase {
     public void test() {
         FlinkDeployment flinkDeployment = buildSessionCluster();
         LOG.info("Deploying {}", flinkDeployment.getMetadata().getName());
-        client.resource(flinkDeployment).createOrReplace();
+        var v1 = client.resource(flinkDeployment).createOrReplace();
 
         await().atMost(1, MINUTES)
                 .untilAsserted(
@@ -92,16 +98,28 @@ public class FlinkOperatorITCase {
                                         client.apps()
                                                 .deployments()
                                                 .inNamespace(TEST_NAMESPACE)
-                                                
.withName(flinkDeployment.getMetadata().getName())
+                                                .withName(SESSION_NAME)
                                                 .isReady(),
                                         is(true)));
+
+        // Test status recorder locking logic
+        var statusRecorder =
+                new StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>(
+                        client, new MetricManager<>(), (a, b) -> {});
+        try {
+            v1.getStatus().setError("e2");
+            // Should throw error as status was modified externally
+            statusRecorder.patchAndCacheStatus(v1);
+            fail();
+        } catch (StatusConflictException expected) {
+        }
     }
 
     private static FlinkDeployment buildSessionCluster() {
         FlinkDeployment deployment = new FlinkDeployment();
         deployment.setMetadata(
                 new ObjectMetaBuilder()
-                        .withName("test-session-cluster")
+                        .withName(SESSION_NAME)
                         .withNamespace(TEST_NAMESPACE)
                         .build());
         FlinkDeploymentSpec spec = new FlinkDeploymentSpec();
@@ -122,7 +140,7 @@ public class FlinkOperatorITCase {
         return deployment;
     }
 
-    private void rbacSetup() {
+    private static void rbacSetup() {
         LOG.info("Creating service account {}", SERVICE_ACCOUNT);
         ServiceAccount serviceAccount =
                 new ServiceAccountBuilder()
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
index ebe8fb39..7cd010e3 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
 
 /** Testing statusRecorder. */
 public class TestingStatusRecorder<
@@ -37,7 +38,7 @@ public class TestingStatusRecorder<
     @Override
     public void patchAndCacheStatus(CR resource) {
         statusCache.put(
-                getKey(resource),
+                ResourceID.fromResource(resource),
                 objectMapper.convertValue(resource.getStatus(), 
ObjectNode.class));
     }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java
index 8bacff26..64f27da6 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java
@@ -92,7 +92,7 @@ public class KubernetesClientMetricsTest {
                         mockServer.createClient().getConfiguration());
 
         var deployment = TestUtils.buildApplicationCluster();
-        kubernetesClient.resource(deployment).get();
+        kubernetesClient.resource(deployment).fromServer().get();
         
assertFalse(listener.getCounter(listener.getMetricId(REQUEST_COUNTER_ID)).isPresent());
         
assertFalse(listener.getMeter(listener.getMetricId(REQUEST_METER_ID)).isPresent());
         assertFalse(

Reply via email to