[FLINK-4379] [checkpoints] Fix minor bug and improve debug logging

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f8f5eb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f8f5eb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f8f5eb3

Branch: refs/heads/master
Commit: 6f8f5eb3b9ba07cd3bb4d9f7edd43d4b8862acbe
Parents: 53ed6ad
Author: Stephan Ewen <[email protected]>
Authored: Thu Sep 29 21:12:38 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Sep 30 12:38:46 2016 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java      | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f8f5eb3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1725eca..88c3ba4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -717,6 +717,13 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                                
cancelables.registerClosable(asyncCheckpointRunnable);
                                
asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
+
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("{} - finished synchronous 
part of checkpoint {}." +
+                                                       "Alignment duration: {} 
ms, snapshot duration {} ms",
+                                                       getName(), 
checkpointId, alignmentDurationNanos / 1_000_000, syncDurationMillis);
+                               }
+
                                return true;
                        } else {
                                return false;
@@ -998,12 +1005,12 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                final long asyncEndNanos = System.nanoTime();
                                final long asyncDurationMillis = (asyncEndNanos 
- asyncStartNanos) / 1_000_000;
 
-                               if (nonPartitionedStateHandles.isEmpty() && 
keyedStates.isEmpty()) {
-                                       
owner.getEnvironment().acknowledgeCheckpoint(checkpointId,
+                               if (nonPartitionedStateHandles.isEmpty() && 
partitioneableStateHandles.isEmpty() && keyedStates.isEmpty()) {
+                                       
owner.getEnvironment().acknowledgeCheckpoint(
+                                                       checkpointId,
                                                        syncDurationMillies, 
asyncDurationMillis,
                                                        
bytesBufferedInAlignment, alignmentDurationNanos);
-                               } else  {
-
+                               } else {
                                        CheckpointStateHandles allStateHandles 
= new CheckpointStateHandles(
                                                        
nonPartitionedStateHandles,
                                                        
partitioneableStateHandles,
@@ -1016,8 +1023,8 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                }
 
                                if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Finished asynchronous 
checkpoints for checkpoint {} on task {}. Returning handles on " +
-                                                       "keyed states {}.", 
checkpointId, name, keyedStates);
+                                       LOG.debug("{} - finished asynchronous 
part of checkpoint {}. Asynchronous duration: {} ms", 
+                                                       owner.getName(), 
checkpointId, asyncDurationMillis);
                                }
                        }
                        catch (Exception e) {

Reply via email to