[FLINK-5763] [checkpoints] Acknowledge with explicit ID and CheckpointMetrics

Instead of acknowledging checkpoints with the full CheckpointMetaData,
make the acknowledgement explicit by passing the checkpoint ID and the
CheckpointMetrics. The remaining CheckpointMetaData fields are not
needed for the acknowledgement.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2edc9718
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2edc9718
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2edc9718

Branch: refs/heads/master
Commit: 2edc97185700a5bdb3e181a71493d681c0f693e3
Parents: 2d2ffba
Author: Ufuk Celebi <[email protected]>
Authored: Wed Feb 15 17:52:40 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Feb 22 12:14:55 2017 +0100

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         |   6 +-
 .../checkpoint/CheckpointCoordinator.java       |   2 +-
 .../CheckpointCoordinatorGateway.java           |   3 +-
 .../runtime/checkpoint/CheckpointMetrics.java   |   2 +-
 .../runtime/checkpoint/PendingCheckpoint.java   |  27 ++---
 .../flink/runtime/execution/Environment.java    |  17 +--
 .../flink/runtime/jobmaster/JobMaster.java      |   7 +-
 .../checkpoint/AcknowledgeCheckpoint.java       |  35 +++---
 .../rpc/RpcCheckpointResponder.java             |   9 +-
 .../ActorGatewayCheckpointResponder.java        |   7 +-
 .../taskmanager/CheckpointResponder.java        |  12 +-
 .../runtime/taskmanager/RuntimeEnvironment.java |  13 +-
 .../CheckpointCoordinatorFailureTest.java       |   3 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 119 ++++++++++---------
 .../checkpoint/CheckpointStateRestoreTest.java  |  11 +-
 .../checkpoint/PendingCheckpointTest.java       |   9 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   4 +-
 .../messages/CheckpointMessagesTest.java        |   7 +-
 .../operators/testutils/DummyEnvironment.java   |   6 +-
 .../operators/testutils/MockEnvironment.java    |   6 +-
 .../runtime/util/JvmExitOnFatalErrorTest.java   |   4 +-
 .../streaming/runtime/tasks/StreamTask.java     |   5 +-
 .../operators/async/AsyncWaitOperatorTest.java  |   6 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |   6 +-
 .../runtime/tasks/StreamMockEnvironment.java    |   6 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   7 +-
 26 files changed, 174 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 6587291..bce8028 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -151,10 +152,11 @@ public class RocksDBAsyncSnapshotTest {
 
                        @Override
                        public void acknowledgeCheckpoint(
-                                       CheckpointMetaData checkpointMetaData,
+                                       long checkpointId,
+                                       CheckpointMetrics checkpointMetrics,
                                        SubtaskState checkpointStateHandles) {
 
-                               super.acknowledgeCheckpoint(checkpointMetaData);
+                               super.acknowledgeCheckpoint(checkpointId, 
checkpointMetrics);
 
                                // block on the latch, to verify that 
triggerCheckpoint returns below,
                                // even though the async checkpoint would not 
finish

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6cac006..36649ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -648,7 +648,7 @@ public class CheckpointCoordinator {
 
                        if (checkpoint != null && !checkpoint.isDiscarded()) {
 
-                               switch 
(checkpoint.acknowledgeTask(message.getTaskExecutionId(), 
message.getSubtaskState(), message.getCheckpointMetaData())) {
+                               switch 
(checkpoint.acknowledgeTask(message.getTaskExecutionId(), 
message.getSubtaskState(), message.getCheckpointMetrics())) {
                                        case SUCCESS:
                                                LOG.debug("Received acknowledge 
message for checkpoint {} from task {} of job {}.",
                                                        checkpointId, 
message.getTaskExecutionId(), message.getJob());

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
index 8d1423a..43d66ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -27,7 +27,8 @@ public interface CheckpointCoordinatorGateway extends 
RpcGateway {
        void acknowledgeCheckpoint(
                        final JobID jobID,
                        final ExecutionAttemptID executionAttemptID,
-                       final CheckpointMetaData checkpointMetaData,
+                       final long checkpointId,
+                       final CheckpointMetrics checkpointMetrics,
                        final SubtaskState subtaskState);
 
        void declineCheckpoint(

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
index f72b00e..be73adb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
@@ -129,4 +129,4 @@ public class CheckpointMetrics implements Serializable {
                                ", asyncDurationMillis=" + asyncDurationMillis +
                                '}';
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1531f0f..9f66314 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -18,6 +18,16 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import javax.annotation.Nullable;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
@@ -35,17 +45,6 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * A pending checkpoint is a checkpoint that has been started, but has not been
  * acknowledged by all tasks that need to acknowledge it. Once all tasks have
@@ -250,13 +249,13 @@ public class PendingCheckpoint {
         *
         * @param executionAttemptId of the acknowledged task
         * @param subtaskState of the acknowledged task
-        * @param checkpointMetaData Checkpoint meta data
+        * @param metrics Checkpoint metrics for the stats
         * @return TaskAcknowledgeResult of the operation
         */
        public TaskAcknowledgeResult acknowledgeTask(
                        ExecutionAttemptID executionAttemptId,
                        SubtaskState subtaskState,
-                       CheckpointMetaData checkpointMetaData) {
+                       CheckpointMetrics metrics) {
 
                synchronized (lock) {
                        if (discarded) {
@@ -314,8 +313,6 @@ public class PendingCheckpoint {
                        ++numAcknowledgedTasks;
 
                        if (statsCallback != null) {
-                               CheckpointMetrics metrics = 
checkpointMetaData.getMetrics();
-
                                // Do this in millis because the web frontend 
works with them
                                long alignmentDurationMillis = 
metrics.getAlignmentDurationNanos() / 1_000_000;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 1675365..9e9f7c4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.execution;
 
+import java.util.Map;
+import java.util.concurrent.Future;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
@@ -25,7 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -39,9 +41,6 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
-import java.util.Map;
-import java.util.concurrent.Future;
-
 /**
  * The Environment gives the code executed in a task access to the task's 
properties
  * (such as name, parallelism), the configurations, the data stream readers 
and writers,
@@ -162,19 +161,21 @@ public interface Environment {
         * to for the checkpoint with the give checkpoint-ID. This method does 
not include
         * any state in the checkpoint.
         * 
-        * @param checkpointMetaData the meta data for this checkpoint
+        * @param checkpointId ID of this checkpoint
+        * @param checkpointMetrics metrics for this checkpoint
         */
-       void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData);
+       void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics 
checkpointMetrics);
 
        /**
         * Confirms that the invokable has successfully completed all required 
steps for
         * the checkpoint with the give checkpoint-ID. This method does include
         * the given state in the checkpoint.
         *
-        * @param checkpointMetaData the meta data for this checkpoint
+        * @param checkpointId ID of this checkpoint
+        * @param checkpointMetrics metrics for this checkpoint
         * @param subtaskState All state handles for the checkpointed state
         */
-       void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData, 
SubtaskState subtaskState);
+       void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics 
checkpointMetrics, SubtaskState subtaskState);
 
        /**
         * Declines a checkpoint. This tells the checkpoint coordinator that 
this task will

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index a318657..941248f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -30,7 +30,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -519,12 +519,13 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        public void acknowledgeCheckpoint(
                        final JobID jobID,
                        final ExecutionAttemptID executionAttemptID,
-                       final CheckpointMetaData checkpointInfo,
+                       final long checkpointId,
+                       final CheckpointMetrics checkpointMetrics,
                        final SubtaskState checkpointState) throws 
CheckpointException {
 
                final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
                final AcknowledgeCheckpoint ackMessage = 
-                               new AcknowledgeCheckpoint(jobID, 
executionAttemptID, checkpointInfo, checkpointState);
+                               new AcknowledgeCheckpoint(jobID, 
executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
 
                if (checkpointCoordinator != null) {
                        getRpcService().execute(new Runnable() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index 7ec3efa..9721c2c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.runtime.messages.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-
 /**
  * This message is sent from the {@link 
org.apache.flink.runtime.taskmanager.TaskManager} to the
  * {@link org.apache.flink.runtime.jobmanager.JobManager} to signal that the 
checkpoint of an
@@ -39,32 +38,26 @@ public class AcknowledgeCheckpoint extends 
AbstractCheckpointMessage implements
 
        private final SubtaskState subtaskState;
 
-       private final CheckpointMetaData checkpointMetaData;
+       private final CheckpointMetrics checkpointMetrics;
 
        // 
------------------------------------------------------------------------
 
        public AcknowledgeCheckpoint(
                        JobID job,
                        ExecutionAttemptID taskExecutionId,
-                       CheckpointMetaData checkpointMetaData) {
-               this(job, taskExecutionId, checkpointMetaData, null);
-       }
-
-       public AcknowledgeCheckpoint(
-                       JobID job,
-                       ExecutionAttemptID taskExecutionId,
-                       CheckpointMetaData checkpointMetaData,
+                       long checkpointId,
+                       CheckpointMetrics checkpointMetrics,
                        SubtaskState subtaskState) {
 
-               super(job, taskExecutionId, 
checkpointMetaData.getCheckpointId());
+               super(job, taskExecutionId, checkpointId);
 
                this.subtaskState = subtaskState;
-               this.checkpointMetaData = checkpointMetaData;
-               // these may be "-1", in case the values are unknown or not set
-               checkArgument(checkpointMetaData.getSyncDurationMillis() >= -1);
-               checkArgument(checkpointMetaData.getAsyncDurationMillis() >= 
-1);
-               checkArgument(checkpointMetaData.getBytesBufferedInAlignment() 
>= -1);
-               checkArgument(checkpointMetaData.getAlignmentDurationNanos() >= 
-1);
+               this.checkpointMetrics = checkpointMetrics;
+       }
+
+       @VisibleForTesting
+       public AcknowledgeCheckpoint(JobID jobId, ExecutionAttemptID 
taskExecutionId, long checkpointId) {
+               this(jobId, taskExecutionId, checkpointId, new 
CheckpointMetrics(), null);
        }
 
        // 
------------------------------------------------------------------------
@@ -75,8 +68,8 @@ public class AcknowledgeCheckpoint extends 
AbstractCheckpointMessage implements
                return subtaskState;
        }
 
-       public CheckpointMetaData getCheckpointMetaData() {
-               return checkpointMetaData;
+       public CheckpointMetrics getCheckpointMetrics() {
+               return checkpointMetrics;
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
index 1ce4350..bf60161 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.taskexecutor.rpc;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
@@ -38,15 +38,16 @@ public class RpcCheckpointResponder implements 
CheckpointResponder {
        public void acknowledgeCheckpoint(
                        JobID jobID,
                        ExecutionAttemptID executionAttemptID,
-                       CheckpointMetaData checkpointMetaData,
+                       long checkpointId,
+                       CheckpointMetrics checkpointMetrics,
                        SubtaskState subtaskState) {
 
                checkpointCoordinatorGateway.acknowledgeCheckpoint(
                        jobID,
                        executionAttemptID,
-                       checkpointMetaData,
+                       checkpointId,
+                       checkpointMetrics,
                        subtaskState);
-
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
index dafcefe..ad0df71 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -42,11 +42,12 @@ public class ActorGatewayCheckpointResponder implements 
CheckpointResponder {
        public void acknowledgeCheckpoint(
                        JobID jobID,
                        ExecutionAttemptID executionAttemptID,
-                       CheckpointMetaData checkpointMetaData,
+                       long checkpointId,
+                       CheckpointMetrics checkpointMetrics,
                        SubtaskState checkpointStateHandles) {
 
                AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
-                               jobID, executionAttemptID, checkpointMetaData,
+                               jobID, executionAttemptID, checkpointId, 
checkpointMetrics,
                                checkpointStateHandles);
 
                actorGateway.tell(message);

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
index cdf87d3..cc66a3f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
@@ -35,16 +35,18 @@ public interface CheckpointResponder {
         *             Job ID of the running job
         * @param executionAttemptID
         *             Execution attempt ID of the running task
+        * @param checkpointId
+        *             ID of this checkpoint
+        * @param checkpointMetrics
+        *             Metrics of this checkpoint
         * @param subtaskState
         *             State handles for the checkpoint
-        * @param checkpointMetaData
-        *             Meta data for this checkpoint
-        *
         */
        void acknowledgeCheckpoint(
                JobID jobID,
                ExecutionAttemptID executionAttemptID,
-               CheckpointMetaData checkpointMetaData,
+               long checkpointId,
+               CheckpointMetrics checkpointMetrics,
                SubtaskState subtaskState);
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 7fe94a6..788a590 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -237,19 +237,18 @@ public class RuntimeEnvironment implements Environment {
        }
 
        @Override
-       public void acknowledgeCheckpoint(CheckpointMetaData 
checkpointMetaData) {
-
-               acknowledgeCheckpoint(checkpointMetaData, null);
+       public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics 
checkpointMetrics) {
+               acknowledgeCheckpoint(checkpointId, checkpointMetrics, null);
        }
 
        @Override
        public void acknowledgeCheckpoint(
-                       CheckpointMetaData checkpointMetaData,
+                       long checkpointId,
+                       CheckpointMetrics checkpointMetrics,
                        SubtaskState checkpointStateHandles) {
 
-
                checkpointResponder.acknowledgeCheckpoint(
-                               jobId, executionId, checkpointMetaData,
+                               jobId, executionId, checkpointId, 
checkpointMetrics,
                                checkpointStateHandles);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index d3a440a..d4c3a2d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -84,8 +84,7 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
 
                final long checkpointId = 
coord.getPendingCheckpoints().keySet().iterator().next();
 
-               final CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, triggerTimestamp);
-               AcknowledgeCheckpoint acknowledgeMessage = new 
AcknowledgeCheckpoint(jid, executionAttemptId, checkpointMetaData);
+               AcknowledgeCheckpoint acknowledgeMessage = new 
AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId);
 
                CompletedCheckpoint completedCheckpoint = 
mock(CompletedCheckpoint.class);
                
PowerMockito.whenNew(CompletedCheckpoint.class).withAnyArguments().thenReturn(completedCheckpoint);

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 6ba557b..c2ada3b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -319,14 +319,14 @@ public class CheckpointCoordinatorTest {
                        CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, 0L);
 
                        // acknowledge from one of the tasks
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
                        assertEquals(1, 
checkpoint.getNumberOfAcknowledgedTasks());
                        assertEquals(1, 
checkpoint.getNumberOfNonAcknowledgedTasks());
                        assertFalse(checkpoint.isDiscarded());
                        assertFalse(checkpoint.isFullyAcknowledged());
 
                        // acknowledge the same task again (should not matter)
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
                        assertFalse(checkpoint.isDiscarded());
                        assertFalse(checkpoint.isFullyAcknowledged());
 
@@ -533,22 +533,20 @@ public class CheckpointCoordinatorTest {
                                verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
                        }
 
-                       CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, 0L);
-
                        // acknowledge from one of the tasks
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
                        assertEquals(1, 
checkpoint.getNumberOfAcknowledgedTasks());
                        assertEquals(1, 
checkpoint.getNumberOfNonAcknowledgedTasks());
                        assertFalse(checkpoint.isDiscarded());
                        assertFalse(checkpoint.isFullyAcknowledged());
 
                        // acknowledge the same task again (should not matter)
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
                        assertFalse(checkpoint.isDiscarded());
                        assertFalse(checkpoint.isFullyAcknowledged());
 
                        // acknowledge the other task.
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID1, checkpointId));
 
                        // the checkpoint is internally converted to a 
successful checkpoint and the
                        // pending checkpoint object is disposed
@@ -577,9 +575,8 @@ public class CheckpointCoordinatorTest {
                        coord.triggerCheckpoint(timestampNew, false);
 
                        long checkpointIdNew = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-                       CheckpointMetaData checkpointMetaDataNew = new 
CheckpointMetaData(checkpointIdNew, 0L);
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataNew));
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaDataNew));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -669,7 +666,7 @@ public class CheckpointCoordinatorTest {
                        CheckpointMetaData checkpointMetaData1 = new 
CheckpointMetaData(checkpointId1, 0L);
 
                        // acknowledge one of the three tasks
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData1));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));
 
                        // start the second checkpoint
                        // trigger the first checkpoint. this should succeed
@@ -695,10 +692,10 @@ public class CheckpointCoordinatorTest {
 
                        // we acknowledge the remaining two tasks from the first
                        // checkpoint and two tasks from the second checkpoint
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData1));
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData2));
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData1));
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData2));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));
 
                        // now, the first checkpoint should be confirmed
                        assertEquals(1, coord.getNumberOfPendingCheckpoints());
