[FLINK-5763] [checkpoints] Move CheckpointMetrics out of CheckpointMetaData
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1cb8cde4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1cb8cde4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1cb8cde4 Branch: refs/heads/master Commit: 1cb8cde48e054395d808f6fe985ae60648e0b6b5 Parents: 2edc971 Author: Ufuk Celebi <[email protected]> Authored: Wed Feb 15 18:16:44 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Feb 22 12:14:55 2017 +0100 ---------------------------------------------------------------------- .../runtime/checkpoint/CheckpointMetaData.java | 81 +------------------- .../runtime/checkpoint/CheckpointMetrics.java | 20 ++++- .../runtime/jobgraph/tasks/StatefulTask.java | 6 +- .../jobmanager/JobManagerHARecoveryTest.java | 2 +- .../runtime/taskmanager/TaskAsyncCallTest.java | 3 +- .../streaming/runtime/io/BarrierBuffer.java | 5 +- .../streaming/runtime/io/BarrierTracker.java | 10 +-- .../streaming/runtime/tasks/OperatorChain.java | 1 - .../streaming/runtime/tasks/StreamTask.java | 49 ++++++------ .../io/BarrierBufferAlignmentLimitTest.java | 8 +- .../streaming/runtime/io/BarrierBufferTest.java | 35 +++++---- .../runtime/io/BarrierTrackerTest.java | 3 +- .../runtime/tasks/BlockingCheckpointsTest.java | 3 +- 13 files changed, 87 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java index 2627b22..9960b44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.checkpoint; -import org.apache.flink.util.Preconditions; - import java.io.Serializable; /** @@ -35,65 +33,9 @@ public class CheckpointMetaData implements Serializable { /** The timestamp of the checkpoint */ private final long timestamp; - private final CheckpointMetrics metrics; - public CheckpointMetaData(long checkpointId, long timestamp) { this.checkpointId = checkpointId; this.timestamp = timestamp; - this.metrics = new CheckpointMetrics(); - } - - public CheckpointMetaData( - long checkpointId, - long timestamp, - long synchronousDurationMillis, - long asynchronousDurationMillis, - long bytesBufferedInAlignment, - long alignmentDurationNanos) { - this.checkpointId = checkpointId; - this.timestamp = timestamp; - this.metrics = new CheckpointMetrics( - bytesBufferedInAlignment, - alignmentDurationNanos, - synchronousDurationMillis, - asynchronousDurationMillis); - } - - public CheckpointMetaData( - long checkpointId, - long timestamp, - CheckpointMetrics metrics) { - this.checkpointId = checkpointId; - this.timestamp = timestamp; - this.metrics = Preconditions.checkNotNull(metrics); - } - - public CheckpointMetrics getMetrics() { - return metrics; - } - - public CheckpointMetaData setBytesBufferedInAlignment(long bytesBufferedInAlignment) { - Preconditions.checkArgument(bytesBufferedInAlignment >= 0); - this.metrics.setBytesBufferedInAlignment(bytesBufferedInAlignment); - return this; - } - - public CheckpointMetaData setAlignmentDurationNanos(long alignmentDurationNanos) { - Preconditions.checkArgument(alignmentDurationNanos >= 0); - this.metrics.setAlignmentDurationNanos(alignmentDurationNanos); - return this; - } - - public CheckpointMetaData setSyncDurationMillis(long syncDurationMillis) { - Preconditions.checkArgument(syncDurationMillis >= 0); - this.metrics.setSyncDurationMillis(syncDurationMillis); - return this; - } - - public CheckpointMetaData setAsyncDurationMillis(long asyncDurationMillis) { - Preconditions.checkArgument(asyncDurationMillis >= 0); - this.metrics.setAsyncDurationMillis(asyncDurationMillis); - return this; } public long getCheckpointId() { @@ -104,22 +46,6 @@ public class CheckpointMetaData implements Serializable { return timestamp; } - public long getBytesBufferedInAlignment() { - return metrics.getBytesBufferedInAlignment(); - } - - public long getAlignmentDurationNanos() { - return metrics.getAlignmentDurationNanos(); - } - - public long getSyncDurationMillis() { - return metrics.getSyncDurationMillis(); - } - - public long getAsyncDurationMillis() { - return metrics.getAsyncDurationMillis(); - } - @Override public boolean equals(Object o) { if (this == o) { @@ -132,15 +58,13 @@ public class CheckpointMetaData implements Serializable { CheckpointMetaData that = (CheckpointMetaData) o; return (checkpointId == that.checkpointId) - && (timestamp == that.timestamp) - && (metrics.equals(that.metrics)); + && (timestamp == that.timestamp); } @Override public int hashCode() { int result = (int) (checkpointId ^ (checkpointId >>> 32)); result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); - result = 31 * result + metrics.hashCode(); return result; } @@ -149,7 +73,6 @@ public class CheckpointMetaData implements Serializable { return "CheckpointMetaData{" + "checkpointId=" + checkpointId + ", timestamp=" + timestamp + - ", metrics=" + metrics + '}'; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java index be73adb..a90a2e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.checkpoint; +import static org.apache.flink.util.Preconditions.checkArgument; + import java.io.Serializable; /** @@ -49,6 +51,12 @@ public class CheckpointMetrics implements Serializable { long syncDurationMillis, long asyncDurationMillis) { + // these may be "-1", in case the values are unknown or not set + checkArgument(syncDurationMillis >= -1); + checkArgument(asyncDurationMillis >= -1); + checkArgument(bytesBufferedInAlignment >= -1); + checkArgument(alignmentDurationNanos >= -1); + this.bytesBufferedInAlignment = bytesBufferedInAlignment; this.alignmentDurationNanos = alignmentDurationNanos; this.syncDurationMillis = syncDurationMillis; @@ -59,32 +67,36 @@ public class CheckpointMetrics implements Serializable { return bytesBufferedInAlignment; } - public void setBytesBufferedInAlignment(long bytesBufferedInAlignment) { + public CheckpointMetrics setBytesBufferedInAlignment(long bytesBufferedInAlignment) { this.bytesBufferedInAlignment = bytesBufferedInAlignment; + return this; } public long getAlignmentDurationNanos() { return alignmentDurationNanos; } - public void setAlignmentDurationNanos(long alignmentDurationNanos) { + public CheckpointMetrics setAlignmentDurationNanos(long alignmentDurationNanos) { this.alignmentDurationNanos = alignmentDurationNanos; + return this; } public long getSyncDurationMillis() { return syncDurationMillis; } - public void setSyncDurationMillis(long syncDurationMillis) { + public CheckpointMetrics setSyncDurationMillis(long syncDurationMillis) { this.syncDurationMillis = syncDurationMillis; + return this; } public long getAsyncDurationMillis() { return asyncDurationMillis; } - public void setAsyncDurationMillis(long asyncDurationMillis) { + public CheckpointMetrics setAsyncDurationMillis(long asyncDurationMillis) { this.asyncDurationMillis = asyncDurationMillis; + return this; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java index 39ddc961..87b66ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobgraph.tasks; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.state.TaskStateHandles; /** @@ -41,7 +42,7 @@ public interface StatefulTask { * * <p>This method is called for tasks that start the checkpoints by injecting the initial barriers, * i.e., the source tasks. In contrast, checkpoints on downstream operators, which are the result of - * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(CheckpointMetaData)} + * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointMetrics)} * method. * * @param checkpointMetaData Meta data for about this checkpoint @@ -55,10 +56,11 @@ public interface StatefulTask { * barriers on all input streams. * * @param checkpointMetaData Meta data for about this checkpoint + * @param checkpointMetrics Metrics about this checkpoint * * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded. */ - void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception; + void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception; /** * Aborts a checkpoint as the result of receiving possibly some checkpoint barriers, http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index c7c35ec..de54d1f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -619,7 +619,7 @@ public class JobManagerHARecoveryTest { } @Override - public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception { + public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception { throw new UnsupportedOperationException("should not be called!"); } http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 26b8cdb..187163d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -25,6 +25,7 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; @@ -242,7 +243,7 @@ public class TaskAsyncCallTest { } @Override - public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception { + public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception { throw new UnsupportedOperationException("Should not be called"); } http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index e91c26a..611bd44 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; @@ -363,11 +364,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler { long bytesBuffered = currentBuffered != null ? currentBuffered.size() : 0L; - checkpointMetaData + CheckpointMetrics checkpointMetrics = new CheckpointMetrics() .setBytesBufferedInAlignment(bytesBuffered) .setAlignmentDurationNanos(latestAlignmentDurationNanos); - toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData); + toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics); } } http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java index 9351f1b..77608c6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; @@ -250,12 +251,11 @@ public class BarrierTracker implements CheckpointBarrierHandler { private void notifyCheckpoint(long checkpointId, long timestamp) throws Exception { if (toNotifyOnCheckpoint != null) { CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); + CheckpointMetrics checkpointMetrics = new CheckpointMetrics() + .setBytesBufferedInAlignment(0L) + .setAlignmentDurationNanos(0L); - checkpointMetaData - .setBytesBufferedInAlignment(0L) - .setAlignmentDurationNanos(0L); - - toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData); + toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics); } } http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 7e24eea..591ed3c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -162,7 +162,6 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea } } - public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException { try { CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp); http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 92fc6e5..60afd60 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; @@ -516,10 +517,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> @Override public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception { try { - checkpointMetaData. - setBytesBufferedInAlignment(0L). - setAlignmentDurationNanos(0L); - return performCheckpoint(checkpointMetaData); + // No alignment if we inject a checkpoint + CheckpointMetrics checkpointMetrics = new CheckpointMetrics() + .setBytesBufferedInAlignment(0L) + .setAlignmentDurationNanos(0L); + + return performCheckpoint(checkpointMetaData, checkpointMetrics); } catch (Exception e) { // propagate exceptions only if the task is still in "running" state @@ -535,9 +538,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } @Override - public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception { + public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception { try { - performCheckpoint(checkpointMetaData); + performCheckpoint(checkpointMetaData, checkpointMetrics); } catch (CancelTaskException e) { throw new Exception("Operator " + getName() + " was cancelled while performing checkpoint " + @@ -562,7 +565,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } - private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception { + private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception { LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName()); synchronized (lock) { @@ -576,7 +579,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> operatorChain.broadcastCheckpointBarrier( checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp()); - checkpointState(checkpointMetaData); + checkpointState(checkpointMetaData, checkpointMetrics); return true; } else { @@ -629,8 +632,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } - private void checkpointState(CheckpointMetaData checkpointMetaData) throws Exception { - CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this, checkpointMetaData); + private void checkpointState(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception { + CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this, checkpointMetaData, checkpointMetrics); checkpointingOperation.executeCheckpointing(); } @@ -868,6 +871,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private List<StreamStateHandle> nonPartitionedStateHandles; private final CheckpointMetaData checkpointMetaData; + private final CheckpointMetrics checkpointMetrics; private final long asyncStartNanos; @@ -879,11 +883,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> List<StreamStateHandle> nonPartitionedStateHandles, List<OperatorSnapshotResult> snapshotInProgressList, CheckpointMetaData checkpointMetaData, + CheckpointMetrics checkpointMetrics, long asyncStartNanos) { this.owner = Preconditions.checkNotNull(owner); this.snapshotInProgressList = Preconditions.checkNotNull(snapshotInProgressList); this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData); + this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics); this.nonPartitionedStateHandles = nonPartitionedStateHandles; this.asyncStartNanos = asyncStartNanos; @@ -900,9 +906,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> @Override public void run() { - try { - // Keyed state handle future, currently only one (the head) operator can have this KeyGroupsStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles); KeyGroupsStateHandle keyedStateHandleStream = FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles); @@ -925,7 +929,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> final long asyncEndNanos = System.nanoTime(); final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000; - checkpointMetaData.setAsyncDurationMillis(asyncDurationMillis); + checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis); ChainedStateHandle<StreamStateHandle> chainedNonPartitionedOperatorsState = new ChainedStateHandle<>(nonPartitionedStateHandles); @@ -946,7 +950,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) { owner.getEnvironment().acknowledgeCheckpoint( checkpointMetaData.getCheckpointId(), - checkpointMetaData.getMetrics(), + checkpointMetrics, subtaskState); if (LOG.isDebugEnabled()) { @@ -1039,6 +1043,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private final StreamTask<?, ?> owner; private final CheckpointMetaData checkpointMetaData; + private final CheckpointMetrics checkpointMetrics; private final StreamOperator<?>[] allOperators; @@ -1050,21 +1055,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private final List<StreamStateHandle> nonPartitionedStates; private final List<OperatorSnapshotResult> snapshotInProgressList; - public CheckpointingOperation(StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData) { + public CheckpointingOperation(StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) { this.owner = Preconditions.checkNotNull(owner); this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData); + this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics); this.allOperators = owner.operatorChain.getAllOperators(); this.nonPartitionedStates = new ArrayList<>(allOperators.length); this.snapshotInProgressList = new ArrayList<>(allOperators.length); } public void executeCheckpointing() throws Exception { - startSyncPartNano = System.nanoTime(); boolean failed = true; try { - for (StreamOperator<?> op : allOperators) { checkpointStreamOperator(op); } @@ -1076,7 +1080,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> startAsyncPartNano = System.nanoTime(); - checkpointMetaData.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000); + checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000); // at this point we are transferring ownership over snapshotInProgressList for cleanup to the thread runAsyncCheckpointingAndAcknowledge(); @@ -1086,8 +1090,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> LOG.debug("{} - finished synchronous part of checkpoint {}." + "Alignment duration: {} ms, snapshot duration {} ms", owner.getName(), checkpointMetaData.getCheckpointId(), - checkpointMetaData.getAlignmentDurationNanos() / 1_000_000, - checkpointMetaData.getSyncDurationMillis()); + checkpointMetrics.getAlignmentDurationNanos() / 1_000_000, + checkpointMetrics.getSyncDurationMillis()); } } finally { if (failed) { @@ -1118,8 +1122,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." + "Alignment duration: {} ms, snapshot duration {} ms", owner.getName(), checkpointMetaData.getCheckpointId(), - checkpointMetaData.getAlignmentDurationNanos() / 1_000_000, - checkpointMetaData.getSyncDurationMillis()); + checkpointMetrics.getAlignmentDurationNanos() / 1_000_000, + checkpointMetrics.getSyncDurationMillis()); } } } @@ -1152,6 +1156,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> nonPartitionedStates, snapshotInProgressList, checkpointMetaData, + checkpointMetrics, startAsyncPartNano); owner.cancelables.registerClosable(asyncCheckpointRunnable); http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java index 3e618ef..46f228a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -154,7 +155,8 @@ public class BarrierBufferAlignmentLimitTest { check(sequence[21], buffer.getNextNonBlocked()); // no call for a completed checkpoint must have happened - verify(toNotify, times(0)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class)); + verify(toNotify, times(0)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), + any(CheckpointMetrics.class)); assertNull(buffer.getNextNonBlocked()); assertNull(buffer.getNextNonBlocked()); @@ -240,7 +242,7 @@ public class BarrierBufferAlignmentLimitTest { // checkpoint 4 completed - check and validate buffered replay check(sequence[9], buffer.getNextNonBlocked()); validateAlignmentTime(startTs, buffer); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(4L))); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(4L)), any(CheckpointMetrics.class)); check(sequence[10], buffer.getNextNonBlocked()); check(sequence[15], buffer.getNextNonBlocked()); @@ -252,7 +254,7 @@ public class BarrierBufferAlignmentLimitTest { check(sequence[21], buffer.getNextNonBlocked()); // only checkpoint 4 was successfully completed, not checkpoint 3 - verify(toNotify, times(0)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L))); + verify(toNotify, times(0)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointMetrics.class)); assertNull(buffer.getNextNonBlocked()); assertNull(buffer.getNextNonBlocked()); http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java index d17225c..869d1fe 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -565,7 +566,7 @@ public class BarrierBufferTest { // checkpoint done - replay buffered check(sequence[5], buffer.getNextNonBlocked()); validateAlignmentTime(startTs, buffer); - verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L))); + verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class)); check(sequence[6], buffer.getNextNonBlocked()); check(sequence[9], buffer.getNextNonBlocked()); @@ -1007,14 +1008,14 @@ public class BarrierBufferTest { check(sequence[0], buffer.getNextNonBlocked()); check(sequence[2], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L))); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); check(sequence[6], buffer.getNextNonBlocked()); assertEquals(5L, buffer.getCurrentCheckpointId()); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L))); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointMetrics.class)); verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class)); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L))); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); check(sequence[8], buffer.getNextNonBlocked()); @@ -1077,7 +1078,7 @@ public class BarrierBufferTest { check(sequence[2], buffer.getNextNonBlocked()); startTs = System.nanoTime(); check(sequence[5], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L))); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class)); validateAlignmentTime(startTs, buffer); check(sequence[6], buffer.getNextNonBlocked()); @@ -1096,7 +1097,7 @@ public class BarrierBufferTest { check(sequence[16], buffer.getNextNonBlocked()); startTs = System.nanoTime(); check(sequence[20], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L))); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointMetrics.class)); validateAlignmentTime(startTs, buffer); check(sequence[21], buffer.getNextNonBlocked()); @@ -1113,7 +1114,7 @@ public class BarrierBufferTest { // a simple successful checkpoint startTs = System.nanoTime(); check(sequence[32], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L))); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class)); validateAlignmentTime(startTs, buffer); check(sequence[33], buffer.getNextNonBlocked()); @@ -1174,7 +1175,7 @@ public class BarrierBufferTest { // finished first checkpoint check(sequence[3], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L))); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class)); validateAlignmentTime(startTs, buffer); check(sequence[5], buffer.getNextNonBlocked()); @@ -1197,7 +1198,7 @@ public class BarrierBufferTest { assertEquals(0L, buffer.getAlignmentDurationNanos()); // no further checkpoint (abort) notifications - verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class)); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class)); verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(CheckpointDeclineOnCancellationBarrierException.class)); // all done @@ -1279,7 +1280,7 @@ public class BarrierBufferTest { // checkpoint done check(sequence[7], buffer.getNextNonBlocked()); validateAlignmentTime(startTs, buffer); - verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L))); + verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointMetrics.class)); // queued data check(sequence[10], buffer.getNextNonBlocked()); @@ -1298,7 +1299,7 @@ public class BarrierBufferTest { checkNoTempFilesRemain(); // check overall notifications - verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class)); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class)); verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class)); } @@ -1363,7 +1364,7 @@ public class BarrierBufferTest { // checkpoint finished check(sequence[7], buffer.getNextNonBlocked()); validateAlignmentTime(startTs, buffer); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L))); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class)); check(sequence[11], buffer.getNextNonBlocked()); // remaining data @@ -1379,7 +1380,7 @@ public class BarrierBufferTest { checkNoTempFilesRemain(); // check overall notifications - verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class)); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class)); verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class)); } @@ -1491,17 +1492,17 @@ public class BarrierBufferTest { } @Override - public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception { + public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception { assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointMetaData.getCheckpointId()); assertTrue(checkpointMetaData.getTimestamp() > 0); - assertTrue(checkpointMetaData.getBytesBufferedInAlignment() >= 0); - assertTrue(checkpointMetaData.getAlignmentDurationNanos() >= 0); + assertTrue(checkpointMetrics.getBytesBufferedInAlignment() >= 0); + assertTrue(checkpointMetrics.getAlignmentDurationNanos() >= 0); nextExpectedCheckpointId++; - lastReportedBytesBufferedInAlignment = checkpointMetaData.getBytesBufferedInAlignment(); + lastReportedBytesBufferedInAlignment = checkpointMetrics.getBytesBufferedInAlignment(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java index 0d9e6ac..da322f6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -506,7 +507,7 @@ public class BarrierTrackerTest { } @Override - public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception { + public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception { assertTrue("More checkpoints than expected", i < checkpointIDs.length); final long expectedId = checkpointIDs[i++]; http://git-wip-us.apache.org/repos/asf/flink/blob/1cb8cde4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java index 492b470..5c0f0cf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java @@ -27,6 +27,7 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -275,7 +276,7 @@ public class BlockingCheckpointsTest { @Override protected void run() throws Exception { - triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis())); + triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), new CheckpointMetrics()); } @Override
