[FLINK-5763] [checkpoints] Acknowledge with explicit ID and CheckpointMetrics
Instead of acknowledging checkpoints with the CheckpointMetaData make the acknowledgement explicit by ID and CheckpointMetrics. The rest is not needed. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2edc9718 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2edc9718 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2edc9718 Branch: refs/heads/master Commit: 2edc97185700a5bdb3e181a71493d681c0f693e3 Parents: 2d2ffba Author: Ufuk Celebi <[email protected]> Authored: Wed Feb 15 17:52:40 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Feb 22 12:14:55 2017 +0100 ---------------------------------------------------------------------- .../state/RocksDBAsyncSnapshotTest.java | 6 +- .../checkpoint/CheckpointCoordinator.java | 2 +- .../CheckpointCoordinatorGateway.java | 3 +- .../runtime/checkpoint/CheckpointMetrics.java | 2 +- .../runtime/checkpoint/PendingCheckpoint.java | 27 ++--- .../flink/runtime/execution/Environment.java | 17 +-- .../flink/runtime/jobmaster/JobMaster.java | 7 +- .../checkpoint/AcknowledgeCheckpoint.java | 35 +++--- .../rpc/RpcCheckpointResponder.java | 9 +- .../ActorGatewayCheckpointResponder.java | 7 +- .../taskmanager/CheckpointResponder.java | 12 +- .../runtime/taskmanager/RuntimeEnvironment.java | 13 +- .../CheckpointCoordinatorFailureTest.java | 3 +- .../checkpoint/CheckpointCoordinatorTest.java | 119 ++++++++++--------- .../checkpoint/CheckpointStateRestoreTest.java | 11 +- .../checkpoint/PendingCheckpointTest.java | 9 +- .../jobmanager/JobManagerHARecoveryTest.java | 4 +- .../messages/CheckpointMessagesTest.java | 7 +- .../operators/testutils/DummyEnvironment.java | 6 +- .../operators/testutils/MockEnvironment.java | 6 +- .../runtime/util/JvmExitOnFatalErrorTest.java | 4 +- .../streaming/runtime/tasks/StreamTask.java | 5 +- .../operators/async/AsyncWaitOperatorTest.java | 6 +- .../runtime/tasks/OneInputStreamTaskTest.java | 6 +- 
.../runtime/tasks/StreamMockEnvironment.java | 6 +- .../streaming/runtime/tasks/StreamTaskTest.java | 7 +- 26 files changed, 174 insertions(+), 165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index 6587291..bce8028 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -31,6 +31,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; @@ -151,10 +152,11 @@ public class RocksDBAsyncSnapshotTest { @Override public void acknowledgeCheckpoint( - CheckpointMetaData checkpointMetaData, + long checkpointId, + CheckpointMetrics checkpointMetrics, SubtaskState checkpointStateHandles) { - super.acknowledgeCheckpoint(checkpointMetaData); + super.acknowledgeCheckpoint(checkpointId, checkpointMetrics); // block on the latch, to verify that triggerCheckpoint returns below, // even though the async checkpoint 
would not finish http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 6cac006..36649ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -648,7 +648,7 @@ public class CheckpointCoordinator { if (checkpoint != null && !checkpoint.isDiscarded()) { - switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetaData())) { + switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) { case SUCCESS: LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob()); http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java index 8d1423a..43d66ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java @@ -27,7 +27,8 @@ public interface CheckpointCoordinatorGateway extends RpcGateway { void acknowledgeCheckpoint( final JobID jobID, 
final ExecutionAttemptID executionAttemptID, - final CheckpointMetaData checkpointMetaData, + final long checkpointId, + final CheckpointMetrics checkpointMetrics, final SubtaskState subtaskState); void declineCheckpoint( http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java index f72b00e..be73adb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java @@ -129,4 +129,4 @@ public class CheckpointMetrics implements Serializable { ", asyncDurationMillis=" + asyncDurationMillis + '}'; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 1531f0f..9f66314 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -18,6 +18,16 @@ package org.apache.flink.runtime.checkpoint; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; 
+import javax.annotation.Nullable; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; @@ -35,17 +45,6 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Executor; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** * A pending checkpoint is a checkpoint that has been started, but has not been * acknowledged by all tasks that need to acknowledge it. Once all tasks have @@ -250,13 +249,13 @@ public class PendingCheckpoint { * * @param executionAttemptId of the acknowledged task * @param subtaskState of the acknowledged task - * @param checkpointMetaData Checkpoint meta data + * @param metrics Checkpoint metrics for the stats * @return TaskAcknowledgeResult of the operation */ public TaskAcknowledgeResult acknowledgeTask( ExecutionAttemptID executionAttemptId, SubtaskState subtaskState, - CheckpointMetaData checkpointMetaData) { + CheckpointMetrics metrics) { synchronized (lock) { if (discarded) { @@ -314,8 +313,6 @@ public class PendingCheckpoint { ++numAcknowledgedTasks; if (statsCallback != null) { - CheckpointMetrics metrics = checkpointMetaData.getMetrics(); - // Do this in millis because the web frontend works with them long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000; http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java index 1675365..9e9f7c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.execution; +import java.util.Map; +import java.util.concurrent.Future; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; @@ -25,7 +27,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -39,9 +41,6 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; -import java.util.Map; -import java.util.concurrent.Future; - /** * The Environment gives the code executed in a task access to the task's properties * (such as name, parallelism), the configurations, the data stream readers and writers, @@ -162,19 +161,21 @@ public interface Environment { * to for the checkpoint with the give checkpoint-ID. This method does not include * any state in the checkpoint. 
* - * @param checkpointMetaData the meta data for this checkpoint + * @param checkpointId ID of this checkpoint + * @param checkpointMetrics metrics for this checkpoint */ - void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData); + void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics); /** * Confirms that the invokable has successfully completed all required steps for * the checkpoint with the give checkpoint-ID. This method does include * the given state in the checkpoint. * - * @param checkpointMetaData the meta data for this checkpoint + * @param checkpointId ID of this checkpoint + * @param checkpointMetrics metrics for this checkpoint * @param subtaskState All state handles for the checkpointed state */ - void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData, SubtaskState subtaskState); + void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, SubtaskState subtaskState); /** * Declines a checkpoint. This tells the checkpoint coordinator that this task will http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index a318657..941248f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -30,7 +30,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointException; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import 
org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.client.JobExecutionException; @@ -519,12 +519,13 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { public void acknowledgeCheckpoint( final JobID jobID, final ExecutionAttemptID executionAttemptID, - final CheckpointMetaData checkpointInfo, + final long checkpointId, + final CheckpointMetrics checkpointMetrics, final SubtaskState checkpointState) throws CheckpointException { final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); final AcknowledgeCheckpoint ackMessage = - new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointInfo, checkpointState); + new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState); if (checkpointCoordinator != null) { getRpcService().execute(new Runnable() { http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java index 7ec3efa..9721c2c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java @@ -18,13 +18,12 @@ package org.apache.flink.runtime.messages.checkpoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import 
org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import static org.apache.flink.util.Preconditions.checkArgument; - /** * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the * {@link org.apache.flink.runtime.jobmanager.JobManager} to signal that the checkpoint of an @@ -39,32 +38,26 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements private final SubtaskState subtaskState; - private final CheckpointMetaData checkpointMetaData; + private final CheckpointMetrics checkpointMetrics; // ------------------------------------------------------------------------ public AcknowledgeCheckpoint( JobID job, ExecutionAttemptID taskExecutionId, - CheckpointMetaData checkpointMetaData) { - this(job, taskExecutionId, checkpointMetaData, null); - } - - public AcknowledgeCheckpoint( - JobID job, - ExecutionAttemptID taskExecutionId, - CheckpointMetaData checkpointMetaData, + long checkpointId, + CheckpointMetrics checkpointMetrics, SubtaskState subtaskState) { - super(job, taskExecutionId, checkpointMetaData.getCheckpointId()); + super(job, taskExecutionId, checkpointId); this.subtaskState = subtaskState; - this.checkpointMetaData = checkpointMetaData; - // these may be "-1", in case the values are unknown or not set - checkArgument(checkpointMetaData.getSyncDurationMillis() >= -1); - checkArgument(checkpointMetaData.getAsyncDurationMillis() >= -1); - checkArgument(checkpointMetaData.getBytesBufferedInAlignment() >= -1); - checkArgument(checkpointMetaData.getAlignmentDurationNanos() >= -1); + this.checkpointMetrics = checkpointMetrics; + } + + @VisibleForTesting + public AcknowledgeCheckpoint(JobID jobId, ExecutionAttemptID taskExecutionId, long checkpointId) { + this(jobId, taskExecutionId, checkpointId, new CheckpointMetrics(), null); } // 
------------------------------------------------------------------------ @@ -75,8 +68,8 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements return subtaskState; } - public CheckpointMetaData getCheckpointMetaData() { - return checkpointMetaData; + public CheckpointMetrics getCheckpointMetrics() { + return checkpointMetrics; } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java index 1ce4350..bf60161 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.taskexecutor.rpc; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.taskmanager.CheckpointResponder; @@ -38,15 +38,16 @@ public class RpcCheckpointResponder implements CheckpointResponder { public void acknowledgeCheckpoint( JobID jobID, ExecutionAttemptID executionAttemptID, - CheckpointMetaData checkpointMetaData, + long checkpointId, + CheckpointMetrics checkpointMetrics, SubtaskState subtaskState) { checkpointCoordinatorGateway.acknowledgeCheckpoint( jobID, 
executionAttemptID, - checkpointMetaData, + checkpointId, + checkpointMetrics, subtaskState); - } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java index dafcefe..ad0df71 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.instance.ActorGateway; @@ -42,11 +42,12 @@ public class ActorGatewayCheckpointResponder implements CheckpointResponder { public void acknowledgeCheckpoint( JobID jobID, ExecutionAttemptID executionAttemptID, - CheckpointMetaData checkpointMetaData, + long checkpointId, + CheckpointMetrics checkpointMetrics, SubtaskState checkpointStateHandles) { AcknowledgeCheckpoint message = new AcknowledgeCheckpoint( - jobID, executionAttemptID, checkpointMetaData, + jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointStateHandles); actorGateway.tell(message); http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java index cdf87d3..cc66a3f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -35,16 +35,18 @@ public interface CheckpointResponder { * Job ID of the running job * @param executionAttemptID * Execution attempt ID of the running task + * @param checkpointId + * ID of this checkpoint + * @param checkpointMetrics + * Metrics of this checkpoint * @param subtaskState * State handles for the checkpoint - * @param checkpointMetaData - * Meta data for this checkpoint - * */ void acknowledgeCheckpoint( JobID jobID, ExecutionAttemptID executionAttemptID, - CheckpointMetaData checkpointMetaData, + long checkpointId, + CheckpointMetrics checkpointMetrics, SubtaskState subtaskState); /** http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java index 7fe94a6..788a590 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java @@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -237,19 +237,18 @@ public class RuntimeEnvironment implements Environment { } @Override - public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) { - - acknowledgeCheckpoint(checkpointMetaData, null); + public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) { + acknowledgeCheckpoint(checkpointId, checkpointMetrics, null); } @Override public void acknowledgeCheckpoint( - CheckpointMetaData checkpointMetaData, + long checkpointId, + CheckpointMetrics checkpointMetrics, SubtaskState checkpointStateHandles) { - checkpointResponder.acknowledgeCheckpoint( - jobId, executionId, checkpointMetaData, + jobId, executionId, checkpointId, checkpointMetrics, checkpointStateHandles); } http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index d3a440a..d4c3a2d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -84,8 +84,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { final long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next(); - final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, triggerTimestamp); - AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointMetaData); + AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId); CompletedCheckpoint completedCheckpoint = mock(CompletedCheckpoint.class); PowerMockito.whenNew(CompletedCheckpoint.class).withAnyArguments().thenReturn(completedCheckpoint); http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 6ba557b..c2ada3b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -319,14 +319,14 @@ public class CheckpointCoordinatorTest { CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L); // acknowledge from one of the tasks - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId)); assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks()); assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks()); assertFalse(checkpoint.isDiscarded()); 
assertFalse(checkpoint.isFullyAcknowledged()); // acknowledge the same task again (should not matter) - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId)); assertFalse(checkpoint.isDiscarded()); assertFalse(checkpoint.isFullyAcknowledged()); @@ -533,22 +533,20 @@ public class CheckpointCoordinatorTest { verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp)); } - CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L); - // acknowledge from one of the tasks - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId)); assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks()); assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks()); assertFalse(checkpoint.isDiscarded()); assertFalse(checkpoint.isFullyAcknowledged()); // acknowledge the same task again (should not matter) - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId)); assertFalse(checkpoint.isDiscarded()); assertFalse(checkpoint.isFullyAcknowledged()); // acknowledge the other task. 
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId)); // the checkpoint is internally converted to a successful checkpoint and the // pending checkpoint object is disposed @@ -577,9 +575,8 @@ public class CheckpointCoordinatorTest { coord.triggerCheckpoint(timestampNew, false); long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); - CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataNew)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataNew)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew)); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -669,7 +666,7 @@ public class CheckpointCoordinatorTest { CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L); // acknowledge one of the three tasks - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData1)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1)); // start the second checkpoint // trigger the first checkpoint. 
this should succeed @@ -695,10 +692,10 @@ public class CheckpointCoordinatorTest { // we acknowledge the remaining two tasks from the first // checkpoint and two tasks from the second checkpoint - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData1)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData2)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData1)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData2)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2)); // now, the first checkpoint should be confirmed assertEquals(1, coord.getNumberOfPendingCheckpoints()); @@ -709,7 +706,7 @@ public class CheckpointCoordinatorTest { verify(commitVertex.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointId1), eq(timestamp1)); // send the last remaining ack for the second checkpoint - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData2)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2)); // now, the second checkpoint should be confirmed assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -803,7 +800,7 @@ public class CheckpointCoordinatorTest { CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L); // acknowledge one of the three tasks - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData1)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
ackAttemptID2, checkpointId1)); // start the second checkpoint // trigger the first checkpoint. this should succeed @@ -829,10 +826,10 @@ public class CheckpointCoordinatorTest { // checkpoint completely. The second checkpoint should then subsume the first checkpoint CheckpointMetaData checkpointMetaData2= new CheckpointMetaData(checkpointId2, 0L); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData2)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData2)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData1)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData2)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2)); // now, the second checkpoint should be confirmed, and the first discarded // actually both pending checkpoints are discarded, and the second has been transformed @@ -855,7 +852,7 @@ public class CheckpointCoordinatorTest { verify(commitVertex.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointId2), eq(timestamp2)); // send the last remaining ack for the first checkpoint. 
This should not do anything - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, new CheckpointMetaData(checkpointId1, 0L))); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1)); coord.shutdown(JobStatus.FINISHED); } @@ -912,7 +909,7 @@ public class CheckpointCoordinatorTest { PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next(); assertFalse(checkpoint.isDiscarded()); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, new CheckpointMetaData(checkpoint.getCheckpointId(), 0L))); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId())); // wait until the checkpoint must have expired. // we check every 250 msecs conservatively for 5 seconds @@ -984,13 +981,13 @@ public class CheckpointCoordinatorTest { CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L); // wrong job id - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointMetaData)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId)); // unknown checkpoint - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, new CheckpointMetaData(1L, 0L))); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, 1L)); // unknown ack vertex - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointMetaData)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId)); coord.shutdown(JobStatus.FINISHED); } @@ -1051,12 +1048,12 @@ public class CheckpointCoordinatorTest { SubtaskState triggerSubtaskState = mock(SubtaskState.class); // acknowledge the first trigger vertex - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointMetaData, 
triggerSubtaskState)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), triggerSubtaskState)); SubtaskState unknownSubtaskState = mock(SubtaskState.class); // receive an acknowledge message for an unknown vertex - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointMetaData, unknownSubtaskState)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState)); // we should discard acknowledge messages from an unknown vertex belonging to our job verify(unknownSubtaskState, times(1)).discardState(); @@ -1064,13 +1061,13 @@ public class CheckpointCoordinatorTest { SubtaskState differentJobSubtaskState = mock(SubtaskState.class); // receive an acknowledge message from an unknown job - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointMetaData, differentJobSubtaskState)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState)); // we should not interfere with different jobs verify(differentJobSubtaskState, never()).discardState(); // duplicate acknowledge message for the trigger vertex - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointMetaData, triggerSubtaskState)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), triggerSubtaskState)); // duplicate acknowledge messages for a known vertex should not trigger discarding the state verify(triggerSubtaskState, never()).discardState(); @@ -1086,13 +1083,13 @@ public class CheckpointCoordinatorTest { SubtaskState ackSubtaskState = mock(SubtaskState.class); // late acknowledge message from the second ack vertex - 
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointMetaData, ackSubtaskState)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointId, new CheckpointMetrics(), ackSubtaskState)); // check that we also cleaned up this state verify(ackSubtaskState, times(1)).discardState(); // receive an acknowledge message from an unknown job - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointMetaData, differentJobSubtaskState)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState)); // we should not interfere with different jobs verify(differentJobSubtaskState, never()).discardState(); @@ -1100,7 +1097,7 @@ public class CheckpointCoordinatorTest { SubtaskState unknownSubtaskState2 = mock(SubtaskState.class); // receive an acknowledge message for an unknown vertex - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointMetaData, unknownSubtaskState2)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState2)); // we should discard acknowledge messages from an unknown vertex belonging to our job verify(unknownSubtaskState2, times(1)).discardState(); @@ -1261,8 +1258,7 @@ public class CheckpointCoordinatorTest { Long firstCallId = triggerCalls.take(); assertEquals(1L, firstCallId.longValue()); - AcknowledgeCheckpoint ackMsg = new AcknowledgeCheckpoint( - jid, attemptID, new CheckpointMetaData(1L, System.currentTimeMillis())); + AcknowledgeCheckpoint ackMsg = new AcknowledgeCheckpoint(jid, attemptID, 1L); // tell the coordinator that the checkpoint is done final long ackTime = System.nanoTime(); @@ -1357,7 +1353,7 @@ public class CheckpointCoordinatorTest { CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, 0L); // acknowledge from one of the tasks - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId)); assertEquals(1, pending.getNumberOfAcknowledgedTasks()); assertEquals(1, pending.getNumberOfNonAcknowledgedTasks()); assertFalse(pending.isDiscarded()); @@ -1365,13 +1361,13 @@ public class CheckpointCoordinatorTest { assertFalse(savepointFuture.isDone()); // acknowledge the same task again (should not matter) - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId)); assertFalse(pending.isDiscarded()); assertFalse(pending.isFullyAcknowledged()); assertFalse(savepointFuture.isDone()); // acknowledge the other task. - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId)); // the checkpoint is internally converted to a successful checkpoint and the // pending checkpoint object is disposed @@ -1403,8 +1399,8 @@ public class CheckpointCoordinatorTest { long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataNew)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataNew)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew)); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ 
-1481,8 +1477,8 @@ public class CheckpointCoordinatorTest { CheckpointMetaData checkpointMetaData2 = new CheckpointMetaData(checkpointId2, 0L); // 2nd checkpoint should subsume the 1st checkpoint, but not the savepoint - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData2)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData2)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2)); assertEquals(1, coord.getNumberOfPendingCheckpoints()); assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -1499,8 +1495,8 @@ public class CheckpointCoordinatorTest { assertEquals(3, coord.getNumberOfPendingCheckpoints()); // 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataS2)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataS2)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2)); assertEquals(1, coord.getNumberOfPendingCheckpoints()); assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -1510,8 +1506,8 @@ public class CheckpointCoordinatorTest { assertTrue(savepointFuture2.isDone()); // Ack first savepoint - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataS1)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataS1)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1)); assertEquals(0, 
coord.getNumberOfPendingCheckpoints()); assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -1585,7 +1581,7 @@ public class CheckpointCoordinatorTest { .triggerCheckpoint(anyLong(), anyLong()); // now, once we acknowledge one checkpoint, it should trigger the next one - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, new CheckpointMetaData(1L, 0L))); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L)); // this should have immediately triggered a new checkpoint now = System.currentTimeMillis(); @@ -1660,7 +1656,7 @@ public class CheckpointCoordinatorTest { // now we acknowledge the second checkpoint, which should subsume the first checkpoint // and allow two more checkpoints to be triggered // now, once we acknowledge one checkpoint, it should trigger the next one - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, new CheckpointMetaData(2L, 0L))); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 2L)); // after a while, there should be the new checkpoints final long newTimeout = System.currentTimeMillis() + 60000; @@ -1792,7 +1788,7 @@ public class CheckpointCoordinatorTest { // ACK all savepoints long checkpointId = checkpointIDCounter.getLast(); for (int i = 0; i < numSavepoints; i++, checkpointId--) { - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, new CheckpointMetaData(checkpointId, 0L))); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId)); } // After ACKs, all should be completed @@ -1905,7 +1901,8 @@ public class CheckpointCoordinatorTest { AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( jid, jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointMetaData, + checkpointId, + new CheckpointMetrics(), checkpointStateHandles); coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); @@ -1919,7 
+1916,8 @@ public class CheckpointCoordinatorTest { AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( jid, jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointMetaData, + checkpointId, + new CheckpointMetrics(), checkpointStateHandles); coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); @@ -2008,7 +2006,8 @@ public class CheckpointCoordinatorTest { AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( jid, jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointMetaData, + checkpointId, + new CheckpointMetrics(), checkpointStateHandles); coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); @@ -2022,7 +2021,8 @@ public class CheckpointCoordinatorTest { AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( jid, jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointMetaData, + checkpointId, + new CheckpointMetrics(), checkpointStateHandles); coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); @@ -2127,7 +2127,8 @@ public class CheckpointCoordinatorTest { AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( jid, jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointMetaData, + checkpointId, + new CheckpointMetrics(), checkpointStateHandles); coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); @@ -2144,7 +2145,8 @@ public class CheckpointCoordinatorTest { AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( jid, jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointMetaData, + checkpointId, + new CheckpointMetrics(), checkpointStateHandles); coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); @@ -2260,12 +2262,12 @@ public class CheckpointCoordinatorTest { KeyGroupsStateHandle keyedStateBackend = generateKeyGroupState(jobVertexID1, 
keyGroupPartitions1.get(index), false); KeyGroupsStateHandle keyedStateRaw = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), true); - SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, opStateBackend, null, keyedStateBackend, keyedStateRaw); AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( jid, jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointMetaData, + checkpointId, + new CheckpointMetrics(), checkpointStateHandles); coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); @@ -2287,7 +2289,8 @@ public class CheckpointCoordinatorTest { AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( jid, jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointMetaData, + checkpointId, + new CheckpointMetrics(), checkpointStateHandles); coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 0e20ebc8..18b07eb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -118,12 +118,11 @@ public class CheckpointStateRestoreTest { final long checkpointId = pending.getCheckpointId(); SubtaskState checkpointStateHandles = new SubtaskState(serializedState, null, null, serializedKeyGroupStates, null); - CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L); - coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointMetaData, checkpointStateHandles)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointMetaData, checkpointStateHandles)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointMetaData, checkpointStateHandles)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointMetaData)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointMetaData)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), checkpointStateHandles)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, new CheckpointMetrics(), checkpointStateHandles)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, new CheckpointMetrics(), checkpointStateHandles)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId)); assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 4358526..3a85c4c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -44,7 +44,6 @@ import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -95,7 +94,7 @@ public class PendingCheckpointTest { CheckpointProperties persisted = new CheckpointProperties(false, true, false, false, false, false, false); PendingCheckpoint pending = createPendingCheckpoint(persisted, tmp.getAbsolutePath()); - pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetaData(pending.getCheckpointId(), pending.getCheckpointTimestamp())); + pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); assertEquals(0, tmp.listFiles().length); pending.finalizeCheckpoint(); assertEquals(1, tmp.listFiles().length); @@ -103,7 +102,7 @@ public class PendingCheckpointTest { // Ephemeral checkpoint CheckpointProperties ephemeral = new CheckpointProperties(false, false, true, true, true, true, true); pending = createPendingCheckpoint(ephemeral, null); - pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetaData(pending.getCheckpointId(), pending.getCheckpointTimestamp())); + pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); assertEquals(1, tmp.listFiles().length); pending.finalizeCheckpoint(); @@ -148,7 +147,7 @@ public class PendingCheckpointTest { future = pending.getCompletionFuture(); assertFalse(future.isDone()); - pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetaData(pending.getCheckpointId(), pending.getCheckpointTimestamp())); + pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); pending.finalizeCheckpoint(); assertTrue(future.isDone()); @@ -231,7 +230,7 @@ public class PendingCheckpointTest { PendingCheckpoint pending = 
createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null); pending.setStatsCallback(callback); - pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetaData(pending.getCheckpointId(), pending.getCheckpointTimestamp())); + pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); verify(callback, times(1)).reportSubtaskStats(any(JobVertexID.class), any(SubtaskStateStats.class)); pending.finalizeCheckpoint(); http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 5f2edac..c7c35ec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.BlobService; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; @@ -611,7 +612,8 @@ public class JobManagerHARecoveryTest { new SubtaskState(chainedStateHandle, null, null, null, null); getEnvironment().acknowledgeCheckpoint( - new CheckpointMetaData(checkpointMetaData.getCheckpointId(), -1, 0L, 0L, 0L, 0L), + checkpointMetaData.getCheckpointId(), + new CheckpointMetrics(0L, 0L, 0L, 0L), checkpointStateHandles); return true; } 
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java index 9aa35e0..db45231 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -63,7 +63,7 @@ public class CheckpointMessagesTest { public void testConfirmTaskCheckpointed() { try { AcknowledgeCheckpoint noState = new AcknowledgeCheckpoint( - new JobID(), new ExecutionAttemptID(), new CheckpointMetaData(569345L, 0L)); + new JobID(), new ExecutionAttemptID(), 569345L); KeyGroupRange keyGroupRange = KeyGroupRange.of(42,42); @@ -78,7 +78,8 @@ public class CheckpointMessagesTest { AcknowledgeCheckpoint withState = new AcknowledgeCheckpoint( new JobID(), new ExecutionAttemptID(), - new CheckpointMetaData(87658976143L, 0L), + 87658976143L, + new CheckpointMetrics(), checkpointStateHandles); testSerializabilityEqualsHashCode(noState); http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index 82d8cc1..851fa96 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -152,11 +152,11 @@ public class DummyEnvironment implements Environment { } @Override - public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) { + public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) { } @Override - public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData, SubtaskState subtaskState) { + public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, SubtaskState subtaskState) { } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 
bfede5b..49175c7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -26,7 +26,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -310,12 +310,12 @@ public class MockEnvironment implements Environment { } @Override - public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) { + public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) { throw new UnsupportedOperationException(); } @Override - public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData, SubtaskState subtaskState) { + public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, SubtaskState subtaskState) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java index 10f4303..c78a3d5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java @@ -26,7 +26,7 
@@ import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.concurrent.Future; @@ -235,7 +235,7 @@ public class JvmExitOnFatalErrorTest { private static final class NoOpCheckpointResponder implements CheckpointResponder { @Override - public void acknowledgeCheckpoint(JobID j, ExecutionAttemptID e, CheckpointMetaData c, SubtaskState s) {} + public void acknowledgeCheckpoint(JobID j, ExecutionAttemptID e, long i, CheckpointMetrics c, SubtaskState s) {} @Override public void declineCheckpoint(JobID j, ExecutionAttemptID e, long l, Throwable t) {} http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index d734dc9..92fc6e5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -944,7 +944,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> keyedStateHandleStream); if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) { - owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState); + 
owner.getEnvironment().acknowledgeCheckpoint( + checkpointMetaData.getCheckpointId(), + checkpointMetaData.getMetrics(), + subtaskState); if (LOG.isDebugEnabled()) { LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 0255ee6..907f8f1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -607,10 +608,11 @@ public class AsyncWaitOperatorTest extends TestLogger { @Override public void acknowledgeCheckpoint( - CheckpointMetaData checkpointMetaData, + long checkpointId, + CheckpointMetrics checkpointMetrics, SubtaskState checkpointStateHandles) { - this.checkpointId = checkpointMetaData.getCheckpointId(); + this.checkpointId = checkpointId; this.checkpointStateHandles = checkpointStateHandles; checkpointLatch.trigger(); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 4b08c83..69c2c88 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -32,6 +32,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -679,10 +680,11 @@ public class OneInputStreamTaskTest extends TestLogger { @Override public void acknowledgeCheckpoint( - CheckpointMetaData checkpointMetaData, + long checkpointId, + CheckpointMetrics checkpointMetrics, SubtaskState checkpointStateHandles) { - this.checkpointId = checkpointMetaData.getCheckpointId(); + this.checkpointId = checkpointId; this.checkpointStateHandles = checkpointStateHandles; checkpointLatch.trigger(); } http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index 58912ab..ff07fa2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -28,6 +28,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.execution.Environment; @@ -327,12 +328,11 @@ public class StreamMockEnvironment implements Environment { } @Override - public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) { + public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) { } @Override - public void acknowledgeCheckpoint( - CheckpointMetaData checkpointMetaData, SubtaskState subtaskState) { + public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, SubtaskState subtaskState) { } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 887ea4f..d33d1b6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -32,6 +32,7 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -447,7 +448,7 @@ public class StreamTaskTest extends TestLogger { return null; } - }).when(mockEnvironment).acknowledgeCheckpoint(any(CheckpointMetaData.class), any(SubtaskState.class)); + }).when(mockEnvironment).acknowledgeCheckpoint(anyLong(), any(CheckpointMetrics.class), any(SubtaskState.class)); StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); @@ -500,7 +501,7 @@ public class StreamTaskTest extends TestLogger { ArgumentCaptor<SubtaskState> subtaskStateCaptor = ArgumentCaptor.forClass(SubtaskState.class); // check that the checkpoint has been completed - verify(mockEnvironment).acknowledgeCheckpoint(eq(checkpointMetaData), subtaskStateCaptor.capture()); + verify(mockEnvironment).acknowledgeCheckpoint(eq(checkpointId), any(CheckpointMetrics.class), subtaskStateCaptor.capture()); SubtaskState subtaskState = subtaskStateCaptor.getValue(); @@ -628,7 +629,7 @@ public class StreamTaskTest extends TestLogger { } // check that the checkpoint has not been acknowledged - verify(mockEnvironment, never()).acknowledgeCheckpoint(any(CheckpointMetaData.class), any(SubtaskState.class)); + verify(mockEnvironment, never()).acknowledgeCheckpoint(eq(checkpointId), any(CheckpointMetrics.class), any(SubtaskState.class)); // check that the 
state handles have been discarded verify(managedKeyedStateHandle).discardState();