@@ -709,7 +706,7 @@ public class CheckpointCoordinatorTest {
                        verify(commitVertex.getCurrentExecutionAttempt(), 
times(1)).notifyCheckpointComplete(eq(checkpointId1), eq(timestamp1));
 
                        // send the last remaining ack for the second checkpoint
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData2));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
 
                        // now, the second checkpoint should be confirmed
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -803,7 +800,7 @@ public class CheckpointCoordinatorTest {
                        CheckpointMetaData checkpointMetaData1 = new 
CheckpointMetaData(checkpointId1, 0L);
 
                        // acknowledge one of the three tasks
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData1));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));
 
                        // start the second checkpoint
                        // trigger the first checkpoint. this should succeed
@@ -829,10 +826,10 @@ public class CheckpointCoordinatorTest {
                        // checkpoint completely. The second checkpoint should 
then subsume the first checkpoint
                        CheckpointMetaData checkpointMetaData2= new 
CheckpointMetaData(checkpointId2, 0L);
 
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointMetaData2));
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData2));
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointMetaData1));
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointMetaData2));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));
 
                        // now, the second checkpoint should be confirmed, and 
the first discarded
                        // actually both pending checkpoints are discarded, and 
the second has been transformed
@@ -855,7 +852,7 @@ public class CheckpointCoordinatorTest {
                        verify(commitVertex.getCurrentExecutionAttempt(), 
times(1)).notifyCheckpointComplete(eq(checkpointId2), eq(timestamp2));
 
                        // send the last remaining ack for the first 
checkpoint. This should not do anything
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, new CheckpointMetaData(checkpointId1, 
0L)));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
 
                        coord.shutdown(JobStatus.FINISHED);
                }
