[FLINK-5763] [checkpoints] Move CheckpointMetrics out of CheckpointMetaData


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1cb8cde4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1cb8cde4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1cb8cde4

Branch: refs/heads/master
Commit: 1cb8cde48e054395d808f6fe985ae60648e0b6b5
Parents: 2edc971
Author: Ufuk Celebi <[email protected]>
Authored: Wed Feb 15 18:16:44 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Feb 22 12:14:55 2017 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/CheckpointMetaData.java  | 81 +-------------------
 .../runtime/checkpoint/CheckpointMetrics.java   | 20 ++++-
 .../runtime/jobgraph/tasks/StatefulTask.java    |  6 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  2 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  3 +-
 .../streaming/runtime/io/BarrierBuffer.java     |  5 +-
 .../streaming/runtime/io/BarrierTracker.java    | 10 +--
 .../streaming/runtime/tasks/OperatorChain.java  |  1 -
 .../streaming/runtime/tasks/StreamTask.java     | 49 ++++++------
 .../io/BarrierBufferAlignmentLimitTest.java     |  8 +-
 .../streaming/runtime/io/BarrierBufferTest.java | 35 +++++----
 .../runtime/io/BarrierTrackerTest.java          |  3 +-
 .../runtime/tasks/BlockingCheckpointsTest.java  |  3 +-
 13 files changed, 87 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
index 2627b22..9960b44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.util.Preconditions;
-
 import java.io.Serializable;
 
 /**
@@ -35,65 +33,9 @@ public class CheckpointMetaData implements Serializable {
        /** The timestamp of the checkpoint */
        private final long timestamp;
 
