gyfora commented on code in PR #821:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1667372364
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -222,9 +235,12 @@ private void updateStatusBeforeFirstDeployment(
CR cr, SPEC spec, Configuration deployConfig, STATUS status,
KubernetesClient client) {
if (spec.getJob() != null) {
var initialUpgradeMode = UpgradeMode.STATELESS;
+ var snapshotRef = spec.getJob().getFlinkStateSnapshotReference();
var initialSp = spec.getJob().getInitialSavepointPath();
- if (initialSp != null) {
+ if (snapshotRef != null) {
+ initialUpgradeMode = UpgradeMode.SAVEPOINT;
+ } else if (initialSp != null) {
status.getJobStatus()
.getSavepointInfo()
.setLastSavepoint(Savepoint.of(initialSp,
SnapshotTriggerType.UNKNOWN));
Review Comment:
Shouldn't the last savepoint info also be updated when the snapshot ref is
provided? This logic is there to handle failures before the initial startup, so
it should apply to both cases.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.snapshot;
+
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
+import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState;
+import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import
org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.util.Preconditions;
+
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.ObjectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.Optional;
+
+/** The reconciler for the {@link
org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot}. */
+@RequiredArgsConstructor
+public class StateSnapshotReconciler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StateSnapshotReconciler.class);
+
+ private final FlinkResourceContextFactory ctxFactory;
+
+ public void reconcile(FlinkStateSnapshotContext ctx) throws Exception {
+ var source = ctx.getResource().getSpec().getJobReference();
+ var resource = ctx.getResource();
+
+ var savepointState = resource.getStatus().getState();
+ if (!FlinkStateSnapshotState.TRIGGER_PENDING.equals(savepointState)) {
+ return;
+ }
+
+ if (resource.getSpec().isSavepoint()
+ && resource.getSpec().getSavepoint().getAlreadyExists()) {
+ LOG.info(
+ "Snapshot {} is marked as completed in spec, skipping
triggering savepoint.",
+ resource.getMetadata().getName());
+ resource.getStatus().setState(FlinkStateSnapshotState.COMPLETED);
+
resource.getStatus().setPath(resource.getSpec().getSavepoint().getPath());
+ var time = DateTimeUtils.kubernetes(Instant.now());
+ resource.getStatus().setTriggerTimestamp(time);
+ resource.getStatus().setResultTimestamp(time);
+ return;
+ }
+
+ var secondaryResource =
+ ctx.getSecondaryResource()
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ String.format(
+ "Secondary resource %s
not found",
+ source)));
Review Comment:
Should this just abandon the snapshot?
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -329,18 +459,26 @@ private void redeployWithSavepoint(
throws Exception {
LOG.info("Redeploying from savepoint");
cancelJob(ctx, UpgradeMode.STATELESS);
- var savepoint = currentDeploySpec.getJob().getInitialSavepointPath();
+ var snapshotRef =
currentDeploySpec.getJob().getFlinkStateSnapshotReference();
currentDeploySpec.getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+
+ var path = currentDeploySpec.getJob().getInitialSavepointPath();
+ if (snapshotRef != null && snapshotRef.getName() != null) {
+ path =
+
FlinkStateSnapshotUtils.getAndValidateFlinkStateSnapshotPath(
+ ctx.getKubernetesClient(), snapshotRef);
+ }
+
status.getJobStatus()
.getSavepointInfo()
- .setLastSavepoint(Savepoint.of(savepoint,
SnapshotTriggerType.UNKNOWN));
+ .setLastSavepoint(Savepoint.of(path,
SnapshotTriggerType.UNKNOWN));
Review Comment:
See my other long comment on `restoreJob` in this file.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -266,19 +304,31 @@ protected void restoreJob(
Optional<String> savepointOpt = Optional.empty();
if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
- savepointOpt =
- Optional.ofNullable(
- ctx.getResource()
- .getStatus()
- .getJobStatus()
- .getSavepointInfo()
- .getLastSavepoint())
- .flatMap(s ->
Optional.ofNullable(s.getLocation()));
+ if (FlinkStateSnapshotUtils.shouldCreateSnapshotResource(
+ ctx.getOperatorConfig(), deployConfig)) {
+ savepointOpt =
getLatestSavepointPathFromFlinkStateSnapshots(ctx);
+ } else {
+ savepointOpt =
+ Optional.ofNullable(
+ ctx.getResource()
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .getLastSavepoint())
+ .flatMap(s ->
Optional.ofNullable(s.getLocation()));
Review Comment:
The problem with this logic is that the snapshotReference field allows us to
specify the path directly. If the user starts the job that way, the code does
not update the lastSavepointInfo, and we also won't find that path here. So we
will simply restart from empty state in those cases, which is not what should
happen.
There are 2 main ways to solve it:
1. We keep the current lastSavepoint logic in the status and update it
whenever a snapshot is completed. This is a bit problematic because we would
have to update the status of the FlinkDeployment/SessionJob from the snapshot
controller, which is a bit weird.
2. Introduce a new field in the status to capture the savepoint reference
used during a Savepoint deployment. I think this is the way to go because this
is the functionality that is important to capture. When we submit with
initialSavepointPath the first time / savepoint redeploy / savepoint upgrade,
in the lastReconciledSpec we have upgradeMode: SAVEPOINT recorded and it is
assumed that the lastSavepoint contains the savepoint used. Now that we are
getting rid of the lastSavepoint, we should put something more meaningful in
place in the status to capture the actual upgrade-related behaviour. One
possibility here is simply to use the `job.snapshotReference` field in the
lastReconciledSpec to capture the upgrade savepoint and clear it in other cases
(the same way we reuse the upgradeMode field to capture how we actually
stopped the job in the lastReconciledSpec).
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -226,7 +230,36 @@ private void setJobIdIfNecessary(
@Override
protected void cancelJob(FlinkResourceContext<FlinkDeployment> ctx,
UpgradeMode upgradeMode)
throws Exception {
- ctx.getFlinkService().cancelJob(ctx.getResource(), upgradeMode,
ctx.getObserveConfig());
+ var conf = ctx.getObserveConfig() != null ? ctx.getObserveConfig() :
new Configuration();
+ var savepointFormatType =
+
conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
+
+ var savepointOpt = ctx.getFlinkService().cancelJob(ctx.getResource(),
upgradeMode, conf);
+ savepointOpt.ifPresent(
+ location -> {
+ if (FlinkStateSnapshotUtils.shouldCreateSnapshotResource(
+ ctx.getOperatorConfig(), conf)) {
+ FlinkStateSnapshotUtils.createUpgradeSavepointResource(
+ ctx.getKubernetesClient(),
+ ctx.getResource(),
+ location,
+
SavepointFormatType.valueOf(savepointFormatType.name()),
+ conf.get(
+ KubernetesOperatorConfigOptions
+
.OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE));
+ } else {
+ Savepoint sp =
+ Savepoint.of(
+ location,
+ SnapshotTriggerType.UPGRADE,
+
SavepointFormatType.valueOf(savepointFormatType.name()));
+ ctx.getResource()
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .updateLastSavepoint(sp);
Review Comment:
See my other long comment on `restoreJob` in AbstractJobReconciler.java.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/snapshot/StateSnapshotObserver.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer.snapshot;
+
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import
org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
+import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult;
+import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+
+/** The observer of {@link
org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot}. */
+@RequiredArgsConstructor
+public class StateSnapshotObserver {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StateSnapshotObserver.class);
+
+ private final FlinkResourceContextFactory ctxFactory;
+ private final EventRecorder eventRecorder;
+
+ public void observe(FlinkStateSnapshotContext ctx) {
+ var resource = ctx.getResource();
+ var savepointState = resource.getStatus().getState();
+
+ if (FlinkStateSnapshotState.IN_PROGRESS.equals(savepointState)) {
+ observeSavepointState(ctx);
+ }
+ }
+
+ private void observeSavepointState(FlinkStateSnapshotContext ctx) {
+ var resource = ctx.getResource();
+ var resourceName = resource.getMetadata().getName();
+ var triggerId = resource.getStatus().getTriggerId();
+
+ if (StringUtils.isEmpty(triggerId)) {
+ return;
+ }
+
+ LOG.debug("Observing savepoint state for resource {}...",
resourceName);
+ var secondaryResource =
+ ctx.getSecondaryResource()
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ String.format(
+ "Secondary resource %s
for savepoint %s was not found",
+
resource.getSpec().getJobReference(),
+ resourceName)));
Review Comment:
If the target is not found anymore, that sounds to me similar to the not
running case, and the snapshot should probably be marked abandoned. What is your
thinking here?
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.snapshot;
+
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
+import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState;
+import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import
org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.util.Preconditions;
+
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.ObjectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.Optional;
+
+/** The reconciler for the {@link
org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot}. */
+@RequiredArgsConstructor
+public class StateSnapshotReconciler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StateSnapshotReconciler.class);
+
+ private final FlinkResourceContextFactory ctxFactory;
+
+ public void reconcile(FlinkStateSnapshotContext ctx) throws Exception {
+ var source = ctx.getResource().getSpec().getJobReference();
+ var resource = ctx.getResource();
+
+ var savepointState = resource.getStatus().getState();
+ if (!FlinkStateSnapshotState.TRIGGER_PENDING.equals(savepointState)) {
+ return;
+ }
+
+ if (resource.getSpec().isSavepoint()
+ && resource.getSpec().getSavepoint().getAlreadyExists()) {
+ LOG.info(
+ "Snapshot {} is marked as completed in spec, skipping
triggering savepoint.",
+ resource.getMetadata().getName());
+ resource.getStatus().setState(FlinkStateSnapshotState.COMPLETED);
+
resource.getStatus().setPath(resource.getSpec().getSavepoint().getPath());
+ var time = DateTimeUtils.kubernetes(Instant.now());
+ resource.getStatus().setTriggerTimestamp(time);
+ resource.getStatus().setResultTimestamp(time);
+ return;
+ }
+
+ var secondaryResource =
+ ctx.getSecondaryResource()
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ String.format(
+ "Secondary resource %s
not found",
+ source)));
+ if (!ReconciliationUtils.isJobRunning(secondaryResource.getStatus())) {
+ LOG.warn(
+ "Target job {} for savepoint {} is not running, cannot
trigger snapshot.",
+ secondaryResource.getMetadata().getName(),
+ resource.getMetadata().getName());
+ return;
+ }
Review Comment:
We have already observed and abandoned the snapshot in this case, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]