@@ -912,7 +909,7 @@ public class CheckpointCoordinatorTest {
                        PendingCheckpoint checkpoint = 
coord.getPendingCheckpoints().values().iterator().next();
                        assertFalse(checkpoint.isDiscarded());
 
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, new 
CheckpointMetaData(checkpoint.getCheckpointId(), 0L)));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId()));
 
                        // wait until the checkpoint must have expired.
                        // we check every 250 msecs conservatively for 5 seconds
@@ -984,13 +981,13 @@ public class CheckpointCoordinatorTest {
                        CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, 0L);
 
                        // wrong job id
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointMetaData));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId));
 
                        // unknown checkpoint
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, new CheckpointMetaData(1L, 0L)));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, 1L));
 
                        // unknown ack vertex
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointMetaData));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId));
 
                        coord.shutdown(JobStatus.FINISHED);
                }
@@ -1051,12 +1048,12 @@ public class CheckpointCoordinatorTest {
                SubtaskState triggerSubtaskState = mock(SubtaskState.class);
 
                // acknowledge the first trigger vertex
-               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointMetaData, 
triggerSubtaskState));
+               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new 
CheckpointMetrics(), triggerSubtaskState));
 
                SubtaskState unknownSubtaskState = mock(SubtaskState.class);
 
                // receive an acknowledge message for an unknown vertex
