ferenc-csaky commented on code in PR #821:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1687799832
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -426,6 +438,14 @@ public static String operatorConfigKey(String key) {
.withDescription(
"Max allowed checkpoint age for initiating
last-state upgrades on running jobs. If a checkpoint is not available within
the desired age (and nothing in progress) a savepoint will be triggered.");
+ @Documentation.Section(SECTION_DYNAMIC)
+ public static final ConfigOption<Boolean>
OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE =
+ operatorConfig("savepoint.dispose-on-delete")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Savepoint data for FlinkStateSnapshot resources
created by the operator during upgrades and periodic savepoints will be
disposed of automatically when the generated Kubernetes resource is deleted.");
Review Comment:
nit: "...savepoints will be disposed ~of~ automatically when..."
##########
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkStateSnapshotState.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.status;
+
+import org.apache.flink.annotation.Experimental;
+
+/** Describes current snapshot state. */
+@Experimental
+public enum FlinkStateSnapshotState {
Review Comment:
Maybe move this inside `FlinkStateSnapshotStatus`; the enum name could then
simply be `State`. This might improve overall readability, and it lets us work
around the somewhat confusing wording here a bit better.
I see that `ReconciliationStatus` and `ReconciliationState` follow the
current setup and this is just set up the same way, but the wording is
more straightforward there.
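A minimal sketch of the nesting suggested above. The class is a stand-in, not the PR's real `FlinkStateSnapshotStatus`: only `IN_PROGRESS` appears in the diff, while the other constants and the `isInProgress` helper are assumptions made for illustration.

```java
// Stand-in for the real status class. Constants other than IN_PROGRESS and the
// isInProgress() helper are hypothetical and only illustrate the nesting.
class FlinkStateSnapshotStatus {

    /** Describes the current snapshot state. */
    enum State {
        IN_PROGRESS,
        COMPLETED, // assumed constant
        FAILED // assumed constant
    }

    private State state = State.IN_PROGRESS;

    State getState() {
        return state;
    }

    void setState(State state) {
        this.state = state;
    }

    // Hypothetical helper: call sites read "status" and "State" without the
    // FlinkStateSnapshotState / FlinkStateSnapshotStatus near-duplication.
    boolean isInProgress() {
        return State.IN_PROGRESS.equals(state);
    }
}
```

Call sites would then refer to `FlinkStateSnapshotStatus.State.IN_PROGRESS`, which keeps the enum visibly tied to the status it describes.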
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -297,6 +297,18 @@ public static String operatorConfigKey(String key) {
"Custom HTTP header for HttpArtifactFetcher. The
header will be applied when getting the session job artifacts. "
+ "Expected format:
headerKey1:headerValue1,headerKey2:headerValue2.");
+ @Documentation.Section(SECTION_DYNAMIC)
+ public static final ConfigOption<Boolean> SNAPSHOT_RESOURCE_ENABLED =
+ operatorConfig("snapshot.resource.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Create new FlinkStateSnapshot resources for
storing snapshots. "
+ + "Disable if you wish to use the
deprecated mode and save snapshot results to "
+ + "FlinkDeployment/FlinkSessionJob status
fields. The Operator will fallback to the "
Review Comment:
nit: "The Operator will fallback to ~the~"
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
+import
org.apache.flink.kubernetes.operator.observer.snapshot.StateSnapshotObserver;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import
org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotReconciler;
+import
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
+import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
+
+import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Controller that runs the main reconcile loop for {@link
FlinkStateSnapshot}. */
+@ControllerConfiguration
+public class FlinkStateSnapshotController
+ implements Reconciler<FlinkStateSnapshot>,
+ ErrorStatusHandler<FlinkStateSnapshot>,
+ EventSourceInitializer<FlinkStateSnapshot>,
+ Cleaner<FlinkStateSnapshot> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkStateSnapshotController.class);
+
+ private final Set<FlinkResourceValidator> validators;
+ private final FlinkResourceContextFactory ctxFactory;
+ private final StateSnapshotReconciler reconciler;
+ private final StateSnapshotObserver observer;
+ private final EventRecorder eventRecorder;
+ private final StatusRecorder<FlinkStateSnapshot, FlinkStateSnapshotStatus>
statusRecorder;
+
+ public FlinkStateSnapshotController(
+ Set<FlinkResourceValidator> validators,
+ FlinkResourceContextFactory ctxFactory,
+ StateSnapshotReconciler reconciler,
+ StateSnapshotObserver observer,
+ EventRecorder eventRecorder,
+ StatusRecorder<FlinkStateSnapshot, FlinkStateSnapshotStatus>
statusRecorder) {
+ this.validators = validators;
+ this.ctxFactory = ctxFactory;
+ this.reconciler = reconciler;
+ this.observer = observer;
+ this.eventRecorder = eventRecorder;
+ this.statusRecorder = statusRecorder;
+ }
+
+ @Override
+ public UpdateControl<FlinkStateSnapshot> reconcile(
+ FlinkStateSnapshot flinkStateSnapshot, Context<FlinkStateSnapshot>
josdkContext) {
+ // status might be null here
+ flinkStateSnapshot.setStatus(
+ Objects.requireNonNullElseGet(
+ flinkStateSnapshot.getStatus(),
FlinkStateSnapshotStatus::new));
+ var ctx = ctxFactory.getFlinkStateSnapshotContext(flinkStateSnapshot,
josdkContext);
+
+ observer.observe(ctx);
+
+ if (validateSnapshot(ctx)) {
+ reconciler.reconcile(ctx);
+ }
+
+ notifyListeners(ctx);
+ return getUpdateControl(ctx);
+ }
+
+ @Override
+ public DeleteControl cleanup(
+ FlinkStateSnapshot flinkStateSnapshot, Context<FlinkStateSnapshot>
josdkContext) {
+ var ctx = ctxFactory.getFlinkStateSnapshotContext(flinkStateSnapshot,
josdkContext);
+ DeleteControl deleteControl;
Review Comment:
This local variable is unnecessary. We could return inside the `try`, as the
`catch` has its own return as well.
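To illustrate, a rough before/after sketch. The method body is not part of the quoted hunk, so everything here is a stand-in: the `DeleteControl` enum only mimics the JOSDK type, and `CleanupAction` is a hypothetical placeholder for whatever the `try` block actually does.

```java
// Stand-ins for illustration only; the real method returns JOSDK's DeleteControl
// and its body is not shown in the quoted hunk.
final class CleanupSketch {

    enum DeleteControl {
        DEFAULT_DELETE,
        NO_FINALIZER_REMOVAL
    }

    // Hypothetical placeholder for the work done inside the try block.
    interface CleanupAction {
        DeleteControl run() throws Exception;
    }

    // Before: a local is assigned in the try and returned afterwards.
    static DeleteControl withLocal(CleanupAction action) {
        DeleteControl deleteControl;
        try {
            deleteControl = action.run();
        } catch (Exception e) {
            return DeleteControl.NO_FINALIZER_REMOVAL;
        }
        return deleteControl;
    }

    // After: return directly from the try; the catch keeps its own return.
    static DeleteControl withoutLocal(CleanupAction action) {
        try {
            return action.run();
        } catch (Exception e) {
            return DeleteControl.NO_FINALIZER_REMOVAL;
        }
    }
}
```

Both variants behave the same; the second just has one fewer name to track.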
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.JobKind;
+import
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.Optional;
+
+/** Context for reconciling a snapshot. */
+@Getter
+@RequiredArgsConstructor
+public class FlinkStateSnapshotContext {
+
+ private final FlinkStateSnapshot resource;
+ private final FlinkStateSnapshotStatus originalStatus;
+ private final Context<FlinkStateSnapshot> josdkContext;
+ private final FlinkConfigManager configManager;
+
+ private FlinkOperatorConfiguration operatorConfig;
+ private Configuration referencedJobObserveConfig;
+ private FlinkDeployment referencedJobFlinkDeployment;
+
+ /**
+ * @return Operator configuration for this resource.
+ */
+ public FlinkOperatorConfiguration getOperatorConfig() {
+ if (operatorConfig != null) {
+ return operatorConfig;
+ }
+ return operatorConfig =
+ configManager.getOperatorConfiguration(
+ getResource().getMetadata().getNamespace(), null);
+ }
+
+ public Optional<AbstractFlinkResource<?, ?>> getSecondaryResource() {
+ var jobRef = getResource().getSpec().getJobReference();
+ if (jobRef == null) {
+ return Optional.empty();
+ }
+
+ if (JobKind.FLINK_DEPLOYMENT.equals(jobRef.getKind())) {
+ return
getJosdkContext().getSecondaryResource(FlinkDeployment.class).map(r -> r);
+ } else if (JobKind.FLINK_SESSION_JOB.equals(jobRef.getKind())) {
+ return
getJosdkContext().getSecondaryResource(FlinkSessionJob.class).map(r -> r);
+ } else {
+ return Optional.empty();
+ }
Review Comment:
nit: We can eliminate the first null-check, so the flow will be more
straightforward, but getting the `jobKind` is a bit more complex:
```java
var jobKind =
        Optional.ofNullable(getResource().getSpec().getJobReference())
                .map(JobReference::getKind)
                .orElse(null);
if (JobKind.FLINK_DEPLOYMENT.equals(jobKind)) {
    return getJosdkContext().getSecondaryResource(FlinkDeployment.class).map(r -> r);
}
if (JobKind.FLINK_SESSION_JOB.equals(jobKind)) {
    return getJosdkContext().getSecondaryResource(FlinkSessionJob.class).map(r -> r);
}
return Optional.empty();
```
##########
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java:
##########
@@ -35,7 +35,10 @@
@AllArgsConstructor
@Builder
public class SavepointInfo implements SnapshotInfo {
- /** Last completed savepoint by the operator. */
+ /**
+ * Last completed savepoint by the operator for manual and periodic
snapshots. Only used if
Review Comment:
Shouldn't this be marked as `@Deprecated` as well?
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.JobKind;
+import
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.Optional;
+
+/** Context for reconciling a snapshot. */
+@Getter
+@RequiredArgsConstructor
+public class FlinkStateSnapshotContext {
+
+ private final FlinkStateSnapshot resource;
+ private final FlinkStateSnapshotStatus originalStatus;
+ private final Context<FlinkStateSnapshot> josdkContext;
+ private final FlinkConfigManager configManager;
+
+ private FlinkOperatorConfiguration operatorConfig;
+ private Configuration referencedJobObserveConfig;
+ private FlinkDeployment referencedJobFlinkDeployment;
+
+ /**
+ * @return Operator configuration for this resource.
+ */
+ public FlinkOperatorConfiguration getOperatorConfig() {
+ if (operatorConfig != null) {
+ return operatorConfig;
+ }
+ return operatorConfig =
+ configManager.getOperatorConfiguration(
+ getResource().getMetadata().getNamespace(), null);
Review Comment:
IMO inverting the current `if` condition creates a more natural and readable
flow with a single `return`, so I would consider that an improvement.
But Lombok's [lazy getter](https://projectlombok.org/features/GetterLazy)
would be even better to use for all currently non-final fields, which could be
`final` in that case, and we could eliminate the null-checks from the source.
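A small sketch of the inverted condition, without Lombok so it stays self-contained. `String` stands in for `FlinkOperatorConfiguration` and the `Supplier` stands in for the `configManager` lookup; both are assumptions for illustration only.

```java
import java.util.function.Supplier;

// Stand-in sketch: String replaces FlinkOperatorConfiguration and the Supplier
// replaces the configManager.getOperatorConfiguration(...) call.
final class LazyConfigHolder {

    private final Supplier<String> loader;
    private String operatorConfig;

    LazyConfigHolder(Supplier<String> loader) {
        this.loader = loader;
    }

    String getOperatorConfig() {
        // Inverted condition: load only when the value is missing, single return.
        if (operatorConfig == null) {
            operatorConfig = loader.get();
        }
        return operatorConfig;
    }
}
```

With Lombok, `@Getter(lazy = true)` on a `private final` field with an initializer would generate the caching instead, so the hand-written null-check disappears entirely.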
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
+import
org.apache.flink.kubernetes.operator.observer.snapshot.StateSnapshotObserver;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import
org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotReconciler;
+import
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
+import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
+
+import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Controller that runs the main reconcile loop for {@link
FlinkStateSnapshot}. */
+@ControllerConfiguration
+public class FlinkStateSnapshotController
+ implements Reconciler<FlinkStateSnapshot>,
+ ErrorStatusHandler<FlinkStateSnapshot>,
+ EventSourceInitializer<FlinkStateSnapshot>,
+ Cleaner<FlinkStateSnapshot> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkStateSnapshotController.class);
+
+ private final Set<FlinkResourceValidator> validators;
+ private final FlinkResourceContextFactory ctxFactory;
+ private final StateSnapshotReconciler reconciler;
+ private final StateSnapshotObserver observer;
+ private final EventRecorder eventRecorder;
+ private final StatusRecorder<FlinkStateSnapshot, FlinkStateSnapshotStatus>
statusRecorder;
+
+ public FlinkStateSnapshotController(
Review Comment:
Ctor can be replaced with `@RequiredArgsConstructor`.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -226,7 +229,27 @@ private void setJobIdIfNecessary(
@Override
protected void cancelJob(FlinkResourceContext<FlinkDeployment> ctx,
UpgradeMode upgradeMode)
throws Exception {
- ctx.getFlinkService().cancelJob(ctx.getResource(), upgradeMode,
ctx.getObserveConfig());
+ var conf = ctx.getObserveConfig() != null ? ctx.getObserveConfig() :
new Configuration();
+ var savepointFormatType =
+
conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
+
+ var savepointOpt = ctx.getFlinkService().cancelJob(ctx.getResource(),
upgradeMode, conf);
+ savepointOpt.ifPresent(
Review Comment:
The body of this `ifPresent` could be moved to `AbstractJobReconciler` as a new
protected method, so we duplicate as little logic as possible.
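Roughly what that could look like, as a sketch only. The body of the `ifPresent` is not shown in the quoted hunk, so every name below (`handleCancelSavepoint`, the `*Sketch` classes, the `recorded` list, the `"CANONICAL"` value) is hypothetical and only demonstrates the shape of the refactoring.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// All names are hypothetical stand-ins; the real ifPresent body is not shown
// in the quoted hunk, so recording a string is only a placeholder for it.
abstract class AbstractJobReconcilerSketch {

    final List<String> recorded = new ArrayList<>();

    // Shared hook: subclasses pass in whatever cancelJob returned.
    protected void handleCancelSavepoint(Optional<String> savepointOpt, String formatType) {
        savepointOpt.ifPresent(location -> recorded.add(formatType + ":" + location));
    }
}

class ApplicationReconcilerSketch extends AbstractJobReconcilerSketch {
    void cancelJob(Optional<String> cancelResult) {
        handleCancelSavepoint(cancelResult, "CANONICAL");
    }
}

class SessionJobReconcilerSketch extends AbstractJobReconcilerSketch {
    void cancelJob(Optional<String> cancelResult) {
        handleCancelSavepoint(cancelResult, "CANONICAL");
    }
}
```

Each reconciler keeps only the call, and the handling itself lives in one place.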
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java:
##########
@@ -273,15 +280,19 @@ private void disposeSavepointQuietly(
}
}
- private void observeLatestSavepoint(
+ private void observeLatestCheckpoint(
Review Comment:
Is this always a checkpoint? Shouldn't this be more like
`observeLatestSnapshot`? Also, we should update the error log at L297.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/snapshot/StateSnapshotObserver.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer.snapshot;
+
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import
org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult;
+import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
+import
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The observer of {@link
org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot}. */
+@RequiredArgsConstructor
+public class StateSnapshotObserver {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StateSnapshotObserver.class);
+
+ private final FlinkResourceContextFactory ctxFactory;
+ private final EventRecorder eventRecorder;
+
+ public void observe(FlinkStateSnapshotContext ctx) {
+ var resource = ctx.getResource();
+ var savepointState = resource.getStatus().getState();
+
+ if (FlinkStateSnapshotState.IN_PROGRESS.equals(savepointState)) {
+ observeSnapshotState(ctx);
+ }
+ }
+
+ private void observeSnapshotState(FlinkStateSnapshotContext ctx) {
+ var resource = ctx.getResource();
+ var resourceName = resource.getMetadata().getName();
+ var triggerId = resource.getStatus().getTriggerId();
+
+ if (StringUtils.isEmpty(triggerId)) {
+ return;
+ }
+
+ LOG.debug("Observing snapshot state for resource {}...", resourceName);
+
+ if (FlinkStateSnapshotUtils.abandonSnapshotIfJobNotRunning(
+ ctx.getKubernetesClient(),
+ ctx.getResource(),
+ ctx.getSecondaryResource().orElse(null),
+ eventRecorder)) {
+ return;
+ }
+
+ var jobId =
ctx.getSecondaryResource().orElseThrow().getStatus().getJobStatus().getJobId();
+ var ctxFlinkDeployment =
+ ctxFactory.getResourceContext(
+ ctx.getReferencedJobFlinkDeployment(),
ctx.getJosdkContext());
+ var observeConfig = ctx.getReferencedJobObserveConfig();
+
+ if (resource.getSpec().isSavepoint()) {
+ var savepointInfo =
+ ctxFlinkDeployment
+ .getFlinkService()
+ .fetchSavepointInfo(triggerId, jobId,
observeConfig);
+ handleSavepoint(ctx, savepointInfo);
+ } else {
+ var checkpointInfo =
+ ctxFlinkDeployment
+ .getFlinkService()
+ .fetchCheckpointInfo(triggerId, jobId,
observeConfig);
+ handleCheckpoint(ctx, checkpointInfo, ctxFlinkDeployment, jobId);
+ }
+ }
+
+ private void handleSavepoint(
+ FlinkStateSnapshotContext ctx, SavepointFetchResult savepointInfo)
{
+ var resource = ctx.getResource();
+ var resourceName = resource.getMetadata().getName();
+
+ if (savepointInfo.isPending()) {
+ LOG.debug(
+ "Savepoint '{}' with ID {} is pending",
+ resourceName,
+ resource.getStatus().getTriggerId());
+ } else if (savepointInfo.getError() != null) {
+ throw new ReconciliationException(savepointInfo.getError());
+ } else {
+ LOG.info("Savepoint {} successful: {}", resourceName,
savepointInfo.getLocation());
+ FlinkStateSnapshotUtils.snapshotSuccessful(
+ resource, savepointInfo.getLocation(), false);
+ }
+ }
+
+ private void handleCheckpoint(
+ FlinkStateSnapshotContext ctx,
+ CheckpointFetchResult checkpointInfo,
+ FlinkResourceContext<FlinkDeployment> ctxFlinkDeployment,
+ String jobId) {
+ var resource = ctx.getResource();
+ var resourceName = resource.getMetadata().getName();
+
+ if (checkpointInfo.isPending()) {
+ LOG.debug(
+ "Checkpoint for {} with ID {} is pending",
+ resourceName,
+ resource.getStatus().getTriggerId());
+ return;
+ }
+
+ if (checkpointInfo.getError() != null) {
+ throw new ReconciliationException(checkpointInfo.getError());
+ } else {
Review Comment:
nit: the `else` can be omitted, which saves this fairly big block one
indentation level.
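A simplified sketch of the flattened flow. The parameters and return strings are stand-ins for the real `CheckpointFetchResult` handling, and `IllegalStateException` replaces `ReconciliationException` to keep the example self-contained.

```java
// Simplified stand-in for handleCheckpoint: booleans and strings replace
// CheckpointFetchResult, IllegalStateException replaces ReconciliationException.
final class CheckpointHandlingSketch {

    static String handle(boolean pending, String error, String location) {
        if (pending) {
            return "PENDING";
        }

        if (error != null) {
            throw new IllegalStateException(error);
        }

        // No else needed: the throw above already leaves the method, so the
        // success path can sit at the top indentation level.
        return "COMPLETED:" + location;
    }
}
```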
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java:
##########
@@ -53,6 +62,16 @@ private static String format(@NonNull CommonStatus<?>
status) {
: status.getError());
}
+ private static String format(@NonNull FlinkStateSnapshotStatus status) {
+ if (StringUtils.isEmpty(status.getError())) {
+ return String.format(
+ ">>> Status[Snapshot] | Info | %s | %s",
status.getState(), status.getPath());
Review Comment:
I think we should keep the field widths of these lines consistent, just as the
already existing `format` methods do.
Also, now that we have different kinds of `Status` logs, it would probably make
sense to identify the job status changes as well. Calling it `Status[Job]` seems
appropriate, but we should make sure we define a correct `Status[%-8s]`
template or something like that.
And as far as I can see, the `Event` logs also aim for the same field
width as the status logs, so we should probably try to keep a completely
consistent format.
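To show what the `%-8s` idea would give us, a small sketch. The template and the `format` signature are assumptions, not the existing `AuditUtils` code; `%-8s` left-justifies the kind and pads it to 8 characters, which is exactly the length of `Snapshot`.

```java
// Hypothetical template, not the existing AuditUtils code: the status kind is
// left-justified and padded to 8 characters so the columns line up.
final class AuditFormatSketch {

    private static final String STATUS_TEMPLATE = ">>> Status[%-8s] | Info | %s | %s";

    static String format(String kind, String state, String detail) {
        return String.format(STATUS_TEMPLATE, kind, state, detail);
    }

    public static void main(String[] args) {
        System.out.println(format("Job", "RUNNING", "n/a"));
        System.out.println(format("Snapshot", "COMPLETED", "/tmp/sp"));
    }
}
```

With this, `Status[Job     ]` and `Status[Snapshot]` close their bracket in the same column, so the fields after it stay aligned across log kinds.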
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkStateSnapshotMetrics.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** FlinkSessionJob metrics. */
Review Comment:
`FlinkSessionJob` -> `FlinkStateSnapshot`. Same for the field level variable
and the `onUpdate`/`onRemove` param. Copy-paste residue. :)