This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 3722d223 [FLINK-39814] Honor FlinkSessionJob savepoint dir in snapshots
3722d223 is described below

commit 3722d223bd2e778fc77430039f1ef8aa932064ca
Author: lrsb <[email protected]>
AuthorDate: Wed Jun 3 17:35:46 2026 +0200

    [FLINK-39814] Honor FlinkSessionJob savepoint dir in snapshots
---
 .../controller/FlinkStateSnapshotContext.java      |  13 ++-
 .../snapshot/StateSnapshotReconciler.java          |   4 +-
 .../controller/FlinkStateSnapshotContextTest.java  | 107 +++++++++++++++++++++
 3 files changed, 121 insertions(+), 3 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java
index 64c46619..e42f7773 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java
@@ -65,7 +65,18 @@ public class FlinkStateSnapshotContext {
     }
 
     private Configuration referencedJobObserveConfig() {
-        return getConfigManager().getObserveConfig(getReferencedJobFlinkDeployment());
+        var flinkDeployment = getReferencedJobFlinkDeployment();
+        return getSecondaryResource()
+                .filter(FlinkSessionJob.class::isInstance)
+                .map(FlinkSessionJob.class::cast)
+                .map(
+                        sessionJob ->
+                                getConfigManager()
+                                        .getSessionJobConfig(
+                                                sessionJob.getMetadata().getName(),
+                                                flinkDeployment,
+                                                sessionJob.getSpec()))
+                .orElseGet(() -> getConfigManager().getObserveConfig(flinkDeployment));
     }
 
     private FlinkDeployment referencedJobFlinkDeployment() {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java
index 8d2ffeee..b4dcdd99 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java
@@ -190,10 +190,10 @@ public class StateSnapshotReconciler {
         var flinkService = flinkDeploymentContext.getFlinkService();
         var conf =
                 Preconditions.checkNotNull(
-                        flinkDeploymentContext.getObserveConfig(),
+                        ctx.getReferencedJobObserveConfig(),
                         String.format(
                                 "Observe config was null for %s",
-                                flinkDeploymentContext.getResource().getMetadata().getName()));
+                                ctx.getResource().getMetadata().getName()));
 
         if (spec.isSavepoint()) {
             var path =
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContextTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContextTest.java
new file mode 100644
index 00000000..8a9b923e
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContextTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.JobReference;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link FlinkStateSnapshotContext}. */
+public class FlinkStateSnapshotContextTest {
+
+    private static final String DEPLOYMENT_SAVEPOINT_DIR = "test-savepoint-dir";
+    private static final String SESSION_JOB_SAVEPOINT_DIR = "hdfs:///savepoints/per-job/";
+
+    private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
+
+    @Test
+    public void testReferencedJobObserveConfigHonorsSessionJobSavepointDir() {
+        var deployment = deployedSessionCluster();
+        var sessionJob = TestUtils.buildSessionJob();
+        sessionJob
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), SESSION_JOB_SAVEPOINT_DIR);
+
+        var conf = contextFor(sessionJob, deployment, sessionJob).getReferencedJobObserveConfig();
+
+        assertThat(conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY))
+                .isEqualTo(SESSION_JOB_SAVEPOINT_DIR);
+    }
+
+    @Test
+    public void testReferencedJobObserveConfigFallsBackToDeploymentSavepointDir() {
+        var deployment = deployedSessionCluster();
+
+        var conf = contextFor(deployment, deployment, null).getReferencedJobObserveConfig();
+
+        assertThat(conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY))
+                .isEqualTo(DEPLOYMENT_SAVEPOINT_DIR);
+    }
+
+    private FlinkDeployment deployedSessionCluster() {
+        var deployment = TestUtils.buildSessionCluster();
+        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
+        return deployment;
+    }
+
+    private FlinkStateSnapshotContext contextFor(
+            AbstractFlinkResource<?, ?> referencedJob,
+            FlinkDeployment deployment,
+            FlinkSessionJob sessionJob) {
+        var snapshot =
+                TestUtils.buildFlinkStateSnapshotSavepoint(
+                        false, JobReference.fromFlinkResource(referencedJob));
+        return new FlinkStateSnapshotContext(
+                snapshot,
+                null,
+                null,
+                secondaryResourceContext(deployment, sessionJob),
+                configManager);
+    }
+
+    private Context<FlinkStateSnapshot> secondaryResourceContext(
+            FlinkDeployment deployment, FlinkSessionJob sessionJob) {
+        return new TestUtils.TestingContext<>() {
+            @Override
+            public <T> Optional<T> getSecondaryResource(Class<T> expectedType, String eventName) {
+                if (FlinkSessionJob.class.equals(expectedType)) {
+                    return Optional.ofNullable(sessionJob).map(expectedType::cast);
+                }
+                if (FlinkDeployment.class.equals(expectedType)) {
+                    return Optional.ofNullable(deployment).map(expectedType::cast);
+                }
+                return Optional.empty();
+            }
+        };
+    }
+}

Reply via email to