-               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointMetaData, 
unknownSubtaskState));
+               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new 
CheckpointMetrics(), unknownSubtaskState));
 
                // we should discard acknowledge messages from an unknown 
vertex belonging to our job
                verify(unknownSubtaskState, times(1)).discardState();
@@ -1064,13 +1061,13 @@ public class CheckpointCoordinatorTest {
                SubtaskState differentJobSubtaskState = 
mock(SubtaskState.class);
 
                // receive an acknowledge message from an unknown job
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new 
JobID(), new ExecutionAttemptID(), checkpointMetaData, 
differentJobSubtaskState));
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new 
JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), 
differentJobSubtaskState));
 
                // we should not interfere with different jobs
                verify(differentJobSubtaskState, never()).discardState();
 
                // duplicate acknowledge message for the trigger vertex
-               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointMetaData, 
triggerSubtaskState));
+               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new 
CheckpointMetrics(), triggerSubtaskState));
 
                // duplicate acknowledge messages for a known vertex should not 
trigger discarding the state
                verify(triggerSubtaskState, never()).discardState();
@@ -1086,13 +1083,13 @@ public class CheckpointCoordinatorTest {
                SubtaskState ackSubtaskState = mock(SubtaskState.class);
 
                // late acknowledge message from the second ack vertex
-               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointMetaData, 
ackSubtaskState));
+               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointId, new 
CheckpointMetrics(), ackSubtaskState));
 
                // check that we also cleaned up this state
                verify(ackSubtaskState, times(1)).discardState();
 
                // receive an acknowledge message from an unknown job
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new 
JobID(), new ExecutionAttemptID(), checkpointMetaData, 
differentJobSubtaskState));
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new 
JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), 
differentJobSubtaskState));
 
                // we should not interfere with different jobs
                verify(differentJobSubtaskState, never()).discardState();
