This is an automated email from the ASF dual-hosted git repository.

guoweima pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 77e0478  [FLINK-22305][network] Improve log messages of sort-merge blocking shuffle
77e0478 is described below

commit 77e0478a7879bd41f9f52872d84c467d158b8098
Author: kevin.cyj <[email protected]>
AuthorDate: Sat Apr 17 15:02:44 2021 +0800

    [FLINK-22305][network] Improve log messages of sort-merge blocking shuffle
    
    This closes #15652.
---
 .../io/disk/BatchShuffleReadBufferPool.java        | 10 ++++-----
 .../io/network/partition/PartitionedFile.java      | 25 ++++++++++++++++++++--
 .../network/partition/PartitionedFileWriter.java   | 15 ++++++++++++-
 .../partition/SortMergeResultPartition.java        | 20 ++++++++---------
 4 files changed, 52 insertions(+), 18 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
index 01da315..1000a44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
@@ -135,11 +135,6 @@ public class BatchShuffleReadBufferPool {
 
     /** Initializes this buffer pool which allocates all the buffers. */
     public void initialize() {
-        LOG.info(
-                "Initializing batch shuffle IO buffer pool: numBuffers={}, 
bufferSize={}.",
-                numTotalBuffers,
-                bufferSize);
-
         synchronized (buffers) {
             checkState(!destroyed, "Buffer pool is already destroyed.");
 
@@ -175,6 +170,11 @@ public class BatchShuffleReadBufferPool {
                                 
TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key()));
             }
         }
+
+        LOG.info(
+                "Batch shuffle IO buffer pool initialized: numBuffers={}, 
bufferSize={}.",
+                numTotalBuffers,
+                bufferSize);
     }
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
index 145a775..dd34fe0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
@@ -63,6 +63,15 @@ public class PartitionedFile {
     /** Path of the index file which stores all index entries in this {@link 
PartitionedFile}. */
     private final Path indexFilePath;
 
+    /** Size of the data file. */
+    private final long dataFileSize;
+
+    /** Size of the index file. */
+    private final long indexFileSize;
+
+    /** Total number of buffers in the data file. */
+    private final long numBuffers;
+
     /** Used to accelerate index data access. */
     @Nullable private final ByteBuffer indexEntryCache;
 
@@ -71,6 +80,9 @@ public class PartitionedFile {
             int numSubpartitions,
             Path dataFilePath,
             Path indexFilePath,
+            long dataFileSize,
+            long indexFileSize,
+            long numBuffers,
             @Nullable ByteBuffer indexEntryCache) {
         checkArgument(numRegions >= 0, "Illegal number of data regions.");
         checkArgument(numSubpartitions > 0, "Illegal number of 
subpartitions.");
@@ -79,6 +91,9 @@ public class PartitionedFile {
         this.numSubpartitions = numSubpartitions;
         this.dataFilePath = checkNotNull(dataFilePath);
         this.indexFilePath = checkNotNull(indexFilePath);
+        this.dataFileSize = dataFileSize;
+        this.indexFileSize = indexFileSize;
+        this.numBuffers = numBuffers;
         this.indexEntryCache = indexEntryCache;
     }
 
@@ -144,8 +159,14 @@ public class PartitionedFile {
                 + dataFilePath
                 + ", indexFilePath="
                 + indexFilePath
-                + ", indexDataCache="
-                + indexEntryCache
+                + ", dataFileSize="
+                + dataFileSize
+                + ", indexFileSize="
+                + indexFileSize
+                + ", numBuffers="
+                + numBuffers
+                + ", indexDataCached="
+                + (indexEntryCache != null)
                 + '}';
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
index 5090d87..e9952c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
@@ -83,6 +83,9 @@ public class PartitionedFileWriter implements AutoCloseable {
     /** Number of regions written to the target {@link PartitionedFile}. */
     private int numRegions;
 
+    /** Total number of buffers in the data file. */
+    private long numBuffers;
+
     /** Current subpartition to write buffers to. */
     private int currentSubpartition = -1;
 
@@ -215,6 +218,7 @@ public class PartitionedFileWriter implements AutoCloseable {
             return;
         }
 
+        numBuffers += bufferWithChannels.size();
         long expectedBytes;
         ByteBuffer[] bufferWithHeaders = new ByteBuffer[2 * 
bufferWithChannels.size()];
 
@@ -300,6 +304,8 @@ public class PartitionedFileWriter implements AutoCloseable {
         flushIndexBuffer();
         indexBuffer.rewind();
 
+        long dataFileSize = dataFileChannel.size();
+        long indexFileSize = indexFileChannel.size();
         close();
 
         ByteBuffer indexEntryCache = null;
@@ -308,7 +314,14 @@ public class PartitionedFileWriter implements AutoCloseable {
         }
         indexBuffer = null;
         return new PartitionedFile(
-                numRegions, numSubpartitions, dataFilePath, indexFilePath, 
indexEntryCache);
+                numRegions,
+                numSubpartitions,
+                dataFilePath,
+                indexFilePath,
+                dataFileSize,
+                indexFileSize,
+                numBuffers,
+                indexEntryCache);
     }
 
     /** Used to close and delete the failed {@link PartitionedFile} when any 
exception occurs. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index 0f81940..db15bca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -160,18 +160,12 @@ public class SortMergeResultPartition extends ResultPartition {
         }
 
         int numRequiredBuffer = bufferPool.getNumberOfRequiredMemorySegments();
-        String errorMessage =
-                String.format(
-                        "Too few sort buffers, please increase %s to a larger 
value (more than %d).",
-                        
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS,
-                        2 * expectedWriteBuffers);
-        if (numRequiredBuffer < 2 * expectedWriteBuffers) {
-            LOG.warn(errorMessage);
-        }
-
         int numWriteBuffers = Math.min(numRequiredBuffer / 2, 
expectedWriteBuffers);
         if (numWriteBuffers < 1) {
-            throw new IOException(errorMessage);
+            throw new IOException(
+                    String.format(
+                            "Too few sort buffers, please increase %s.",
+                            
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS));
         }
         numBuffersForSort = numRequiredBuffer - numWriteBuffers;
 
@@ -187,6 +181,12 @@ public class SortMergeResultPartition extends ResultPartition {
                 throw new IOException(exception);
             }
         }
+
+        LOG.info(
+                "Sort-merge partition {} initialized, num sort buffers: {}, 
num write buffers: {}.",
+                getPartitionId(),
+                numBuffersForSort,
+                numWriteBuffers);
     }
 
     @Override

Reply via email to