[FLINK-4379] [checkpoints] Fix minor bug and improve debug logging
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f8f5eb3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f8f5eb3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f8f5eb3 Branch: refs/heads/master Commit: 6f8f5eb3b9ba07cd3bb4d9f7edd43d4b8862acbe Parents: 53ed6ad Author: Stephan Ewen <[email protected]> Authored: Thu Sep 29 21:12:38 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Sep 30 12:38:46 2016 +0200 ---------------------------------------------------------------------- .../streaming/runtime/tasks/StreamTask.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6f8f5eb3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 1725eca..88c3ba4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -717,6 +717,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> cancelables.registerClosable(asyncCheckpointRunnable); asyncOperationsThreadPool.submit(asyncCheckpointRunnable); + + if (LOG.isDebugEnabled()) { + LOG.debug("{} - finished synchronous part of checkpoint {}." + + "Alignment duration: {} ms, snapshot duration {} ms", + getName(), checkpointId, alignmentDurationNanos / 1_000_000, syncDurationMillis); + } + return true; } else { return false; @@ -998,12 +1005,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> final long asyncEndNanos = System.nanoTime(); final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000; - if (nonPartitionedStateHandles.isEmpty() && keyedStates.isEmpty()) { - owner.getEnvironment().acknowledgeCheckpoint(checkpointId, + if (nonPartitionedStateHandles.isEmpty() && partitioneableStateHandles.isEmpty() && keyedStates.isEmpty()) { + owner.getEnvironment().acknowledgeCheckpoint( + checkpointId, syncDurationMillies, asyncDurationMillis, bytesBufferedInAlignment, alignmentDurationNanos); - } else { - + } else { CheckpointStateHandles allStateHandles = new CheckpointStateHandles( nonPartitionedStateHandles, partitioneableStateHandles, @@ -1016,8 +1023,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } if (LOG.isDebugEnabled()) { - LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}. Returning handles on " + - "keyed states {}.", checkpointId, name, keyedStates); + LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", + owner.getName(), checkpointId, asyncDurationMillis); } } catch (Exception e) {