@@ -1100,7 +1097,7 @@ public class CheckpointCoordinatorTest {
                SubtaskState unknownSubtaskState2 = mock(SubtaskState.class);
 
                // receive an acknowledge message for an unknown vertex
-               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointMetaData, 
unknownSubtaskState2));
+               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new 
CheckpointMetrics(), unknownSubtaskState2));
 
                // we should discard acknowledge messages from an unknown 
vertex belonging to our job
                verify(unknownSubtaskState2, times(1)).discardState();
@@ -1261,8 +1258,7 @@ public class CheckpointCoordinatorTest {
                        Long firstCallId = triggerCalls.take();
                        assertEquals(1L, firstCallId.longValue());
 
-                       AcknowledgeCheckpoint ackMsg = new 
AcknowledgeCheckpoint(
-                                       jid, attemptID, new 
CheckpointMetaData(1L, System.currentTimeMillis()));
+                       AcknowledgeCheckpoint ackMsg = new 
AcknowledgeCheckpoint(jid, attemptID, 1L);
 
                        // tell the coordinator that the checkpoint is done
                        final long ackTime = System.nanoTime();
@@ -1357,7 +1353,7 @@ public class CheckpointCoordinatorTest {
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, 0L);
 
                // acknowledge from one of the tasks
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointMetaData));
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointId));
                assertEquals(1, pending.getNumberOfAcknowledgedTasks());
                assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
                assertFalse(pending.isDiscarded());
@@ -1365,13 +1361,13 @@ public class CheckpointCoordinatorTest {
                assertFalse(savepointFuture.isDone());
 
                // acknowledge the same task again (should not matter)
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointMetaData));
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointId));
                assertFalse(pending.isDiscarded());
                assertFalse(pending.isFullyAcknowledged());
                assertFalse(savepointFuture.isDone());
 
                // acknowledge the other task.
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointMetaData));
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointId));
 
                // the checkpoint is internally converted to a successful 
checkpoint and the
                // pending checkpoint object is disposed
@@ -1403,8 +1399,8 @@ public class CheckpointCoordinatorTest {
 
                long checkpointIdNew = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
                CheckpointMetaData checkpointMetaDataNew = new 
CheckpointMetaData(checkpointIdNew, 0L);
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointMetaDataNew));
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointMetaDataNew));
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointIdNew));
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointIdNew));
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
                assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1481,8 +1477,8 @@ public class CheckpointCoordinatorTest {
                CheckpointMetaData checkpointMetaData2 = new 
CheckpointMetaData(checkpointId2, 0L);
 
                // 2nd checkpoint should subsume the 1st checkpoint, but not 
the savepoint
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointMetaData2));
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointMetaData2));
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointId2));
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointId2));
 
                assertEquals(1, coord.getNumberOfPendingCheckpoints());
                assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1499,8 +1495,8 @@ public class CheckpointCoordinatorTest {
                assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
                // 2nd savepoint should subsume the last checkpoint, but not 
the 1st savepoint
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointMetaDataS2));
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointMetaDataS2));
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, savepointId2));
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, savepointId2));
 
                assertEquals(1, coord.getNumberOfPendingCheckpoints());
                assertEquals(2, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1510,8 +1506,8 @@ public class CheckpointCoordinatorTest {
                assertTrue(savepointFuture2.isDone());
 
                // Ack first savepoint
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointMetaDataS1));
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointMetaDataS1));
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, savepointId1));
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, savepointId1));
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
                assertEquals(3, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1585,7 +1581,7 @@ public class CheckpointCoordinatorTest {
                                        .triggerCheckpoint(anyLong(), 
anyLong());
 
                        // now, once we acknowledge one checkpoint, it should 
trigger the next one
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID, new CheckpointMetaData(1L, 0L)));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
 
                        // this should have immediately triggered a new 
