rkhachatryan commented on a change in pull request #14662:
URL: https://github.com/apache/flink/pull/14662#discussion_r560372765



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
##########
@@ -34,22 +34,18 @@
 
     private static final long serialVersionUID = 2094094662279578953L;
 
-    /** The reason why the checkpoint was declined. */
-    @Nullable private final SerializedThrowable reason;
-
-    public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
-        this(job, taskExecutionId, checkpointId, null);
-    }
+    /** The serialized reason why the checkpoint was declined. */
+    @Nonnull private final SerializedCheckpointException serializedCheckpointException;
 
     public DeclineCheckpoint(
             JobID job,
             ExecutionAttemptID taskExecutionId,
             long checkpointId,
-            @Nullable Throwable reason) {
+            @Nonnull CheckpointException checkpointException) {
         super(job, taskExecutionId, checkpointId);
 
-        // some other exception. replace with a serialized throwable, to be on the safe side
-        this.reason = reason == null ? null : new SerializedThrowable(reason);
+        // replace with a serialized throwable, to be on the safe side
+        this.serializedCheckpointException = new SerializedCheckpointException(checkpointException);

Review comment:
      I see that `checkpointException` is dereferenced later anyway, but could we add an explicit non-null check for extra safety?
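
A minimal sketch of the suggested fail-fast check, using hypothetical stand-in classes (`CheckpointExceptionStub`, `DeclineMessage`) rather than Flink's real `CheckpointException`/`DeclineCheckpoint`. It uses `java.util.Objects.requireNonNull` so it runs standalone; inside Flink, `org.apache.flink.util.Preconditions.checkNotNull(reference, errorMessage)` would be the usual equivalent.

```java
import java.util.Objects;

public class DeclineCheckpointNullCheckSketch {

    // Hypothetical stand-in for Flink's CheckpointException.
    static class CheckpointExceptionStub extends Exception {
        CheckpointExceptionStub(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for the DeclineCheckpoint message.
    static final class DeclineMessage {
        private final long checkpointId;
        private final String reasonMessage;

        DeclineMessage(long checkpointId, CheckpointExceptionStub checkpointException) {
            // Fail fast at construction time with a descriptive message, instead of
            // relying on some later dereference to throw a bare NullPointerException.
            Objects.requireNonNull(checkpointException, "checkpointException must not be null");
            this.checkpointId = checkpointId;
            this.reasonMessage = checkpointException.getMessage();
        }

        long getCheckpointId() {
            return checkpointId;
        }

        String getReasonMessage() {
            return reasonMessage;
        }
    }

    public static void main(String[] args) {
        try {
            new DeclineMessage(1L, null);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // prints "checkpointException must not be null"
        }
    }
}
```

The explicit check documents the contract at the constructor itself, so the failure points at the caller that passed `null`.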

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/SerializedCheckpointException.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.util.SerializedThrowable;
+
+import java.io.Serializable;
+
+/**
+ * Serialized checkpoint exception which wraps the checkpoint failure reason and its serialized
+ * throwable.
+ */
+public class SerializedCheckpointException implements Serializable {

Review comment:
      Could you explain why a plain `SerializedThrowable` built from the `CheckpointException` is not enough?
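
To make the question concrete, here is a minimal sketch of the shape the new class's javadoc describes (a failure reason plus its serialized throwable), using hypothetical stand-in types and plain Java serialization instead of Flink's `SerializedThrowable`. All it shows is that a separate enum field can be read on the receiving side without deserializing the cause; whether that is the PR's actual rationale is exactly what is being asked.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ReasonPlusCauseSketch {

    // Hypothetical stand-in for CheckpointFailureReason.
    enum FailureReason { CHECKPOINT_ASYNC_EXCEPTION, CHECKPOINT_DECLINED }

    // Hypothetical wrapper: the reason is a plain enum field, while the cause is kept
    // only as Java-serialized bytes (loosely analogous to a serialized throwable).
    static final class ReasonWithSerializedCause implements Serializable {
        private static final long serialVersionUID = 1L;

        private final FailureReason reason;
        private final byte[] serializedCause;

        ReasonWithSerializedCause(FailureReason reason, Throwable cause) {
            this.reason = reason;
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(cause);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            this.serializedCause = bytes.toByteArray();
        }

        // Readable without deserializing the cause, i.e. without needing
        // the classes the cause was built from.
        FailureReason getReason() {
            return reason;
        }

        boolean hasSerializedCause() {
            return serializedCause.length > 0;
        }
    }

    public static void main(String[] args) {
        ReasonWithSerializedCause failure = new ReasonWithSerializedCause(
                FailureReason.CHECKPOINT_ASYNC_EXCEPTION,
                new IllegalStateException("async snapshot failed"));
        System.out.println(failure.getReason()); // prints CHECKPOINT_ASYNC_EXCEPTION
    }
}
```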

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
##########
@@ -237,19 +241,29 @@ private void handleExecutionException(Exception e) {
                                         + '.',
                                 e);
 
-                // We only report the exception for the original cause of fail and cleanup.
-                // Otherwise this followup exception could race the original exception in failing
-                // the task.
-                try {
-                    taskEnvironment.declineCheckpoint(
+                if (isTaskRunning.get()) {
+                    // We only report the exception for the original cause of fail and cleanup.
+                    // Otherwise this followup exception could race the original exception in
+                    // failing the task.
+                    try {
+                        taskEnvironment.declineCheckpoint(
+                                checkpointMetaData.getCheckpointId(),
+                                new CheckpointException(
+                                        CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION,
+                                        checkpointException));
+                    } catch (Exception unhandled) {
+                        AsynchronousException asyncException = new AsynchronousException(unhandled);
+                        asyncExceptionHandler.handleAsyncException(
+                                "Failure in asynchronous checkpoint materialization",
+                                asyncException);
+                    }
+                } else {
+                    // We never decline checkpoint after task is not running to avoid unexpected job
+                    // failover, which caused by exceeding checkpoint tolerable failure threshold.
+                    LOG.warn(

Review comment:
      Could you explain how this change relates to the original motivation?
   
   What will happen to such a checkpoint? Will it time out?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
##########
@@ -34,22 +34,18 @@
 
     private static final long serialVersionUID = 2094094662279578953L;
 
-    /** The reason why the checkpoint was declined. */
-    @Nullable private final SerializedThrowable reason;
-
-    public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
-        this(job, taskExecutionId, checkpointId, null);
-    }
+    /** The serialized reason why the checkpoint was declined. */
+    @Nonnull private final SerializedCheckpointException serializedCheckpointException;

Review comment:
      nit: I think we can assume non-null by default, so the `@Nonnull` annotation can be dropped.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.state.AbstractSnapshotStrategy;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.TestUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.mock;
+
+/** Tests to verify end-to-end logic of checkpoint failure manager. */
+public class CheckpointFailureManagerITCase extends TestLogger {
+    private static MiniClusterWithClientResource cluster;
+
+    @Before
+    public void setup() throws Exception {
+        Configuration configuration = new Configuration();
+
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(configuration)
+                                .build());
+        cluster.before();
+    }
+
+    @AfterClass
+    public static void shutDownExistingCluster() {
+        if (cluster != null) {
+            cluster.after();
+            cluster = null;
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testAsyncCheckpointFailureTriggerJobFailed() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setStateBackend(new AsyncFailureStateBackend());
+        env.addSource(new StringGeneratingSourceFunction()).addSink(new DiscardingSink<>());
+        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+        try {
+            // assert that the job only execute checkpoint once and only failed once.
+            TestUtils.submitJobAndWaitForResult(
+                    cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
+        } catch (JobExecutionException jobException) {
+            if (!jobException
+                    .getCause()
+                    .getCause()
+                    .equals(

Review comment:
      I think it would be more robust to use `ExceptionUtils.findThrowable`, rethrow the result, and then check the exception message or type.
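
A sketch of the suggested approach. `findThrowable` below is a self-contained stand-in written to match the shape of Flink's `ExceptionUtils.findThrowable(Throwable, Class<T>)`, which returns an `Optional`; `ExpectedFailure` and the messages are hypothetical. Unlike a chained `getCause().getCause()`, it does not depend on how many wrapper layers sit between the job failure and the exception of interest.

```java
import java.util.Optional;

public class FindThrowableSketch {

    // Walks the cause chain and returns the first throwable assignable to searchType.
    static <T extends Throwable> Optional<T> findThrowable(Throwable throwable, Class<T> searchType) {
        Throwable current = throwable;
        while (current != null) {
            if (searchType.isAssignableFrom(current.getClass())) {
                return Optional.of(searchType.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    // Hypothetical stand-in for the exception the test expects somewhere in the chain.
    static class ExpectedFailure extends RuntimeException {
        ExpectedFailure(String message) {
            super(message);
        }
    }

    public static void main(String[] args) {
        Exception jobFailure = new Exception("job failed",
                new RuntimeException("wrapper", new ExpectedFailure("expected failure")));

        // Fail loudly if the expected type is missing, then inspect it.
        ExpectedFailure found = findThrowable(jobFailure, ExpectedFailure.class)
                .orElseThrow(() -> new AssertionError("expected failure not in cause chain"));
        System.out.println(found.getMessage()); // prints "expected failure"
    }
}
```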

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
##########
@@ -69,6 +66,6 @@ public SerializedThrowable getReason() {
     public String toString() {
         return String.format(
                 "Declined Checkpoint %d for (%s/%s): %s",
-                getCheckpointId(), getJob(), getTaskExecutionId(), reason);
+                getCheckpointId(), getJob(), getTaskExecutionId(), serializedCheckpointException);

Review comment:
       `SerializedCheckpointException` doesn't override `toString`.
   Maybe `serializedCheckpointException.getCheckpointFailureReason()`?
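
A minimal sketch of the suggested fix with hypothetical stand-ins (`FailureReason`, `SerializedFailure`, `describe`): formatting the enum returned by `getCheckpointFailureReason()` prints its name, whereas formatting a wrapper that doesn't override `toString` falls back to `Object.toString()`, i.e. the class name plus a hash code.

```java
public class DeclineToStringSketch {

    // Hypothetical stand-in for CheckpointFailureReason.
    enum FailureReason { CHECKPOINT_ASYNC_EXCEPTION }

    // Hypothetical stand-in for SerializedCheckpointException; deliberately no toString override.
    static final class SerializedFailure {
        private final FailureReason reason;

        SerializedFailure(FailureReason reason) {
            this.reason = reason;
        }

        FailureReason getCheckpointFailureReason() {
            return reason;
        }
    }

    static String describe(long checkpointId, String job, String attempt, SerializedFailure failure) {
        // Format the enum, not the wrapper, so the log line carries a readable reason.
        return String.format(
                "Declined Checkpoint %d for (%s/%s): %s",
                checkpointId, job, attempt, failure.getCheckpointFailureReason());
    }

    public static void main(String[] args) {
        System.out.println(describe(42L, "job-1", "attempt-1",
                new SerializedFailure(FailureReason.CHECKPOINT_ASYNC_EXCEPTION)));
        // prints "Declined Checkpoint 42 for (job-1/attempt-1): CHECKPOINT_ASYNC_EXCEPTION"
    }
}
```

Overriding `toString` in the wrapper itself would be the alternative if the serialized cause should also show up in the message.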

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/SerializedCheckpointException.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.util.SerializedThrowable;
+
+import java.io.Serializable;
+
+/**
+ * Serialized checkpoint exception which wraps the checkpoint failure reason and its serialized
+ * throwable.
+ */
+public class SerializedCheckpointException implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final CheckpointFailureReason checkpointFailureReason;
+    private final SerializedThrowable serializedThrowable;

Review comment:
      I couldn't find any usages of this field, and it's unclear to me who is responsible for calling `deserializeError`. IIUC, the field isn't usable before that call.
   Maybe just drop the field?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
