This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 1fba88e9e6bc32ef4155a20a4713d5623a709356 Author: Gyula Fora <[email protected]> AuthorDate: Thu Nov 10 18:02:43 2022 +0100 [FLINK-29959] Use optimistic locking when updating the status to avoid potential race conditions --- .../operator/api/utils/BaseTestUtils.java | 2 + .../exception/StatusConflictException.java | 28 +++++++ .../kubernetes/operator/utils/StatusRecorder.java | 98 +++++++++++++++++----- .../kubernetes/operator/FlinkOperatorITCase.java | 30 +++++-- .../kubernetes/operator/TestingStatusRecorder.java | 3 +- .../metrics/KubernetesClientMetricsTest.java | 2 +- 6 files changed, 133 insertions(+), 30 deletions(-) diff --git a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java index a8ccdbe7..7dde1a8e 100644 --- a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java +++ b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java @@ -76,6 +76,7 @@ public class BaseTestUtils { .withNamespace(namespace) .withCreationTimestamp(Instant.now().toString()) .withUid(UUID.randomUUID().toString()) + .withResourceVersion("1") .build()); deployment.setSpec(getTestFlinkDeploymentSpec(version)); return deployment; @@ -119,6 +120,7 @@ public class BaseTestUtils { .withCreationTimestamp(Instant.now().toString()) .withUid(UUID.randomUUID().toString()) .withGeneration(1L) + .withResourceVersion("1") .build()); Map<String, String> conf = new HashMap<>(); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/StatusConflictException.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/StatusConflictException.java new file mode 100644 index 00000000..614b45af --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/StatusConflictException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.exception; + +/** Exception for status updates. */ +public class StatusConflictException extends RuntimeException { + + private static final long serialVersionUID = 2260638990044248181L; + + public StatusConflictException(String msg) { + super(msg); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java index 8ff79e22..b1bec2f7 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java @@ -18,7 +18,6 @@ package org.apache.flink.kubernetes.operator.utils; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; @@ -26,13 +25,16 @@ import org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener; import org.apache.flink.kubernetes.operator.api.status.CommonStatus; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus; +import org.apache.flink.kubernetes.operator.exception.StatusConflictException; import org.apache.flink.kubernetes.operator.listener.AuditUtils; import org.apache.flink.kubernetes.operator.metrics.MetricManager; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.javaoperatorsdk.operator.processing.event.ResourceID; import lombok.SneakyThrows; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +52,7 @@ public class StatusRecorder< protected final ObjectMapper objectMapper = new ObjectMapper(); - protected final ConcurrentHashMap<Tuple2<String, String>, ObjectNode> statusCache = + protected final ConcurrentHashMap<ResourceID, ObjectNode> statusCache = new ConcurrentHashMap<>(); private final KubernetesClient client; @@ -76,13 +78,10 @@ public class StatusRecorder< */ @SneakyThrows public void patchAndCacheStatus(CR resource) { - // This is necessary so the client wouldn't fail of the underlying resource spec was updated - // in the meantime - resource.getMetadata().setResourceVersion(null); - ObjectNode newStatusNode = objectMapper.convertValue(resource.getStatus(), ObjectNode.class); - ObjectNode previousStatusNode = statusCache.put(getKey(resource), newStatusNode); + var resourceId = ResourceID.fromResource(resource); + ObjectNode previousStatusNode = statusCache.get(resourceId); if (newStatusNode.equals(previousStatusNode)) { LOG.debug("No status change."); @@ -97,20 +96,79 @@ public class StatusRecorder< Exception err = null; for (int i = 0; i < 3; i++) { - // In any case we retry the status update 3 times to avoid some intermittent - // connectivity errors if any + // We retry the status update 3 times to avoid some intermittent connectivity errors try { - client.resource(resource).patchStatus(); - statusUpdateListener.accept(resource, prevStatus); - metricManager.onUpdate(resource); - return; - } catch (Exception e) { + replaceStatus(resource, prevStatus); + err = null; + } catch (KubernetesClientException e) { LOG.error("Error while patching status, retrying {}/3...", (i + 1), e); Thread.sleep(1000); err = e; } } - throw err; + + if (err != null) { + throw err; + } + + statusCache.put(resourceId, newStatusNode); + statusUpdateListener.accept(resource, prevStatus); + metricManager.onUpdate(resource); + } + + private void replaceStatus(CR resource, STATUS prevStatus) throws JsonProcessingException { + int retries = 0; + while (true) { + try { + var updated = client.resource(resource).lockResourceVersion().replaceStatus(); + + // If we successfully replaced the status, update the resource version so we know + // what to lock next in the same reconciliation loop + resource.getMetadata() + .setResourceVersion(updated.getMetadata().getResourceVersion()); + return; + } catch (KubernetesClientException kce) { + // 409 is the error code for conflicts resulting from the locking + if (kce.getCode() == 409) { + var currentVersion = resource.getMetadata().getResourceVersion(); + LOG.debug( + "Could not apply status update for resource version {}", + currentVersion); + + var latest = client.resource(resource).fromServer().get(); + var latestVersion = latest.getMetadata().getResourceVersion(); + + if (latestVersion.equals(currentVersion)) { + // This should not happen as long as the client works consistently + LOG.error("Unable to fetch latest resource version"); + throw kce; + } + + if (latest.getStatus().equals(prevStatus)) { + if (retries++ < 3) { + LOG.debug( + "Retrying status update for latest version {}", latestVersion); + resource.getMetadata().setResourceVersion(latestVersion); + } else { + // If we cannot get the latest version in 3 tries we throw the error to + // retry with delay + throw kce; + } + } else { + throw new StatusConflictException( + "Status have been modified externally in version " + + latestVersion + + " Previous: " + + objectMapper.writeValueAsString(prevStatus) + + " Latest: " + + objectMapper.writeValueAsString(latest.getStatus())); + } + } else { + // We simply throw non conflict errors, to trigger retry with delay + throw kce; + } + } + } } /** @@ -124,7 +182,7 @@ public class StatusRecorder< * @param resource Resource for which the status should be updated from the cache */ public void updateStatusFromCache(CR resource) { - var key = getKey(resource); + var key = ResourceID.fromResource(resource); var cachedStatus = statusCache.get(key); if (cachedStatus != null) { resource.setStatus( @@ -147,14 +205,10 @@ public class StatusRecorder< * @param resource Flink resource. */ public void removeCachedStatus(CR resource) { - statusCache.remove(getKey(resource)); + statusCache.remove(ResourceID.fromResource(resource)); metricManager.onRemove(resource); } - protected static Tuple2<String, String> getKey(HasMetadata resource) { - return Tuple2.of(resource.getMetadata().getNamespace(), resource.getMetadata().getName()); - } - public static <S extends CommonStatus<?>, CR extends AbstractFlinkResource<?, S>> StatusRecorder<CR, S> create( KubernetesClient kubernetesClient, diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java index 8aa50359..1a0b0ac1 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java @@ -23,6 +23,10 @@ import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec; import org.apache.flink.kubernetes.operator.api.spec.Resource; import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec; +import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; +import org.apache.flink.kubernetes.operator.exception.StatusConflictException; +import org.apache.flink.kubernetes.operator.metrics.MetricManager; +import org.apache.flink.kubernetes.operator.utils.StatusRecorder; import io.fabric8.kubernetes.api.model.Namespace; import io.fabric8.kubernetes.api.model.NamespaceBuilder; @@ -43,6 +47,7 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.fail; /** Flink Operator integration test. */ public class FlinkOperatorITCase { @@ -50,10 +55,11 @@ public class FlinkOperatorITCase { private static final String TEST_NAMESPACE = "flink-operator-test"; private static final String SERVICE_ACCOUNT = "flink-operator"; private static final String CLUSTER_ROLE_BINDING = "flink-operator-role-binding"; - private static final String FLINK_VERSION = "1.15.1"; + private static final String FLINK_VERSION = "1.15"; private static final String IMAGE = String.format("flink:%s", FLINK_VERSION); private static final Logger LOG = LoggerFactory.getLogger(FlinkOperatorITCase.class); - private KubernetesClient client; + public static final String SESSION_NAME = "test-session-cluster"; + private static KubernetesClient client; @BeforeEach public void setup() { @@ -83,7 +89,7 @@ public class FlinkOperatorITCase { public void test() { FlinkDeployment flinkDeployment = buildSessionCluster(); LOG.info("Deploying {}", flinkDeployment.getMetadata().getName()); - client.resource(flinkDeployment).createOrReplace(); + var v1 = client.resource(flinkDeployment).createOrReplace(); await().atMost(1, MINUTES) .untilAsserted( @@ -92,16 +98,28 @@ public class FlinkOperatorITCase { client.apps() .deployments() .inNamespace(TEST_NAMESPACE) - .withName(flinkDeployment.getMetadata().getName()) + .withName(SESSION_NAME) .isReady(), is(true))); + + // Test status recorder locking logic + var statusRecorder = + new StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>( + client, new MetricManager<>(), (a, b) -> {}); + try { + v1.getStatus().setError("e2"); + // Should throw error as status was modified externally + statusRecorder.patchAndCacheStatus(v1); + fail(); + } catch (StatusConflictException expected) { + } } private static FlinkDeployment buildSessionCluster() { FlinkDeployment deployment = new FlinkDeployment(); deployment.setMetadata( new ObjectMetaBuilder() - .withName("test-session-cluster") + .withName(SESSION_NAME) .withNamespace(TEST_NAMESPACE) .build()); FlinkDeploymentSpec spec = new FlinkDeploymentSpec(); @@ -122,7 +140,7 @@ public class FlinkOperatorITCase { return deployment; } - private void rbacSetup() { + private static void rbacSetup() { LOG.info("Creating service account {}", SERVICE_ACCOUNT); ServiceAccount serviceAccount = new ServiceAccountBuilder() diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java index ebe8fb39..7cd010e3 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java @@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.operator.metrics.MetricManager; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; import com.fasterxml.jackson.databind.node.ObjectNode; +import io.javaoperatorsdk.operator.processing.event.ResourceID; /** Testing statusRecorder. */ public class TestingStatusRecorder< @@ -37,7 +38,7 @@ public class TestingStatusRecorder< @Override public void patchAndCacheStatus(CR resource) { statusCache.put( - getKey(resource), + ResourceID.fromResource(resource), objectMapper.convertValue(resource.getStatus(), ObjectNode.class)); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java index 8bacff26..64f27da6 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java @@ -92,7 +92,7 @@ public class KubernetesClientMetricsTest { mockServer.createClient().getConfiguration()); var deployment = TestUtils.buildApplicationCluster(); - kubernetesClient.resource(deployment).get(); + kubernetesClient.resource(deployment).fromServer().get(); assertFalse(listener.getCounter(listener.getMetricId(REQUEST_COUNTER_ID)).isPresent()); assertFalse(listener.getMeter(listener.getMetricId(REQUEST_METER_ID)).isPresent()); assertFalse(
