becketqin commented on a change in pull request #12234:
URL: https://github.com/apache/flink/pull/12234#discussion_r432805261
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -99,29 +187,110 @@ public void close() throws Exception {
@Override
public void handleEventFromOperator(int subtask, OperatorEvent event)
throws Exception {
+ mainThreadExecutor.assertRunningInMainThread();
coordinator.handleEventFromOperator(subtask, event);
}
@Override
public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+ mainThreadExecutor.assertRunningInMainThread();
coordinator.subtaskFailed(subtask, reason);
+ eventValve.resetForTask(subtask);
}
@Override
- public CompletableFuture<byte[]> checkpointCoordinator(long
checkpointId) throws Exception {
- return coordinator.checkpointCoordinator(checkpointId);
+ public void checkpointCoordinator(long checkpointId,
CompletableFuture<byte[]> result) {
+ // unfortunately, this method does not run in the scheduler
executor, but in the
+ // checkpoint coordinator time thread.
+ // we can remove the delegation once the checkpoint coordinator
runs fully in the scheduler's
+ // main thread executor
+ mainThreadExecutor.execute(() ->
checkpointCoordinatorInternal(checkpointId, result));
}
@Override
public void checkpointComplete(long checkpointId) {
- coordinator.checkpointComplete(checkpointId);
+ // unfortunately, this method does not run in the scheduler
executor, but in the
+ // checkpoint coordinator time thread.
+ // we can remove the delegation once the checkpoint coordinator
runs fully in the scheduler's
+ // main thread executor
+ mainThreadExecutor.execute(() ->
checkpointCompleteInternal(checkpointId));
}
@Override
public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+ // ideally we would like to check this here, however this
method is called early during
+ // execution graph construction, before the main thread
executor is set
+
+ eventValve.reset();
Review comment:
It seems we have a minor race condition here. It is possible that the
coordinator sends some events between line 224 and line 225, i.e. after the
eventValve is reset and before the coordinator has been reset to the checkpoint.
I think those events won't actually be sent to the tasks because the
operator coordinators are only restored in a global recovery, and the tasks
shouldn't have started yet. If so, there is no real impact here except that
those events will receive a different exception from the events failed by
`eventValve.reset()`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration Test case that validates the exactly-once mechanism for
coordinator events
+ * around checkpoints.
+ *
+ * <p>While this test checks a common runtime feature that could be
theoretically tested
Review comment:
Incomplete Javadoc: this sentence is cut off.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
##########
@@ -77,21 +77,66 @@
//
------------------------------------------------------------------------
- CompletableFuture<byte[]> checkpointCoordinator(long checkpointId)
throws Exception;
+ /**
+ * Takes a checkpoint or the coordinator. The checkpoint is identified
by the given ID.
+ *
+ * <p>To confirm the checkpoint and store state in it, the given {@code
CompletableFuture}
+ * must be completed with the state. To abort or dis-confirm the
checkpoint, the given
+ * {@code CompletableFuture} must be completed exceptionally.
+ * In any case, the given {@code CompletableFuture} must be completed
in some way, otherwise the
+ * checkpoint will not progress.
+ *
+ * <h3>Exactly-once Semantics</h3>
+ *
+ * <p>The semantics are defined as follows:
+ * <ul>
+ * <li>The point in time when the checkpoint future is completed is
considered the point in time
+ * when the coordinator's checkpoint takes place.
+ * <li>The OperatorCoordinator implementation must have a way of
strictly ordering the sending
+ * of events and the completion of the checkpoint future (for
example the same thread does
+ * both actions, or both actions are guarded by a mutex).
+ * <li>Every event sent before the checkpoint future is completed is
considered before the checkpoint.
+ * <li>Every event sent after the checkpoint future is completed is
considered to be after the checkpoint.
+ * </ul>
+ */
+ void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]>
resultFuture) throws Exception;
/**
* Notifies the coordinator that the checkpoint with the given
checkpointId completes and
* was committed.
*
- * <p><b>Important:</b> This method is not supposed to throw an
exception, because by the
- * time we notify that the checkpoint is complete, the checkpoint is
committed and cannot be
- * aborted any more. If the coordinator gets into an inconsistent state
internally, it should
- * fail the job ({@link Context#failJob(Throwable)}) instead. Any
exception propagating from
- * this method may be treated as a fatal error for the JobManager,
crashing the JobManager,
- * and leading to an expensive "master failover" procedure.
+ * <h3>Checkpoint Subsuming</h3>
+ *
+ * <p>Checkpoint IDs are strictly increasing. A checkpoint with higher
ID always subsumes
+ * a checkpoint with lower ID. For example, when checkpoint T is
confirmed complete, the
+ * code should treat all checkpoints with lower ID (T-1, T-2, etc.)
also as confirmed.
+ *
+ * <h3>Exceptions</h3>
+ *
+ * <p>This method is not supposed to throw an exception indicating the
the checkpoint cannot
+ * be completed. By the time we notify that the checkpoint is complete,
the checkpoint is
+ * committed and cannot be aborted any more.
+ *
+ * <p>If the coordinator gets into an inconsistent state internally, as
a result of logic that
+ * runs after this notification, it should fail the job ({@link
Context#failJob(Throwable)})
+ * instead. Any exception propagating from this method may be treated
as a fatal error for the
+ * JobManager, crashing the JobManager, and leading to an expensive
"master failover" procedure.
*/
void checkpointComplete(long checkpointId);
+ /**
+ * Resets the coordinator to the given checkpoint.
+ * When this method is called, the coordinator can discard all other
in-flight working state.
+ * All subtasks will also have been reset to the same checkpoint.
+ *
+ * <p>Restoring to a checkpoint is a way of confirming that the
checkpoint is complete.
+ * It is safe to commit side-effects that are predicated on checkpoint
completion after this
+ * call.
+ *
+ * <p>Even if no call to {@link #checkpointComplete(long)} happened,
the checkpoint can still be
+ * complete (for example when a system failure happened directly after
committing the checkpoint,
+ * before calling the {@link #checkpointComplete(long)} method).
+ */
void resetToCheckpoint(byte[] checkpointData) throws Exception;
Review comment:
It might be worth mentioning that this method is expected to be
synchronous, i.e. when this method returns, the coordinator state should have
already been reset to the given checkpoint.
Currently the `SourceCoordinator` assumes this method is only called before
`start()` is invoked. I have created
[FLINK-18039](https://issues.apache.org/jira/browse/FLINK-18039) to fix this.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration Test case that validates the exactly-once mechanism for
coordinator events
+ * around checkpoints.
+ *
+ * <p>While this test checks a common runtime feature that could be
theoretically tested
+ * without
+ *
+ * <p>The mechanism of this test is as follows:
+ */
+@SuppressWarnings("serial")
+public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
Review comment:
Good test case!
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]