checkpoint
                        now = System.currentTimeMillis();
@@ -1660,7 +1656,7 @@ public class CheckpointCoordinatorTest {
                        // now we acknowledge the second checkpoint, which 
should subsume the first checkpoint
                        // and allow two more checkpoints to be triggered
                        // now, once we acknowledge one checkpoint, it should 
trigger the next one
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID, new CheckpointMetaData(2L, 0L)));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID, 2L));
 
                        // after a while, there should be the new checkpoints
                        final long newTimeout = System.currentTimeMillis() + 
60000;
@@ -1792,7 +1788,7 @@ public class CheckpointCoordinatorTest {
                // ACK all savepoints
                long checkpointId = checkpointIDCounter.getLast();
                for (int i = 0; i < numSavepoints; i++, checkpointId--) {
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, attemptID1, new CheckpointMetaData(checkpointId, 
0L)));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, attemptID1, checkpointId));
                }
 
                // After ACKs, all should be completed
@@ -1905,7 +1901,8 @@ public class CheckpointCoordinatorTest {
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
                                        jid,
                                        
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointMetaData,
+                                       checkpointId,
+                                       new CheckpointMetrics(),
                                        checkpointStateHandles);
 
                        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -1919,7 +1916,8 @@ public class CheckpointCoordinatorTest {
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
                                        jid,
                                        
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointMetaData,
+                                       checkpointId,
+                                       new CheckpointMetrics(),
                                        checkpointStateHandles);
 
                        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2008,7 +2006,8 @@ public class CheckpointCoordinatorTest {
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
                                        jid,
                                        
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointMetaData,
+                                       checkpointId,
+                                       new CheckpointMetrics(),
                                        checkpointStateHandles);
 
                        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2022,7 +2021,8 @@ public class CheckpointCoordinatorTest {
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
                                        jid,
                                        
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointMetaData,
+                                       checkpointId,
+                                       new CheckpointMetrics(),
                                        checkpointStateHandles);
 
                        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2127,7 +2127,8 @@ public class CheckpointCoordinatorTest {
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
                                        jid,
                                        
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointMetaData,
+                                       checkpointId,
+                                       new CheckpointMetrics(),
                                        checkpointStateHandles);
 
                        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2144,7 +2145,8 @@ public class CheckpointCoordinatorTest {
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
                                        jid,
                                        
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointMetaData,
+                                       checkpointId,
+                                       new CheckpointMetrics(),
                                        checkpointStateHandles);
 
                        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2260,12 +2262,12 @@ public class CheckpointCoordinatorTest {
                        KeyGroupsStateHandle keyedStateBackend = 
generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false);
                        KeyGroupsStateHandle keyedStateRaw = 
generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), true);
 
-
                        SubtaskState checkpointStateHandles = new 
SubtaskState(valueSizeTuple, opStateBackend, null, keyedStateBackend, 
keyedStateRaw);
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
                                        jid,
                                        
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointMetaData,
+                                       checkpointId,
+                                       new CheckpointMetrics(),
                                        checkpointStateHandles);
 
                        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
