This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 48a2f68 [FLINK-26181] Support manual savepoint triggering in the
operator
48a2f68 is described below
commit 48a2f68fef73e5f7d26cf180a8124d295c39b32b
Author: Matyas Orhidi <[email protected]>
AuthorDate: Tue Mar 1 18:18:05 2022 +0100
[FLINK-26181] Support manual savepoint triggering in the operator
---
README.md | 12 +++
examples/basic-checkpoint-ha.yaml | 1 +
.../flink/kubernetes/operator/FlinkOperator.java | 2 +-
.../config/FlinkOperatorConfiguration.java | 29 ++++----
.../operator/config/OperatorConfigOptions.java | 8 ++
.../controller/FlinkDeploymentController.java | 22 +-----
.../kubernetes/operator/crd/spec/JobSpec.java | 1 +
.../kubernetes/operator/crd/status/JobStatus.java | 2 +-
.../crd/status/{JobStatus.java => Savepoint.java} | 19 +++--
.../status/{JobStatus.java => SavepointInfo.java} | 12 ++-
.../kubernetes/operator/observer/Observer.java | 48 +++++++++++-
.../operator/observer/SavepointFetchResult.java | 46 ++++++++++++
.../operator/reconciler/JobReconciler.java | 28 ++++++-
.../operator/reconciler/ReconciliationUtils.java | 76 +++++++++++++++++++
.../operator/reconciler/SessionReconciler.java | 2 +-
.../kubernetes/operator/service/FlinkService.java | 87 ++++++++++++++++++++++
.../kubernetes/operator/utils/SavepointUtils.java | 48 ++++++++++++
.../validation/DefaultDeploymentValidator.java | 3 +-
.../flink/kubernetes/operator/TestUtils.java | 17 -----
.../kubernetes/operator/TestingFlinkService.java | 16 ++++
.../controller/FlinkDeploymentControllerTest.java | 19 ++---
.../kubernetes/operator/observer/ObserverTest.java | 75 ++++++++++++++++++-
.../operator/reconciler/JobReconcilerTest.java | 38 ++++++++--
.../operator/utils/FlinkConfigBuilderTest.java | 3 +-
.../validation/DeploymentValidatorTest.java | 15 ++--
.../crds/flinkdeployments.flink.apache.org-v1.yml | 20 ++++-
26 files changed, 548 insertions(+), 101 deletions(-)
diff --git a/README.md b/README.md
index 2baad7e..ea8c57d 100644
--- a/README.md
+++ b/README.md
@@ -143,3 +143,15 @@ and apply it on your Kubernetes environment:
kubectl create -f pod-monitor.yaml
```
Once the custom resource is created in the Kubernetes environment the operator
metrics are ready to explore
[http://localhost:3000/explore](http://localhost:3000/explore).
+
+# Savepoints
+Savepoints can be triggered manually by assigning a new (nonce) value to the
`savepointTriggerNonce` field in the job specification:
+```yaml
+ job:
+ jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+ parallelism: 2
+ upgradeMode: savepoint
+ state: running
+ savepointTriggerNonce: 123
+```
+The operator will trigger a savepoint every time the modified CR is applied
and the nonce is different from the previous value.
\ No newline at end of file
diff --git a/examples/basic-checkpoint-ha.yaml
b/examples/basic-checkpoint-ha.yaml
index 1796bfb..e288048 100644
--- a/examples/basic-checkpoint-ha.yaml
+++ b/examples/basic-checkpoint-ha.yaml
@@ -58,3 +58,4 @@ spec:
parallelism: 2
upgradeMode: savepoint
state: running
+ savepointTriggerNonce: 0
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index a579372..7e46b0c 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -58,7 +58,7 @@ public class FlinkOperator {
FlinkOperatorConfiguration operatorConfiguration =
FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig());
- Observer observer = new Observer(flinkService);
+ Observer observer = new Observer(flinkService, operatorConfiguration);
FlinkDeploymentValidator validator = new DefaultDeploymentValidator();
ReconcilerFactory factory =
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 00e741d..5a2b5cb 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -20,25 +20,17 @@ package org.apache.flink.kubernetes.operator.config;
import org.apache.flink.configuration.Configuration;
+import lombok.Value;
+
/** Configuration class for operator. */
+@Value
public class FlinkOperatorConfiguration {
- private final int reconcileIntervalInSec;
+ int reconcileIntervalInSec;
- private final int portCheckIntervalInSec;
+ int portCheckIntervalInSec;
- public FlinkOperatorConfiguration(int reconcileIntervalInSec, int
portCheckIntervalInSec) {
- this.reconcileIntervalInSec = reconcileIntervalInSec;
- this.portCheckIntervalInSec = portCheckIntervalInSec;
- }
-
- public int getReconcileIntervalInSec() {
- return reconcileIntervalInSec;
- }
-
- public int getPortCheckIntervalInSec() {
- return portCheckIntervalInSec;
- }
+ int savepointTriggerGracePeriodInSec;
public static FlinkOperatorConfiguration fromConfiguration(Configuration
operatorConfig) {
int reconcileIntervalInSec =
@@ -47,6 +39,13 @@ public class FlinkOperatorConfiguration {
int portCheckIntervalInSec =
operatorConfig.getInteger(
OperatorConfigOptions.OPERATOR_OBSERVER_PORT_CHECK_INTERVAL_IN_SEC);
- return new FlinkOperatorConfiguration(reconcileIntervalInSec,
portCheckIntervalInSec);
+
+ int savepointTriggerGracePeriodInSec =
+ operatorConfig.getInteger(
+ OperatorConfigOptions
+
.OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD_IN_SEC);
+
+ return new FlinkOperatorConfiguration(
+ reconcileIntervalInSec, portCheckIntervalInSec,
savepointTriggerGracePeriodInSec);
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
index 0a80c07..6d15640 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
@@ -38,4 +38,12 @@ public class OperatorConfigOptions {
.withDescription(
"The interval in second for the controller to
reschedule the reconcile process to "
+ "wait for deployment to be ready");
+
+ public static final ConfigOption<Integer>
+ OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD_IN_SEC =
+
ConfigOptions.key("operator.observer.savepoint.trigger.grace-period.sec")
+ .intType()
+ .defaultValue(5)
+ .withDescription(
+ "The interval in seconds before a
savepoint trigger attempt is marked as unsuccessful");
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 8417c7f..26c0097 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -21,11 +21,11 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.DefaultConfig;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import
org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
import
org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
@@ -106,7 +106,7 @@ public class FlinkDeploymentController
Optional<String> validationError = validator.validate(flinkApp);
if (validationError.isPresent()) {
LOG.error("Reconciliation failed: " + validationError.get());
- updateForReconciliationError(flinkApp, validationError.get());
+ ReconciliationUtils.updateForReconciliationError(flinkApp,
validationError.get());
return UpdateControl.updateStatus(flinkApp);
}
@@ -125,30 +125,16 @@ public class FlinkDeploymentController
reconcilerFactory
.getOrCreate(flinkApp)
.reconcile(operatorNamespace, flinkApp, context,
effectiveConfig);
- updateForReconciliationSuccess(flinkApp);
return updateControl;
} catch (InvalidDeploymentException ide) {
LOG.error("Reconciliation failed", ide);
- updateForReconciliationError(flinkApp, ide.getMessage());
+ ReconciliationUtils.updateForReconciliationError(flinkApp,
ide.getMessage());
return UpdateControl.updateStatus(flinkApp);
} catch (Exception e) {
throw new ReconciliationException(e);
}
}
- private void updateForReconciliationSuccess(FlinkDeployment flinkApp) {
- ReconciliationStatus reconciliationStatus =
flinkApp.getStatus().getReconciliationStatus();
- reconciliationStatus.setSuccess(true);
- reconciliationStatus.setError(null);
- reconciliationStatus.setLastReconciledSpec(flinkApp.getSpec());
- }
-
- private void updateForReconciliationError(FlinkDeployment flinkApp, String
err) {
- ReconciliationStatus reconciliationStatus =
flinkApp.getStatus().getReconciliationStatus();
- reconciliationStatus.setSuccess(false);
- reconciliationStatus.setError(err);
- }
-
@Override
public List<EventSource>
prepareEventSources(EventSourceContext<FlinkDeployment> ctx) {
Preconditions.checkNotNull(controllerConfig, "Controller config cannot
be null");
@@ -170,7 +156,7 @@ public class FlinkDeploymentController
retryInfo.getAttemptCount(),
retryInfo.isLastAttempt());
- updateForReconciliationError(
+ ReconciliationUtils.updateForReconciliationError(
flinkApp,
(e instanceof ReconciliationException) ?
e.getCause().toString() : e.toString());
return Optional.of(flinkApp);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
index 062a5c0..9a687ac 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
@@ -36,6 +36,7 @@ public class JobSpec {
private JobState state = JobState.RUNNING;
// The below fields are excluded from equals to avoid triggering job
upgrades on changing these
+ @EqualsAndHashCode.Exclude private long savepointTriggerNonce;
@EqualsAndHashCode.Exclude private String initialSavepointPath;
@EqualsAndHashCode.Exclude private UpgradeMode upgradeMode =
UpgradeMode.STATELESS;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
index 5a8528f..bdb844a 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
@@ -32,5 +32,5 @@ public class JobStatus {
private String jobId;
private String state;
private String updateTime;
- private String savepointLocation;
+ private SavepointInfo savepointInfo = new SavepointInfo();
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/Savepoint.java
similarity index 78%
copy from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
copy to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/Savepoint.java
index 5a8528f..6201a37 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/Savepoint.java
@@ -18,19 +18,18 @@
package org.apache.flink.kubernetes.operator.crd.status;
import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
-/** Status of an individual job within the Flink deployment. */
+/** Represents information about a finished savepoint. */
@Data
-@NoArgsConstructor
@AllArgsConstructor
-@Builder
-public class JobStatus {
- private String jobName;
- private String jobId;
- private String state;
- private String updateTime;
- private String savepointLocation;
+@NoArgsConstructor
+public class Savepoint {
+ private long timeStamp;
+ private String location;
+
+ public static Savepoint of(String location) {
+ return new Savepoint(System.currentTimeMillis(), location);
+ }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/SavepointInfo.java
similarity index 81%
copy from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
copy to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/SavepointInfo.java
index 5a8528f..23a892e 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/SavepointInfo.java
@@ -22,15 +22,13 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
-/** Status of an individual job within the Flink deployment. */
+/** Stores savepoint related information. */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
-public class JobStatus {
- private String jobName;
- private String jobId;
- private String state;
- private String updateTime;
- private String savepointLocation;
+public class SavepointInfo {
+ private Savepoint lastSavepoint;
+ private String triggerId;
+ private long triggerTimestamp;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
index 2772fbb..5bafee9 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
@@ -18,10 +18,14 @@
package org.apache.flink.kubernetes.operator.observer;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import io.fabric8.kubernetes.api.model.apps.Deployment;
@@ -43,9 +47,11 @@ public class Observer {
private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
private final FlinkService flinkService;
+ private final FlinkOperatorConfiguration operatorConfiguration;
- public Observer(FlinkService flinkService) {
+ public Observer(FlinkService flinkService, FlinkOperatorConfiguration
operatorConfiguration) {
this.flinkService = flinkService;
+ this.operatorConfiguration = operatorConfiguration;
}
public boolean observe(
@@ -129,13 +135,51 @@ public class Observer {
}
}
+ private boolean observeSavepointStatus(
+ FlinkDeployment flinkApp, Configuration effectiveConfig) {
+ SavepointInfo savepointInfo =
flinkApp.getStatus().getJobStatus().getSavepointInfo();
+ if (savepointInfo.getTriggerId() == null) {
+ LOG.debug("Checkpointing not in progress");
+ return true;
+ }
+ SavepointFetchResult savepointFetchResult;
+ try {
+ savepointFetchResult = flinkService.fetchSavepointInfo(flinkApp,
effectiveConfig);
+ } catch (Exception e) {
+ LOG.error("Exception while fetching savepoint info", e);
+ return false;
+ }
+
+ if (!savepointFetchResult.isTriggered()) {
+ String error = savepointFetchResult.getError();
+ if (error != null
+ || SavepointUtils.gracePeriodEnded(operatorConfiguration,
savepointInfo)) {
+ String errorMsg = error != null ? error : "Savepoint status
unknown";
+ LOG.error(errorMsg);
+ savepointInfo.setTriggerId(null);
+ ReconciliationUtils.updateForReconciliationError(flinkApp,
errorMsg);
+ return false;
+ }
+ LOG.info("Savepoint operation not running, waiting within grace
period");
+ }
+ if (savepointFetchResult.getSavepoint() == null) {
+ LOG.info("Savepoint not completed yet");
+ return false;
+ }
+
+ savepointInfo.setLastSavepoint(savepointFetchResult.getSavepoint());
+ savepointInfo.setTriggerId(null);
+ return true;
+ }
+
private boolean isReadyToReconcile(FlinkDeployment flinkApp, Configuration
effectiveConfig) {
JobManagerDeploymentStatus jmDeploymentStatus =
flinkApp.getStatus().getJobManagerDeploymentStatus();
switch (jmDeploymentStatus) {
case READY:
- return observeFlinkJobStatus(flinkApp, effectiveConfig);
+ return observeFlinkJobStatus(flinkApp, effectiveConfig)
+ && observeSavepointStatus(flinkApp, effectiveConfig);
case MISSING:
return true;
case DEPLOYING:
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointFetchResult.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointFetchResult.java
new file mode 100644
index 0000000..6d4ff34
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointFetchResult.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
+
+import lombok.Value;
+
+/**
+ * Immutable outcome of querying the status of a previously triggered savepoint.
+ *
+ * <p>Instances are created through the static factories, one per outcome: {@link
+ * #completed(Savepoint)}, {@link #pending()}, {@link #notTriggered()} and {@link
+ * #error(String)}.
+ */
+@Value
+public class SavepointFetchResult {
+    // @Value already makes every field private and final, so no explicit modifiers are needed.
+
+    /** The finished savepoint; {@code null} for every outcome except {@code completed}. */
+    Savepoint savepoint;
+
+    /** Set to {@code true} by the {@code pending} and {@code completed} factories. */
+    boolean isTriggered;
+
+    /** Error message; {@code null} for every outcome except {@code error}. */
+    String error;
+
+    /** The savepoint finished and its details are available. */
+    public static SavepointFetchResult completed(Savepoint savepoint) {
+        return new SavepointFetchResult(savepoint, true, null);
+    }
+
+    /** The savepoint was triggered but no result is available yet. */
+    public static SavepointFetchResult pending() {
+        return new SavepointFetchResult(null, true, null);
+    }
+
+    /** The savepoint is not reported as triggered and no error is known. */
+    public static SavepointFetchResult notTriggered() {
+        return new SavepointFetchResult(null, false, null);
+    }
+
+    /** The fetch produced the given error message. */
+    public static SavepointFetchResult error(String error) {
+        return new SavepointFetchResult(null, false, error);
+    }
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 3d0cb39..6f6b597 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -26,10 +26,12 @@ import
org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -73,6 +75,7 @@ public class JobReconciler extends BaseReconciler {
Optional.ofNullable(jobSpec.getInitialSavepointPath()));
IngressUtils.updateIngressRules(
flinkApp, effectiveConfig, operatorNamespace,
kubernetesClient, false);
+ ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
return JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(
flinkApp, operatorConfiguration);
}
@@ -101,12 +104,20 @@ public class JobReconciler extends BaseReconciler {
restoreFromLastSavepoint(flinkApp, effectiveConfig);
} else if (upgradeMode == UpgradeMode.LAST_STATE) {
final String savepointLocation =
-
flinkApp.getStatus().getJobStatus().getSavepointLocation();
+ flinkApp.getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .getLastSavepoint()
+ .getLocation();
// Upgrade mode changes from savepoint -> last-state
deployFlinkJob(
flinkApp, effectiveConfig,
Optional.ofNullable(savepointLocation));
}
}
+ ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
+ } else if (SavepointUtils.shouldTriggerSavepoint(flinkApp)) {
+ triggerSavepoint(flinkApp, effectiveConfig);
+ ReconciliationUtils.updateSavepointReconciliationSuccess(flinkApp);
}
return UpdateControl.updateStatus(flinkApp)
@@ -136,7 +147,10 @@ public class JobReconciler extends BaseReconciler {
private void restoreFromLastSavepoint(FlinkDeployment flinkApp,
Configuration effectiveConfig)
throws Exception {
JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
- deployFlinkJob(flinkApp, effectiveConfig,
Optional.of(jobStatus.getSavepointLocation()));
+ deployFlinkJob(
+ flinkApp,
+ effectiveConfig,
+
Optional.of(jobStatus.getSavepointInfo().getLastSavepoint().getLocation()));
}
private void printCancelLogs(UpgradeMode upgradeMode, String name) {
@@ -166,7 +180,10 @@ public class JobReconciler extends BaseReconciler {
JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
jobStatus.setState("suspended");
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
- savepointOpt.ifPresent(jobStatus::setSavepointLocation);
+ savepointOpt.ifPresent(
+ location -> {
+
jobStatus.getSavepointInfo().setLastSavepoint(Savepoint.of(location));
+ });
return savepointOpt;
}
@@ -189,4 +206,9 @@ public class JobReconciler extends BaseReconciler {
FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);
}
+
+ /**
+ * Requests a savepoint for the given deployment by delegating to
+ * {@code flinkService.triggerSavepoint}.
+ *
+ * @param deployment deployment to trigger the savepoint for
+ * @param effectiveConfig configuration forwarded unchanged to the Flink service
+ * @throws Exception whatever the Flink service call throws
+ */
+ private void triggerSavepoint(FlinkDeployment deployment, Configuration
effectiveConfig)
+ throws Exception {
+ flinkService.triggerSavepoint(deployment, effectiveConfig);
+ }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
new file mode 100644
index 0000000..d429f66
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/** Static helpers that record the outcome of a reconciliation in the deployment status. */
+public class ReconciliationUtils {
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Marks the reconciliation as successful and stores a copy of the current spec as the last
+     * reconciled spec.
+     *
+     * <p>The savepoint trigger nonce of the previously reconciled spec is carried over into the
+     * stored copy instead of taking the value from the current spec; the new nonce is only
+     * recorded by {@link #updateSavepointReconciliationSuccess(FlinkDeployment)}.
+     * NOTE(review): presumably this keeps a nonce change that arrives together with a spec
+     * change visible to the savepoint-trigger check — confirm against SavepointUtils.
+     *
+     * @param flinkApp deployment whose reconciliation status is updated in place
+     */
+    public static void updateForSpecReconciliationSuccess(FlinkDeployment flinkApp) {
+        ReconciliationStatus reconciliationStatus = flinkApp.getStatus().getReconciliationStatus();
+        reconciliationStatus.setSuccess(true);
+        reconciliationStatus.setError(null);
+        FlinkDeploymentSpec clonedSpec = clone(flinkApp.getSpec());
+        FlinkDeploymentSpec lastReconciledSpec = reconciliationStatus.getLastReconciledSpec();
+        // The session reconciler calls this method as well; guard against specs without a job
+        // section (assumed to be the case for session deployments) instead of failing with a
+        // NullPointerException on the second reconciliation.
+        if (lastReconciledSpec != null
+                && lastReconciledSpec.getJob() != null
+                && clonedSpec != null
+                && clonedSpec.getJob() != null) {
+            long oldSavepointTriggerNonce = lastReconciledSpec.getJob().getSavepointTriggerNonce();
+            clonedSpec.getJob().setSavepointTriggerNonce(oldSavepointTriggerNonce);
+        }
+        reconciliationStatus.setLastReconciledSpec(clonedSpec);
+    }
+
+    /**
+     * Marks the reconciliation as successful after a savepoint was triggered and records the
+     * current savepoint trigger nonce in the last reconciled spec.
+     *
+     * <p>Expects a last reconciled spec with a job section to be present already.
+     *
+     * @param flinkApp deployment whose reconciliation status is updated in place
+     */
+    public static void updateSavepointReconciliationSuccess(FlinkDeployment flinkApp) {
+        ReconciliationStatus reconciliationStatus = flinkApp.getStatus().getReconciliationStatus();
+        reconciliationStatus.setSuccess(true);
+        reconciliationStatus.setError(null);
+        reconciliationStatus
+                .getLastReconciledSpec()
+                .getJob()
+                .setSavepointTriggerNonce(flinkApp.getSpec().getJob().getSavepointTriggerNonce());
+    }
+
+    /**
+     * Marks the reconciliation as failed with the given error message. The last reconciled spec
+     * is left untouched.
+     *
+     * @param flinkApp deployment whose reconciliation status is updated in place
+     * @param err error message to store
+     */
+    public static void updateForReconciliationError(FlinkDeployment flinkApp, String err) {
+        ReconciliationStatus reconciliationStatus = flinkApp.getStatus().getReconciliationStatus();
+        reconciliationStatus.setSuccess(false);
+        reconciliationStatus.setError(err);
+    }
+
+    /**
+     * Creates a deep copy of the object by serializing it to JSON and reading it back.
+     *
+     * @param object object to copy, may be {@code null}
+     * @param <T> type of the object
+     * @return the copy, or {@code null} if {@code object} is {@code null}
+     * @throws IllegalStateException if the JSON round trip fails
+     */
+    // Safe: the value is deserialized as object.getClass(), the runtime class of a T.
+    @SuppressWarnings("unchecked")
+    public static <T> T clone(T object) {
+        if (object == null) {
+            return null;
+        }
+        try {
+            return (T)
+                    objectMapper.readValue(
+                            objectMapper.writeValueAsString(object), object.getClass());
+        } catch (JsonProcessingException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index 7596b4d..a4b64ce 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -64,7 +64,7 @@ public class SessionReconciler extends BaseReconciler {
if (specChanged) {
upgradeSessionCluster(flinkApp, effectiveConfig);
}
-
+ ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
return UpdateControl.updateStatus(flinkApp)
.rescheduleAfter(
operatorConfiguration.getReconcileIntervalInSec(),
TimeUnit.SECONDS);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index 9822d90..140baea 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -35,9 +35,21 @@ import
org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorato
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
+import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
+import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
+import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
+import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
+import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
+import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
+import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import org.slf4j.Logger;
@@ -51,6 +63,7 @@ import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.Collection;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/** Service for submitting and interacting with Flink clusters and jobs. */
@@ -168,4 +181,78 @@ public class FlinkService {
FlinkUtils.deleteCluster(deployment, kubernetesClient, deleteHaData);
FlinkUtils.waitForClusterShutdown(kubernetesClient, conf);
}
+
+ public void triggerSavepoint(FlinkDeployment deployment, Configuration
conf) throws Exception {
+ LOG.info("Triggering savepoint on " +
deployment.getMetadata().getName());
+ try (RestClusterClient<String> clusterClient =
+ (RestClusterClient<String>) getClusterClient(conf)) {
+ SavepointTriggerHeaders savepointTriggerHeaders =
SavepointTriggerHeaders.getInstance();
+ SavepointTriggerMessageParameters
savepointTriggerMessageParameters =
+ savepointTriggerHeaders.getUnresolvedMessageParameters();
+ savepointTriggerMessageParameters.jobID.resolve(
+
JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId()));
+
+ TriggerResponse response =
+ clusterClient
+ .sendRequest(
+ savepointTriggerHeaders,
+ savepointTriggerMessageParameters,
+ new SavepointTriggerRequestBody(null,
false))
+ .get();
+ LOG.info("Savepoint triggered: " +
response.getTriggerId().toHexString());
+
+ org.apache.flink.kubernetes.operator.crd.status.SavepointInfo
savepointInfo =
+ deployment.getStatus().getJobStatus().getSavepointInfo();
+
+ savepointInfo.setTriggerId(response.getTriggerId().toHexString());
+ savepointInfo.setTriggerTimestamp(System.currentTimeMillis());
+ }
+ }
+
+ public SavepointFetchResult fetchSavepointInfo(FlinkDeployment deployment,
Configuration conf)
+ throws Exception {
+ LOG.info(
+ "Fetching savepoint result with triggerId: "
+ +
deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
+ try (RestClusterClient<String> clusterClient =
+ (RestClusterClient<String>) getClusterClient(conf)) {
+ SavepointStatusHeaders savepointStatusHeaders =
SavepointStatusHeaders.getInstance();
+ SavepointStatusMessageParameters savepointStatusMessageParameters =
+ savepointStatusHeaders.getUnresolvedMessageParameters();
+ savepointStatusMessageParameters.jobIdPathParameter.resolve(
+
JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId()));
+ savepointStatusMessageParameters.triggerIdPathParameter.resolve(
+ TriggerId.fromHexString(
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .getTriggerId()));
+ CompletableFuture<AsynchronousOperationResult<SavepointInfo>>
response =
+ clusterClient.sendRequest(
+ savepointStatusHeaders,
+ savepointStatusMessageParameters,
+ EmptyRequestBody.getInstance());
+
+ if (response.get() == null || response.get().resource() == null) {
+ return SavepointFetchResult.notTriggered();
+ }
+
+ if (response.get().resource().getLocation() == null) {
+ if (response.get().resource().getFailureCause() != null) {
+ LOG.error("Savepoint error",
response.get().resource().getFailureCause());
+ return SavepointFetchResult.error(
+
response.get().resource().getFailureCause().getMessage());
+ } else {
+ return SavepointFetchResult.pending();
+ }
+ }
+
+ Savepoint savepoint =
+ new Savepoint(
+ System.currentTimeMillis(),
response.get().resource().getLocation());
+ LOG.info("Savepoint result: " + savepoint);
+ return SavepointFetchResult.completed(savepoint);
+ }
+ }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
new file mode 100644
index 0000000..9366472
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+
+import java.util.concurrent.TimeUnit;
+
+/** Savepoint utilities. */
+public class SavepointUtils {
+ public static boolean shouldTriggerSavepoint(FlinkDeployment
flinkDeployment) {
+ if
(flinkDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId()
!= null) {
+ return false;
+ }
+ return flinkDeployment.getSpec().getJob().getSavepointTriggerNonce()
+ != flinkDeployment
+ .getStatus()
+ .getReconciliationStatus()
+ .getLastReconciledSpec()
+ .getJob()
+ .getSavepointTriggerNonce();
+ }
+
+ public static boolean gracePeriodEnded(
+ FlinkOperatorConfiguration configuration, SavepointInfo
savepointInfo) {
+ int gracePeriod = configuration.getSavepointTriggerGracePeriodInSec();
+ long triggerTimestamp = savepointInfo.getTriggerTimestamp();
+ return (System.currentTimeMillis() - triggerTimestamp)
+ > TimeUnit.SECONDS.toMillis(gracePeriod);
+ }
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index aaef01f..4db37c6 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -198,7 +198,8 @@ public class DefaultDeploymentValidator implements
FlinkDeploymentValidator {
if (oldJob.getState() == JobState.SUSPENDED
&& newJob.getState() == JobState.RUNNING
&& newJob.getUpgradeMode() == UpgradeMode.SAVEPOINT
- &&
deployment.getStatus().getJobStatus().getSavepointLocation() == null) {
+ &&
(deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint()
+ == null)) {
return Optional.of("Cannot perform savepoint restore without a
valid savepoint");
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 055d91b..009b17a 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -27,8 +27,6 @@ import org.apache.flink.kubernetes.operator.crd.spec.Resource;
import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.api.model.Pod;
@@ -43,8 +41,6 @@ import java.util.Optional;
/** Testing utilities. */
public class TestUtils {
- private static final ObjectMapper objectMapper = new ObjectMapper();
-
public static final String TEST_NAMESPACE = "flink-operator-test";
public static final String SERVICE_ACCOUNT = "flink-operator";
public static final String FLINK_VERSION = "latest";
@@ -77,19 +73,6 @@ public class TestUtils {
return deployment;
}
- public static <T> T clone(T object) {
- if (object == null) {
- return null;
- }
- try {
- return (T)
- objectMapper.readValue(
- objectMapper.writeValueAsString(object),
object.getClass());
- } catch (JsonProcessingException e) {
- throw new IllegalStateException(e);
- }
- }
-
public static FlinkDeploymentSpec getTestFlinkDeploymentSpec() {
return FlinkDeploymentSpec.builder()
.image(IMAGE)
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 2aab5ab..5da2ec5 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -104,6 +107,19 @@ public class TestingFlinkService extends FlinkService {
}
@Override
+ public void triggerSavepoint(FlinkDeployment deployment, Configuration
conf) throws Exception {
+ SavepointInfo savepointInfo =
deployment.getStatus().getJobStatus().getSavepointInfo();
+ savepointInfo.setTriggerId("trigger_" + savepointCounter);
+ savepointInfo.setTriggerTimestamp(System.currentTimeMillis());
+ }
+
+ @Override
+ public SavepointFetchResult fetchSavepointInfo(FlinkDeployment deployment,
Configuration conf)
+ throws Exception {
+ return SavepointFetchResult.completed(Savepoint.of("savepoint_" +
savepointCounter++));
+ }
+
+ @Override
public boolean isJobManagerPortReady(Configuration config) {
return isPortReady;
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 60632ec..55d1628 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -31,6 +31,7 @@ import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import
org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
import org.apache.flink.runtime.client.JobStatusMessage;
@@ -131,7 +132,7 @@ public class FlinkDeploymentControllerTest {
assertEquals(expectedJobStatus.getJobState().toString(),
jobStatus.getState());
// Send in invalid update
- appCluster = TestUtils.clone(appCluster);
+ appCluster = ReconciliationUtils.clone(appCluster);
appCluster.getSpec().setJob(null);
updateControl = testController.reconcile(appCluster, context);
assertTrue(updateControl.isUpdateStatus());
@@ -162,7 +163,7 @@ public class FlinkDeploymentControllerTest {
assertEquals("s0", jobs.get(0).f0);
List<Tuple2<String, JobStatusMessage>> previousJobs = new
ArrayList<>(jobs);
- appCluster = TestUtils.clone(appCluster);
+ appCluster = ReconciliationUtils.clone(appCluster);
appCluster.getSpec().getJob().setInitialSavepointPath("s1");
// Send in a no-op change
@@ -170,7 +171,7 @@ public class FlinkDeploymentControllerTest {
assertEquals(previousJobs, new ArrayList<>(flinkService.listJobs()));
// Upgrade job
- appCluster = TestUtils.clone(appCluster);
+ appCluster = ReconciliationUtils.clone(appCluster);
appCluster.getSpec().getJob().setParallelism(100);
testController.reconcile(appCluster, context);
@@ -180,7 +181,7 @@ public class FlinkDeploymentControllerTest {
testController.reconcile(appCluster, context);
// Suspend job
- appCluster = TestUtils.clone(appCluster);
+ appCluster = ReconciliationUtils.clone(appCluster);
appCluster.getSpec().getJob().setState(JobState.SUSPENDED);
testController.reconcile(appCluster, context);
assertEquals(
@@ -188,7 +189,7 @@ public class FlinkDeploymentControllerTest {
appCluster.getStatus().getJobManagerDeploymentStatus());
// Resume from last savepoint
- appCluster = TestUtils.clone(appCluster);
+ appCluster = ReconciliationUtils.clone(appCluster);
appCluster.getSpec().getJob().setState(JobState.RUNNING);
testController.reconcile(appCluster, TestUtils.createEmptyContext());
jobs = flinkService.listJobs();
@@ -213,7 +214,7 @@ public class FlinkDeploymentControllerTest {
assertEquals("s0", jobs.get(0).f0);
// Upgrade job
- appCluster = TestUtils.clone(appCluster);
+ appCluster = ReconciliationUtils.clone(appCluster);
appCluster.getSpec().getJob().setParallelism(100);
UpdateControl<FlinkDeployment> updateControl =
@@ -229,12 +230,12 @@ public class FlinkDeploymentControllerTest {
assertEquals(null, jobs.get(0).f0);
// Suspend job
- appCluster = TestUtils.clone(appCluster);
+ appCluster = ReconciliationUtils.clone(appCluster);
appCluster.getSpec().getJob().setState(JobState.SUSPENDED);
testController.reconcile(appCluster, context);
// Resume from empty state
- appCluster = TestUtils.clone(appCluster);
+ appCluster = ReconciliationUtils.clone(appCluster);
appCluster.getSpec().getJob().setState(JobState.RUNNING);
testController.reconcile(appCluster, context);
jobs = flinkService.listJobs();
@@ -277,7 +278,7 @@ public class FlinkDeploymentControllerTest {
private FlinkDeploymentController createTestController(
KubernetesClient kubernetesClient, TestingFlinkService
flinkService) {
- Observer observer = new Observer(flinkService);
+ Observer observer = new Observer(flinkService, operatorConfiguration);
FlinkDeploymentController controller =
new FlinkDeploymentController(
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
index 4e7615c..c5d73d4 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
@@ -20,10 +20,12 @@ package org.apache.flink.kubernetes.operator.observer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
@@ -32,6 +34,7 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** @link Observer unit tests */
@@ -39,11 +42,16 @@ public class ObserverTest {
private final Context readyContext =
JobReconcilerTest.createContextWithReadyJobManagerDeployment();
+ private final FlinkOperatorConfiguration operatorConfiguration =
+ FlinkOperatorConfiguration.fromConfiguration(new Configuration());
@Test
public void observeSessionCluster() {
FlinkService flinkService = new TestingFlinkService();
- Observer observer = new Observer(flinkService);
+ Observer observer =
+ new Observer(
+ flinkService,
+ FlinkOperatorConfiguration.fromConfiguration(new
Configuration()));
FlinkDeployment deployment = TestUtils.buildSessionCluster();
deployment
.getStatus()
@@ -74,7 +82,10 @@ public class ObserverTest {
@Test
public void observeApplicationCluster() {
TestingFlinkService flinkService = new TestingFlinkService();
- Observer observer = new Observer(flinkService);
+ Observer observer =
+ new Observer(
+ flinkService,
+ FlinkOperatorConfiguration.fromConfiguration(new
Configuration()));
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new
Configuration());
@@ -132,4 +143,64 @@ public class ObserverTest {
deployment.getStatus().getJobManagerDeploymentStatus());
assertEquals("UNKNOWN",
deployment.getStatus().getJobStatus().getState());
}
+
+ @Test
+ public void observeSavepoint() throws Exception {
+ TestingFlinkService flinkService = new TestingFlinkService();
+ Observer observer =
+ new Observer(
+ flinkService,
+ FlinkOperatorConfiguration.fromConfiguration(new
Configuration()));
+ FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new
Configuration());
+ flinkService.submitApplicationCluster(deployment, conf);
+ bringToReadyStatus(deployment);
+ assertTrue(observer.observe(deployment, readyContext, conf));
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+ flinkService.triggerSavepoint(deployment, conf);
+
+ assertEquals(
+ "trigger_0",
+
deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
+ assertTrue(observer.observe(deployment, readyContext, conf));
+ assertEquals(
+ "savepoint_0",
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .getLastSavepoint()
+ .getLocation());
+
assertNull(deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
+
+ flinkService.triggerSavepoint(deployment, conf);
+ assertEquals(
+ "trigger_1",
+
deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
+ assertTrue(observer.observe(deployment, readyContext, conf));
+ assertEquals(
+ "savepoint_1",
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .getLastSavepoint()
+ .getLocation());
+
assertNull(deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
+ }
+
+ private void bringToReadyStatus(FlinkDeployment deployment) {
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+
.setLastReconciledSpec(ReconciliationUtils.clone(deployment.getSpec()));
+ JobStatus jobStatus = new JobStatus();
+ jobStatus.setJobName("jobname");
+ jobStatus.setJobId("0000000000");
+ jobStatus.setState("RUNNING");
+ deployment.getStatus().setJobStatus(jobStatus);
+
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+ }
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index fd940d4..cc7f508 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -26,7 +26,6 @@ import
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -88,7 +87,7 @@ public class JobReconcilerTest {
verifyAndSetRunningJobsToStatus(deployment, runningJobs);
// Test stateless upgrade
- FlinkDeployment statelessUpgrade = TestUtils.clone(deployment);
+ FlinkDeployment statelessUpgrade =
ReconciliationUtils.clone(deployment);
statelessUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
statelessUpgrade.getSpec().getFlinkConfiguration().put("new", "conf");
reconciler.reconcile("test", statelessUpgrade, context, config);
@@ -103,7 +102,7 @@ public class JobReconcilerTest {
.setJobId(runningJobs.get(0).f1.getJobId().toHexString());
// Test stateful upgrade
- FlinkDeployment statefulUpgrade = TestUtils.clone(deployment);
+ FlinkDeployment statefulUpgrade =
ReconciliationUtils.clone(deployment);
statefulUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
statefulUpgrade.getSpec().getFlinkConfiguration().put("new", "conf2");
@@ -119,7 +118,6 @@ public class JobReconcilerTest {
final String expectedSavepointPath = "savepoint_0";
final Context context =
JobReconcilerTest.createContextWithReadyJobManagerDeployment();
final TestingFlinkService flinkService = new TestingFlinkService();
- Observer observer = new Observer(flinkService);
final JobReconciler reconciler =
new JobReconciler(null, flinkService, operatorConfiguration);
@@ -143,7 +141,12 @@ public class JobReconcilerTest {
.equalsIgnoreCase(deployment.getStatus().getJobStatus().getState()));
assertEquals(
expectedSavepointPath,
- deployment.getStatus().getJobStatus().getSavepointLocation());
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .getLastSavepoint()
+ .getLocation());
// Resume FlinkDeployment with last-state upgrade mode
deployment
@@ -163,6 +166,29 @@ public class JobReconcilerTest {
assertEquals(expectedSavepointPath, runningJobs.get(0).f0);
}
+ @Test
+ public void triggerSavepoint() throws Exception {
+ Context context =
JobReconcilerTest.createContextWithReadyJobManagerDeployment();
+ TestingFlinkService flinkService = new TestingFlinkService();
+ JobReconciler reconciler = new JobReconciler(null, flinkService,
operatorConfiguration);
+ FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ Configuration config = FlinkUtils.getEffectiveConfig(deployment, new
Configuration());
+
+ reconciler.reconcile("test", deployment, context, config);
+ List<Tuple2<String, JobStatusMessage>> runningJobs =
flinkService.listJobs();
+ verifyAndSetRunningJobsToStatus(deployment, runningJobs);
+
assertNull(deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
+
+ // trigger savepoint
+ FlinkDeployment spDeployment = ReconciliationUtils.clone(deployment);
+ long oldValue =
spDeployment.getSpec().getJob().getSavepointTriggerNonce();
+ spDeployment.getSpec().getJob().setSavepointTriggerNonce(oldValue + 1);
+ reconciler.reconcile("test", spDeployment, context, config);
+ assertEquals(
+ "trigger_0",
+
spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
+ }
+
private void verifyAndSetRunningJobsToStatus(
FlinkDeployment deployment, List<Tuple2<String, JobStatusMessage>>
runningJobs) {
assertEquals(1, runningJobs.size());
@@ -170,7 +196,7 @@ public class JobReconcilerTest {
deployment
.getStatus()
.getReconciliationStatus()
- .setLastReconciledSpec(TestUtils.clone(deployment.getSpec()));
+
.setLastReconciledSpec(ReconciliationUtils.clone(deployment.getSpec()));
JobStatus jobStatus = new JobStatus();
jobStatus.setJobName(runningJobs.get(0).f1.getJobName());
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
index fd93e7d..8d6aee5 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
@@ -29,6 +29,7 @@ import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.utils.Constants;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -193,7 +194,7 @@ public class FlinkConfigBuilderTest {
@Test
public void testApplyTaskManagerSpec() throws Exception {
- FlinkDeployment deploymentClone = TestUtils.clone(flinkDeployment);
+ FlinkDeployment deploymentClone =
ReconciliationUtils.clone(flinkDeployment);
deploymentClone.getSpec().setPodTemplate(null);
final Configuration configuration =
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index 184d3aa..bfeefb6 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -25,6 +25,8 @@ import
org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.utils.Constants;
import org.junit.Assert;
@@ -120,12 +122,15 @@ public class DeploymentValidatorTest {
dep -> {
dep.setStatus(new FlinkDeploymentStatus());
dep.getStatus().setJobStatus(new JobStatus());
- dep.getStatus().getJobStatus().setSavepointLocation("sp");
+ dep.getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .setLastSavepoint(Savepoint.of("sp"));
dep.getStatus().setReconciliationStatus(new
ReconciliationStatus());
dep.getStatus()
.getReconciliationStatus()
-
.setLastReconciledSpec(TestUtils.clone(dep.getSpec()));
+
.setLastReconciledSpec(ReconciliationUtils.clone(dep.getSpec()));
dep.getStatus()
.getReconciliationStatus()
.getLastReconciledSpec()
@@ -143,7 +148,7 @@ public class DeploymentValidatorTest {
dep.getStatus().setReconciliationStatus(new
ReconciliationStatus());
dep.getStatus()
.getReconciliationStatus()
-
.setLastReconciledSpec(TestUtils.clone(dep.getSpec()));
+
.setLastReconciledSpec(ReconciliationUtils.clone(dep.getSpec()));
dep.getStatus()
.getReconciliationStatus()
.getLastReconciledSpec()
@@ -163,7 +168,7 @@ public class DeploymentValidatorTest {
dep.getStatus().setReconciliationStatus(new
ReconciliationStatus());
dep.getStatus()
.getReconciliationStatus()
-
.setLastReconciledSpec(TestUtils.clone(dep.getSpec()));
+
.setLastReconciledSpec(ReconciliationUtils.clone(dep.getSpec()));
dep.getSpec().setJob(null);
},
"Cannot switch from job to session cluster");
@@ -176,7 +181,7 @@ public class DeploymentValidatorTest {
dep.getStatus().setReconciliationStatus(new
ReconciliationStatus());
dep.getStatus()
.getReconciliationStatus()
-
.setLastReconciledSpec(TestUtils.clone(dep.getSpec()));
+
.setLastReconciledSpec(ReconciliationUtils.clone(dep.getSpec()));
dep.getStatus().getReconciliationStatus().getLastReconciledSpec().setJob(null);
},
"Cannot switch from session to job cluster");
diff --git a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index ec9ee63..078c0ed 100644
--- a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9048,6 +9048,8 @@ spec:
- running
- suspended
type: string
+ savepointTriggerNonce:
+ type: integer
initialSavepointPath:
type: string
upgradeMode:
@@ -9074,8 +9076,20 @@ spec:
type: string
updateTime:
type: string
- savepointLocation:
- type: string
+ savepointInfo:
+ properties:
+ lastSavepoint:
+ properties:
+ timeStamp:
+ type: integer
+ location:
+ type: string
+ type: object
+ triggerId:
+ type: string
+ triggerTimestamp:
+ type: integer
+ type: object
type: object
jobManagerDeploymentStatus:
enum:
@@ -18121,6 +18135,8 @@ spec:
- running
- suspended
type: string
+ savepointTriggerNonce:
+ type: integer
initialSavepointPath:
type: string
upgradeMode: