This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
     new 928f2d1  [FLINK-10131][network] improve logging around subpartitions
928f2d1 is described below

commit 928f2d1fd8964c7855149622f6c052a8e2776ee0
Author: Nico Kruber <[email protected]>
AuthorDate: Mon Sep 10 11:14:01 2018 +0200

    [FLINK-10131][network] improve logging around subpartitions
    
    - add task name
    - add subpartition index
    
    This closes #6547.
---
 .../runtime/io/network/partition/PipelinedSubpartition.java   | 11 ++++++-----
 .../flink/runtime/io/network/partition/ResultPartition.java   |  4 ++++
 .../runtime/io/network/partition/SpillableSubpartition.java   | 11 +++++++----
 3 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index cc79363..c6f3e15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -80,7 +80,7 @@ class PipelinedSubpartition extends ResultSubpartition {
        @Override
        public void finish() throws IOException {
                
add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
-               LOG.debug("Finished {}.", this);
+               LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
        }
 
        private boolean add(BufferConsumer bufferConsumer, boolean finish) {
@@ -132,7 +132,7 @@ class PipelinedSubpartition extends ResultSubpartition {
                        isReleased = true;
                }
 
-               LOG.debug("Released {}.", this);
+               LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this);
 
                if (view != null) {
                        view.releaseAllResources();
@@ -224,7 +224,8 @@ class PipelinedSubpartition extends ResultSubpartition {
                                        "Subpartition %s of is being (or 
already has been) consumed, " +
                                        "but pipelined subpartitions can only 
be consumed once.", index, parent.getPartitionId());
 
-                       LOG.debug("Creating read view for subpartition {} of 
partition {}.", index, parent.getPartitionId());
+                       LOG.debug("{}: Creating read view for subpartition {} 
of partition {}.",
+                               parent.getOwningTaskName(), index, 
parent.getPartitionId());
 
                        readView = new PipelinedSubpartitionView(this, 
availabilityListener);
                        if (!buffers.isEmpty()) {
@@ -268,8 +269,8 @@ class PipelinedSubpartition extends ResultSubpartition {
                }
 
                return String.format(
-                       "PipelinedSubpartition [number of buffers: %d (%d 
bytes), number of buffers in backlog: %d, finished? %s, read view? %s]",
-                       numBuffers, numBytes, getBuffersInBacklog(), finished, 
hasReadView);
+                       "PipelinedSubpartition#%d [number of buffers: %d (%d 
bytes), number of buffers in backlog: %d, finished? %s, read view? %s]",
+                       index, numBuffers, numBytes, getBuffersInBacklog(), 
finished, hasReadView);
        }
 
        @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index fbbfa4b..93e5ba1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -199,6 +199,10 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
                return jobId;
        }
 
+       public String getOwningTaskName() {
+               return owningTaskName;
+       }
+
        public ResultPartitionID getPartitionId() {
                return partitionId;
        }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 69b461b..9f696ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -143,6 +143,7 @@ class SpillableSubpartition extends ResultSubpartition {
                if (spillWriter != null) {
                        spillWriter.close();
                }
+               LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
        }
 
        @Override
@@ -180,6 +181,8 @@ class SpillableSubpartition extends ResultSubpartition {
                        isReleased = true;
                }
 
+               LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this);
+
                if (view != null) {
                        view.releaseAllResources();
                }
@@ -236,8 +239,8 @@ class SpillableSubpartition extends ResultSubpartition {
                                long spilledBytes = 
spillFinishedBufferConsumers(isFinished);
                                int spilledBuffers = numberOfBuffers - 
buffers.size();
 
-                               LOG.debug("Spilling {} bytes ({} buffers} for 
sub partition {} of {}.",
-                                       spilledBytes, spilledBuffers, index, 
parent.getPartitionId());
+                               LOG.debug("{}: Spilling {} bytes ({} buffers} 
for sub partition {} of {}.",
+                                       parent.getOwningTaskName(), 
spilledBytes, spilledBuffers, index, parent.getPartitionId());
 
                                return spilledBuffers;
                        }
@@ -300,9 +303,9 @@ class SpillableSubpartition extends ResultSubpartition {
 
        @Override
        public String toString() {
-               return String.format("SpillableSubpartition [%d number of 
buffers (%d bytes)," +
+               return String.format("SpillableSubpartition#%d [%d number of 
buffers (%d bytes)," +
                                "%d number of buffers in backlog, finished? %s, 
read view? %s, spilled? %s]",
-                       getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
+                       index, getTotalNumberOfBuffers(), 
getTotalNumberOfBytes(),
                        getBuffersInBacklog(), isFinished, readView != null, 
spillWriter != null);
        }
 

Reply via email to