-       private final CheckpointMetrics metrics;
-
        public CheckpointMetaData(long checkpointId, long timestamp) {
                this.checkpointId = checkpointId;
                this.timestamp = timestamp;
-               this.metrics = new CheckpointMetrics();
-       }
-
-       public CheckpointMetaData(
-                       long checkpointId,
-                       long timestamp,
-                       long synchronousDurationMillis,
-                       long asynchronousDurationMillis,
-                       long bytesBufferedInAlignment,
-                       long alignmentDurationNanos) {
-               this.checkpointId = checkpointId;
-               this.timestamp = timestamp;
-               this.metrics = new CheckpointMetrics(
-                               bytesBufferedInAlignment,
-                               alignmentDurationNanos,
-                               synchronousDurationMillis,
-                               asynchronousDurationMillis);
-       }
-
-       public CheckpointMetaData(
-                       long checkpointId,
-                       long timestamp,
-                       CheckpointMetrics metrics) {
-               this.checkpointId = checkpointId;
-               this.timestamp = timestamp;
-               this.metrics = Preconditions.checkNotNull(metrics);
-       }
-
-       public CheckpointMetrics getMetrics() {
-               return metrics;
-       }
-
-       public CheckpointMetaData setBytesBufferedInAlignment(long bytesBufferedInAlignment) {
-               Preconditions.checkArgument(bytesBufferedInAlignment >= 0);
-               this.metrics.setBytesBufferedInAlignment(bytesBufferedInAlignment);
-               return this;
-       }
-
-       public CheckpointMetaData setAlignmentDurationNanos(long alignmentDurationNanos) {
-               Preconditions.checkArgument(alignmentDurationNanos >= 0);
-               this.metrics.setAlignmentDurationNanos(alignmentDurationNanos);
-               return this;
-       }
-
-       public CheckpointMetaData setSyncDurationMillis(long syncDurationMillis) {
-               Preconditions.checkArgument(syncDurationMillis >= 0);
-               this.metrics.setSyncDurationMillis(syncDurationMillis);
-               return this;
-       }
-
-       public CheckpointMetaData setAsyncDurationMillis(long asyncDurationMillis) {
-               Preconditions.checkArgument(asyncDurationMillis >= 0);
-               this.metrics.setAsyncDurationMillis(asyncDurationMillis);
-               return this;
        }
 
        public long getCheckpointId() {
@@ -104,22 +46,6 @@ public class CheckpointMetaData implements Serializable {
                return timestamp;
        }
 
-       public long getBytesBufferedInAlignment() {
-               return metrics.getBytesBufferedInAlignment();
-       }
-
-       public long getAlignmentDurationNanos() {
-               return metrics.getAlignmentDurationNanos();
-       }
-
-       public long getSyncDurationMillis() {
-               return metrics.getSyncDurationMillis();
-       }
-
-       public long getAsyncDurationMillis() {
-               return metrics.getAsyncDurationMillis();
-       }
-
        @Override
        public boolean equals(Object o) {
                if (this == o) {
@@ -132,15 +58,13 @@ public class CheckpointMetaData implements Serializable {
                CheckpointMetaData that = (CheckpointMetaData) o;
 
                return (checkpointId == that.checkpointId)
-                               && (timestamp == that.timestamp)
-                               && (metrics.equals(that.metrics));
+                               && (timestamp == that.timestamp);
        }
 
        @Override
        public int hashCode() {
                int result = (int) (checkpointId ^ (checkpointId >>> 32));
                result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
-               result = 31 * result + metrics.hashCode();
                return result;
        }
 
@@ -149,7 +73,6 @@ public class CheckpointMetaData implements Serializable {
                return "CheckpointMetaData{" +
                                "checkpointId=" + checkpointId +
                                ", timestamp=" + timestamp +
-                               ", metrics=" + metrics +
                                '}';
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
index be73adb..a90a2e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 import java.io.Serializable;
 
 /**
@@ -49,6 +51,12 @@ public class CheckpointMetrics implements Serializable {
                        long syncDurationMillis,
                        long asyncDurationMillis) {
 
+               // these may be "-1", in case the values are unknown or not set
+               checkArgument(syncDurationMillis >= -1);
+               checkArgument(asyncDurationMillis >= -1);
+               checkArgument(bytesBufferedInAlignment >= -1);
+               checkArgument(alignmentDurationNanos >= -1);
+
                this.bytesBufferedInAlignment = bytesBufferedInAlignment;
                this.alignmentDurationNanos = alignmentDurationNanos;
                this.syncDurationMillis = syncDurationMillis;
@@ -59,32 +67,36 @@ public class CheckpointMetrics implements Serializable {
                return bytesBufferedInAlignment;
        }
 
-       public void setBytesBufferedInAlignment(long bytesBufferedInAlignment) {
+       public CheckpointMetrics setBytesBufferedInAlignment(long bytesBufferedInAlignment) {
                this.bytesBufferedInAlignment = bytesBufferedInAlignment;
+               return this;
        }
 
        public long getAlignmentDurationNanos() {
                return alignmentDurationNanos;
        }
 
-       public void setAlignmentDurationNanos(long alignmentDurationNanos) {
+       public CheckpointMetrics setAlignmentDurationNanos(long alignmentDurationNanos) {
                this.alignmentDurationNanos = alignmentDurationNanos;
+               return this;
        }
 
        public long getSyncDurationMillis() {
                return syncDurationMillis;
        }
 
-       public void setSyncDurationMillis(long syncDurationMillis) {
+       public CheckpointMetrics setSyncDurationMillis(long syncDurationMillis) {
                this.syncDurationMillis = syncDurationMillis;
+               return this;
        }
 
        public long getAsyncDurationMillis() {
                return asyncDurationMillis;
        }
 
-       public void setAsyncDurationMillis(long asyncDurationMillis) {
+       public CheckpointMetrics setAsyncDurationMillis(long asyncDurationMillis) {
                this.asyncDurationMillis = asyncDurationMillis;
+               return this;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 39ddc961..87b66ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.state.TaskStateHandles;
 
 /**
@@ -41,7 +42,7 @@ public interface StatefulTask {
         * 
         * <p>This method is called for tasks that start the checkpoints by injecting the initial barriers,
         * i.e., the source tasks. In contrast, checkpoints on downstream operators, which are the result of
-        * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(CheckpointMetaData)}
+        * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointMetrics)}
         * method.
         *
         * @param checkpointMetaData Meta data for about this checkpoint
@@ -55,10 +56,11 @@ public interface StatefulTask {
         * barriers on all input streams.
         * 
         * @param checkpointMetaData Meta data for about this checkpoint
+        * @param checkpointMetrics Metrics about this checkpoint
         * 
         * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
         */
-       void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception;
+       void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception;
 
        /**
         * Aborts a checkpoint as the result of receiving possibly some checkpoint barriers,

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index c7c35ec..de54d1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -619,7 +619,7 @@ public class JobManagerHARecoveryTest {
                }
 
                @Override
-               public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+               public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
                        throw new UnsupportedOperationException("should not be called!");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 26b8cdb..187163d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -242,7 +243,7 @@ public class TaskAsyncCallTest {
                }
 
                @Override
-               public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+               public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
                        throw new UnsupportedOperationException("Should not be called");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index e91c26a..611bd44 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
@@ -363,11 +364,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 
                        long bytesBuffered = currentBuffered != null ? currentBuffered.size() : 0L;
 
-                       checkpointMetaData
+                       CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
                                        .setBytesBufferedInAlignment(bytesBuffered)
                                        .setAlignmentDurationNanos(latestAlignmentDurationNanos);
 
-                       toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
+                       toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 9351f1b..77608c6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -250,12 +251,11 @@ public class BarrierTracker implements CheckpointBarrierHandler {
        private void notifyCheckpoint(long checkpointId, long timestamp) throws Exception {
                if (toNotifyOnCheckpoint != null) {
                        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+                       CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
+                               .setBytesBufferedInAlignment(0L)
+                               .setAlignmentDurationNanos(0L);
 
-                       checkpointMetaData
-                                       .setBytesBufferedInAlignment(0L)
-                                       .setAlignmentDurationNanos(0L);
-
-                       toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
+                       toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 7e24eea..591ed3c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -162,7 +162,6 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
                }
        }
 
-
        public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException {
                try {
                        CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 92fc6e5..60afd60 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
@@ -516,10 +517,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        @Override
        public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
                try {
-                       checkpointMetaData.
-                                       setBytesBufferedInAlignment(0L).
-                                       setAlignmentDurationNanos(0L);
-                       return performCheckpoint(checkpointMetaData);
+                       // No alignment if we inject a checkpoint
+                       CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
+                                       .setBytesBufferedInAlignment(0L)
+                                       .setAlignmentDurationNanos(0L);
+
+                       return performCheckpoint(checkpointMetaData, checkpointMetrics);
                }
                catch (Exception e) {
                        // propagate exceptions only if the task is still in "running" state
@@ -535,9 +538,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        }
 
        @Override
-       public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
+       public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
                try {
-                       performCheckpoint(checkpointMetaData);
+                       performCheckpoint(checkpointMetaData, checkpointMetrics);
                }
                catch (CancelTaskException e) {
                        throw new Exception("Operator " + getName() + " was cancelled while performing checkpoint " +
@@ -562,7 +565,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                }
        }
 
-       private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+       private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
                LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());
 
                synchronized (lock) {
@@ -576,7 +579,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                operatorChain.broadcastCheckpointBarrier(
                                                checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
 
-                               checkpointState(checkpointMetaData);
+                               checkpointState(checkpointMetaData, checkpointMetrics);
                                return true;
                        }
                        else {
@@ -629,8 +632,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                }
        }
 
-       private void checkpointState(CheckpointMetaData checkpointMetaData) throws Exception {
-               CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this, checkpointMetaData);
+       private void checkpointState(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+               CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this, checkpointMetaData, checkpointMetrics);
                checkpointingOperation.executeCheckpointing();
        }
 
@@ -868,6 +871,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                private List<StreamStateHandle> nonPartitionedStateHandles;
 
                private final CheckpointMetaData checkpointMetaData;
+               private final CheckpointMetrics checkpointMetrics;
 
                private final long asyncStartNanos;
 
@@ -879,11 +883,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                List<StreamStateHandle> nonPartitionedStateHandles,
                                List<OperatorSnapshotResult> snapshotInProgressList,
                                CheckpointMetaData checkpointMetaData,
+                               CheckpointMetrics checkpointMetrics,
                                long asyncStartNanos) {
 
                        this.owner = Preconditions.checkNotNull(owner);
                        this.snapshotInProgressList = Preconditions.checkNotNull(snapshotInProgressList);
                        this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
+                       this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
                        this.nonPartitionedStateHandles = nonPartitionedStateHandles;
                        this.asyncStartNanos = asyncStartNanos;
 
@@ -900,9 +906,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
                @Override
                public void run() {
-
                        try {
-
                                // Keyed state handle future, currently only one (the head) operator can have this
                                KeyGroupsStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles);
                                KeyGroupsStateHandle keyedStateHandleStream = FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles);
@@ -925,7 +929,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                final long asyncEndNanos = System.nanoTime();
                                final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000;
 
-                               checkpointMetaData.setAsyncDurationMillis(asyncDurationMillis);
+                               checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
 
                                ChainedStateHandle<StreamStateHandle> chainedNonPartitionedOperatorsState =
                                                new ChainedStateHandle<>(nonPartitionedStateHandles);
@@ -946,7 +950,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
                                        owner.getEnvironment().acknowledgeCheckpoint(
                                                checkpointMetaData.getCheckpointId(),
-                                               checkpointMetaData.getMetrics(),
+                                               checkpointMetrics,
                                                subtaskState);
 
                                        if (LOG.isDebugEnabled()) {
@@ -1039,6 +1043,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                private final StreamTask<?, ?> owner;
 
                private final CheckpointMetaData checkpointMetaData;
+               private final CheckpointMetrics checkpointMetrics;
 
                private final StreamOperator<?>[] allOperators;
 
@@ -1050,21 +1055,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                private final List<StreamStateHandle> nonPartitionedStates;
                private final List<OperatorSnapshotResult> snapshotInProgressList;
 
-               public CheckpointingOperation(StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData) {
+               public CheckpointingOperation(StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) {
                        this.owner = Preconditions.checkNotNull(owner);
                        this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
+                       this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
                        this.allOperators = owner.operatorChain.getAllOperators();
                        this.nonPartitionedStates = new ArrayList<>(allOperators.length);
                        this.snapshotInProgressList = new ArrayList<>(allOperators.length);
                }
 
                public void executeCheckpointing() throws Exception {
-
                        startSyncPartNano = System.nanoTime();
 
                        boolean failed = true;
                        try {
-
                                for (StreamOperator<?> op : allOperators) {
                                        checkpointStreamOperator(op);
                                }
@@ -1076,7 +1080,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
                                startAsyncPartNano = System.nanoTime();
 
-                               checkpointMetaData.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
+                               checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
 
                                // at this point we are transferring ownership over snapshotInProgressList for cleanup to the thread
                                runAsyncCheckpointingAndAcknowledge();
@@ -1086,8 +1090,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                        LOG.debug("{} - finished synchronous part of checkpoint {}." +
                                                        "Alignment duration: {} ms, snapshot duration {} ms",
                                                owner.getName(), checkpointMetaData.getCheckpointId(),
-                                               checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
-                                               checkpointMetaData.getSyncDurationMillis());
+                                               checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
+                                               checkpointMetrics.getSyncDurationMillis());
                                }
                        } finally {
                                if (failed) {
@@ -1118,8 +1122,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                                LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." +
                                                                "Alignment duration: {} ms, snapshot duration {} ms",
                                                        owner.getName(), checkpointMetaData.getCheckpointId(),
-                                                       checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
-                                                       checkpointMetaData.getSyncDurationMillis());
+                                                       checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
+                                                       checkpointMetrics.getSyncDurationMillis());
                                        }
                                }
                        }
@@ -1152,6 +1156,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                        nonPartitionedStates,
                                        snapshotInProgressList,
                                        checkpointMetaData,
+                                       checkpointMetrics,
                                        startAsyncPartNano);
 
                        
owner.cancelables.registerClosable(asyncCheckpointRunnable);

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 3e618ef..46f228a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -154,7 +155,8 @@ public class BarrierBufferAlignmentLimitTest {
                check(sequence[21], buffer.getNextNonBlocked());
 
                // no call for a completed checkpoint must have happened
-               verify(toNotify, 
times(0)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
+               verify(toNotify, 
times(0)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class),
+                       any(CheckpointMetrics.class));
 
                assertNull(buffer.getNextNonBlocked());
                assertNull(buffer.getNextNonBlocked());
@@ -240,7 +242,7 @@ public class BarrierBufferAlignmentLimitTest {
                // checkpoint 4 completed - check and validate buffered replay
                check(sequence[9], buffer.getNextNonBlocked());
                validateAlignmentTime(startTs, buffer);
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(4L)));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(4L)), 
any(CheckpointMetrics.class));
 
                check(sequence[10], buffer.getNextNonBlocked());
                check(sequence[15], buffer.getNextNonBlocked());
@@ -252,7 +254,7 @@ public class BarrierBufferAlignmentLimitTest {
                check(sequence[21], buffer.getNextNonBlocked());
 
                // only checkpoint 4 was successfully completed, not checkpoint 
3
-               verify(toNotify, 
times(0)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
+               verify(toNotify, 
times(0)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), 
any(CheckpointMetrics.class));
 
                assertNull(buffer.getNextNonBlocked());
                assertNull(buffer.getNextNonBlocked());

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index d17225c..869d1fe 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -565,7 +566,7 @@ public class BarrierBufferTest {
                        // checkpoint done - replay buffered
                        check(sequence[5], buffer.getNextNonBlocked());
                        validateAlignmentTime(startTs, buffer);
-                       verify(toNotify).triggerCheckpointOnBarrier(argThat(new 
CheckpointMatcher(1L)));
+                       verify(toNotify).triggerCheckpointOnBarrier(argThat(new 
CheckpointMatcher(1L)), any(CheckpointMetrics.class));
                        check(sequence[6], buffer.getNextNonBlocked());
 
                        check(sequence[9], buffer.getNextNonBlocked());
@@ -1007,14 +1008,14 @@ public class BarrierBufferTest {
 
                check(sequence[0], buffer.getNextNonBlocked());
                check(sequence[2], buffer.getNextNonBlocked());
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), 
any(CheckpointMetrics.class));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                check(sequence[6], buffer.getNextNonBlocked());
                assertEquals(5L, buffer.getCurrentCheckpointId());
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), 
any(CheckpointMetrics.class));
                verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), 
any(CheckpointMetrics.class));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                check(sequence[8], buffer.getNextNonBlocked());
@@ -1077,7 +1078,7 @@ public class BarrierBufferTest {
                check(sequence[2], buffer.getNextNonBlocked());
                startTs = System.nanoTime();
                check(sequence[5], buffer.getNextNonBlocked());
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), 
any(CheckpointMetrics.class));
                validateAlignmentTime(startTs, buffer);
 
                check(sequence[6], buffer.getNextNonBlocked());
@@ -1096,7 +1097,7 @@ public class BarrierBufferTest {
                check(sequence[16], buffer.getNextNonBlocked());
                startTs = System.nanoTime();
                check(sequence[20], buffer.getNextNonBlocked());
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), 
any(CheckpointMetrics.class));
                validateAlignmentTime(startTs, buffer);
                check(sequence[21], buffer.getNextNonBlocked());
 
@@ -1113,7 +1114,7 @@ public class BarrierBufferTest {
                // a simple successful checkpoint
                startTs = System.nanoTime();
                check(sequence[32], buffer.getNextNonBlocked());
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), 
any(CheckpointMetrics.class));
                validateAlignmentTime(startTs, buffer);
                check(sequence[33], buffer.getNextNonBlocked());
 
@@ -1174,7 +1175,7 @@ public class BarrierBufferTest {
 
                // finished first checkpoint
                check(sequence[3], buffer.getNextNonBlocked());
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), 
any(CheckpointMetrics.class));
                validateAlignmentTime(startTs, buffer);
 
                check(sequence[5], buffer.getNextNonBlocked());
@@ -1197,7 +1198,7 @@ public class BarrierBufferTest {
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                // no further checkpoint (abort) notifications
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), 
any(CheckpointMetrics.class));
                verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), 
any(CheckpointDeclineOnCancellationBarrierException.class));
 
                // all done
@@ -1279,7 +1280,7 @@ public class BarrierBufferTest {
                // checkpoint done
                check(sequence[7], buffer.getNextNonBlocked());
                validateAlignmentTime(startTs, buffer);
-               verify(toNotify).triggerCheckpointOnBarrier(argThat(new 
CheckpointMatcher(2L)));
+               verify(toNotify).triggerCheckpointOnBarrier(argThat(new 
CheckpointMatcher(2L)), any(CheckpointMetrics.class));
 
                // queued data
                check(sequence[10], buffer.getNextNonBlocked());
@@ -1298,7 +1299,7 @@ public class BarrierBufferTest {
                checkNoTempFilesRemain();
 
                // check overall notifications
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), 
any(CheckpointMetrics.class));
                verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), 
any(Throwable.class));
        }
 
@@ -1363,7 +1364,7 @@ public class BarrierBufferTest {
                // checkpoint finished
                check(sequence[7], buffer.getNextNonBlocked());
                validateAlignmentTime(startTs, buffer);
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), 
any(CheckpointMetrics.class));
                check(sequence[11], buffer.getNextNonBlocked());
 
                // remaining data
@@ -1379,7 +1380,7 @@ public class BarrierBufferTest {
                checkNoTempFilesRemain();
 
                // check overall notifications
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), 
any(CheckpointMetrics.class));
                verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), 
any(Throwable.class));
        }
 
@@ -1491,17 +1492,17 @@ public class BarrierBufferTest {
                }
 
                @Override
-               public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData) throws Exception {
+               public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
                        assertTrue("wrong checkpoint id",
                                        nextExpectedCheckpointId == -1L || 
                                        nextExpectedCheckpointId == 
checkpointMetaData.getCheckpointId());
 
                        assertTrue(checkpointMetaData.getTimestamp() > 0);
-                       
assertTrue(checkpointMetaData.getBytesBufferedInAlignment() >= 0);
-                       
assertTrue(checkpointMetaData.getAlignmentDurationNanos() >= 0);
+                       
assertTrue(checkpointMetrics.getBytesBufferedInAlignment() >= 0);
+                       
assertTrue(checkpointMetrics.getAlignmentDurationNanos() >= 0);
 
                        nextExpectedCheckpointId++;
-                       lastReportedBytesBufferedInAlignment = 
checkpointMetaData.getBytesBufferedInAlignment();
+                       lastReportedBytesBufferedInAlignment = 
checkpointMetrics.getBytesBufferedInAlignment();
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 0d9e6ac..da322f6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -506,7 +507,7 @@ public class BarrierTrackerTest {
                }
 
                @Override
-               public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData) throws Exception {
+               public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
                        assertTrue("More checkpoints than expected", i < 
checkpointIDs.length);
 
                        final long expectedId = checkpointIDs[i++];

http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 492b470..5c0f0cf 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -275,7 +276,7 @@ public class BlockingCheckpointsTest {
 
                @Override
                protected void run() throws Exception {
-                       triggerCheckpointOnBarrier(new CheckpointMetaData(11L, 
System.currentTimeMillis()));
+                       triggerCheckpointOnBarrier(new CheckpointMetaData(11L, 
System.currentTimeMillis()), new CheckpointMetrics());
                }
 
                @Override

Reply via email to