@@ -2287,7 +2289,8 @@ public class CheckpointCoordinatorTest {
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
                                        jid,
                                        
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointMetaData,
+                                       checkpointId,
+                                       new CheckpointMetrics(),
                                        checkpointStateHandles);
 
                        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 0e20ebc8..18b07eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -118,12 +118,11 @@ public class CheckpointStateRestoreTest {
                        final long checkpointId = pending.getCheckpointId();
 
                        SubtaskState checkpointStateHandles = new 
SubtaskState(serializedState, null, null, serializedKeyGroupStates, null);
-                       CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, 0L);
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointMetaData, 
checkpointStateHandles));
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointMetaData, 
checkpointStateHandles));
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointMetaData, 
checkpointStateHandles));
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointMetaData));
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointMetaData));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new 
CheckpointMetrics(), checkpointStateHandles));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, new 
CheckpointMetrics(), checkpointStateHandles));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, new 
CheckpointMetrics(), checkpointStateHandles));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
 
                        assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 4358526..3a85c4c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -44,7 +44,6 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -95,7 +94,7 @@ public class PendingCheckpointTest {
                CheckpointProperties persisted = new 
CheckpointProperties(false, true, false, false, false, false, false);
 
                PendingCheckpoint pending = createPendingCheckpoint(persisted, 
tmp.getAbsolutePath());
-               pending.acknowledgeTask(ATTEMPT_ID, null, new 
CheckpointMetaData(pending.getCheckpointId(), 
pending.getCheckpointTimestamp()));
+               pending.acknowledgeTask(ATTEMPT_ID, null, new 
CheckpointMetrics());
                assertEquals(0, tmp.listFiles().length);
                pending.finalizeCheckpoint();
                assertEquals(1, tmp.listFiles().length);
@@ -103,7 +102,7 @@ public class PendingCheckpointTest {
                // Ephemeral checkpoint
                CheckpointProperties ephemeral = new 
CheckpointProperties(false, false, true, true, true, true, true);
                pending = createPendingCheckpoint(ephemeral, null);
-               pending.acknowledgeTask(ATTEMPT_ID, null, new 
CheckpointMetaData(pending.getCheckpointId(), 
pending.getCheckpointTimestamp()));
+               pending.acknowledgeTask(ATTEMPT_ID, null, new 
CheckpointMetrics());
 
                assertEquals(1, tmp.listFiles().length);
                pending.finalizeCheckpoint();
@@ -148,7 +147,7 @@ public class PendingCheckpointTest {
                future = pending.getCompletionFuture();
 
                assertFalse(future.isDone());
-               pending.acknowledgeTask(ATTEMPT_ID, null, new 
CheckpointMetaData(pending.getCheckpointId(), 
pending.getCheckpointTimestamp()));
+               pending.acknowledgeTask(ATTEMPT_ID, null, new 
CheckpointMetrics());
                pending.finalizeCheckpoint();
                assertTrue(future.isDone());
 
@@ -231,7 +230,7 @@ public class PendingCheckpointTest {
                        PendingCheckpoint pending = 
createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
                        pending.setStatsCallback(callback);
 
-                       pending.acknowledgeTask(ATTEMPT_ID, null, new 
CheckpointMetaData(pending.getCheckpointId(), 
pending.getCheckpointTimestamp()));
+                       pending.acknowledgeTask(ATTEMPT_ID, null, new 
CheckpointMetrics());
                        verify(callback, 
times(1)).reportSubtaskStats(any(JobVertexID.class), 
any(SubtaskStateStats.class));
 
                        pending.finalizeCheckpoint();

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 5f2edac..c7c35ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -611,7 +612,8 @@ public class JobManagerHARecoveryTest {
                                        new SubtaskState(chainedStateHandle, 
null, null, null, null);
 
                        getEnvironment().acknowledgeCheckpoint(
-                                       new 
CheckpointMetaData(checkpointMetaData.getCheckpointId(), -1, 0L, 0L, 0L, 0L),
+                                       checkpointMetaData.getCheckpointId(),
+                                       new CheckpointMetrics(0L, 0L, 0L, 0L),
                                        checkpointStateHandles);
                        return true;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 9aa35e0..db45231 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -63,7 +63,7 @@ public class CheckpointMessagesTest {
        public void testConfirmTaskCheckpointed() {
                try {
                        AcknowledgeCheckpoint noState = new 
AcknowledgeCheckpoint(
-                                       new JobID(), new ExecutionAttemptID(), 
new CheckpointMetaData(569345L, 0L));
+                                       new JobID(), new ExecutionAttemptID(), 
569345L);
 
                        KeyGroupRange keyGroupRange = KeyGroupRange.of(42,42);
 
@@ -78,7 +78,8 @@ public class CheckpointMessagesTest {
                        AcknowledgeCheckpoint withState = new 
AcknowledgeCheckpoint(
                                        new JobID(),
                                        new ExecutionAttemptID(),
-                                       new CheckpointMetaData(87658976143L, 
0L),
+                                       87658976143L,
+                                       new CheckpointMetrics(),
                                        checkpointStateHandles);
 
                        testSerializabilityEqualsHashCode(noState);

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 82d8cc1..851fa96 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -152,11 +152,11 @@ public class DummyEnvironment implements Environment {
        }
 
        @Override
-       public void acknowledgeCheckpoint(CheckpointMetaData 
checkpointMetaData) {
+       public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics 
checkpointMetrics) {
        }
 
        @Override
-       public void acknowledgeCheckpoint(CheckpointMetaData 
checkpointMetaData, SubtaskState subtaskState) {
+       public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics 
checkpointMetrics, SubtaskState subtaskState) {
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index bfede5b..49175c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -310,12 +310,12 @@ public class MockEnvironment implements Environment {
        }
 
        @Override
-       public void acknowledgeCheckpoint(CheckpointMetaData 
checkpointMetaData) {
+       public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics 
checkpointMetrics) {
                throw new UnsupportedOperationException();
        }
 
        @Override
-       public void acknowledgeCheckpoint(CheckpointMetaData 
checkpointMetaData, SubtaskState subtaskState) {
+       public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics 
checkpointMetrics, SubtaskState subtaskState) {
                throw new UnsupportedOperationException();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 10f4303..c78a3d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.Future;
@@ -235,7 +235,7 @@ public class JvmExitOnFatalErrorTest {
                private static final class NoOpCheckpointResponder implements 
CheckpointResponder {
 
                        @Override
-                       public void acknowledgeCheckpoint(JobID j, 
ExecutionAttemptID e, CheckpointMetaData c, SubtaskState s) {}
+                       public void acknowledgeCheckpoint(JobID j, 
ExecutionAttemptID e, long i, CheckpointMetrics c, SubtaskState s) {}
 
                        @Override
                        public void declineCheckpoint(JobID j, 
ExecutionAttemptID e, long l, Throwable t) {}

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d734dc9..92fc6e5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -944,7 +944,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                                keyedStateHandleStream);
 
                                if 
(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
 CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
-                                       
owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState);
+                                       
owner.getEnvironment().acknowledgeCheckpoint(
+                                               
checkpointMetaData.getCheckpointId(),
+                                               checkpointMetaData.getMetrics(),
+                                               subtaskState);
 
                                        if (LOG.isDebugEnabled()) {
                                                LOG.debug("{} - finished 
asynchronous part of checkpoint {}. Asynchronous duration: {} ms",

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 0255ee6..907f8f1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -607,10 +608,11 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
                @Override
                public void acknowledgeCheckpoint(
-                               CheckpointMetaData checkpointMetaData,
+                               long checkpointId,
+                               CheckpointMetrics checkpointMetrics,
                                SubtaskState checkpointStateHandles) {
 
-                       this.checkpointId = 
checkpointMetaData.getCheckpointId();
+                       this.checkpointId = checkpointId;
                        this.checkpointStateHandles = checkpointStateHandles;
                        checkpointLatch.trigger();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 4b08c83..69c2c88 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -679,10 +680,11 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                @Override
                public void acknowledgeCheckpoint(
-                               CheckpointMetaData checkpointMetaData,
+                               long checkpointId,
+                               CheckpointMetrics checkpointMetrics,
                                SubtaskState checkpointStateHandles) {
 
-                       this.checkpointId = 
checkpointMetaData.getCheckpointId();
+                       this.checkpointId = checkpointId;
                        this.checkpointStateHandles = checkpointStateHandles;
                        checkpointLatch.trigger();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 58912ab..ff07fa2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.execution.Environment;
@@ -327,12 +328,11 @@ public class StreamMockEnvironment implements Environment {
        }
 
        @Override
-       public void acknowledgeCheckpoint(CheckpointMetaData 
checkpointMetaData) {
+       public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics 
checkpointMetrics) {
        }
 
        @Override
-       public void acknowledgeCheckpoint(
-               CheckpointMetaData checkpointMetaData, SubtaskState 
subtaskState) {
+       public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics 
checkpointMetrics, SubtaskState subtaskState) {
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2edc9718/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 887ea4f..d33d1b6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -447,7 +448,7 @@ public class StreamTaskTest extends TestLogger {
 
                                return null;
                        }
-               
}).when(mockEnvironment).acknowledgeCheckpoint(any(CheckpointMetaData.class), 
any(SubtaskState.class));
+               }).when(mockEnvironment).acknowledgeCheckpoint(anyLong(), 
any(CheckpointMetrics.class), any(SubtaskState.class));
 
                StreamTask<?, AbstractStreamOperator<?>> streamTask = 
mock(StreamTask.class, Mockito.CALLS_REAL_METHODS);
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
@@ -500,7 +501,7 @@ public class StreamTaskTest extends TestLogger {
                ArgumentCaptor<SubtaskState> subtaskStateCaptor = 
ArgumentCaptor.forClass(SubtaskState.class);
 
                // check that the checkpoint has been completed
-               
verify(mockEnvironment).acknowledgeCheckpoint(eq(checkpointMetaData), 
subtaskStateCaptor.capture());
+               verify(mockEnvironment).acknowledgeCheckpoint(eq(checkpointId), 
any(CheckpointMetrics.class), subtaskStateCaptor.capture());
 
                SubtaskState subtaskState = subtaskStateCaptor.getValue();
 
@@ -628,7 +629,7 @@ public class StreamTaskTest extends TestLogger {
                }
 
                // check that the checkpoint has not been acknowledged
-               verify(mockEnvironment, 
never()).acknowledgeCheckpoint(any(CheckpointMetaData.class), 
any(SubtaskState.class));
+               verify(mockEnvironment, 
never()).acknowledgeCheckpoint(eq(checkpointId), any(CheckpointMetrics.class), 
any(SubtaskState.class));
 
                // check that the state handles have been discarded
                verify(managedKeyedStateHandle).discardState();

Reply via email to