This is an automated email from the ASF dual-hosted git repository.
guoweima pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 77e0478 [FLINK-22305][network] Improve log messages of sort-merge blocking shuffle
77e0478 is described below
commit 77e0478a7879bd41f9f52872d84c467d158b8098
Author: kevin.cyj <[email protected]>
AuthorDate: Sat Apr 17 15:02:44 2021 +0800
[FLINK-22305][network] Improve log messages of sort-merge blocking shuffle
This closes #15652.
---
.../io/disk/BatchShuffleReadBufferPool.java | 10 ++++-----
.../io/network/partition/PartitionedFile.java | 25 ++++++++++++++++++++--
.../network/partition/PartitionedFileWriter.java | 15 ++++++++++++-
.../partition/SortMergeResultPartition.java | 20 ++++++++---------
4 files changed, 52 insertions(+), 18 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
index 01da315..1000a44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
@@ -135,11 +135,6 @@ public class BatchShuffleReadBufferPool {
/** Initializes this buffer pool which allocates all the buffers. */
public void initialize() {
- LOG.info(
- "Initializing batch shuffle IO buffer pool: numBuffers={}, bufferSize={}.",
- numTotalBuffers,
- bufferSize);
-
synchronized (buffers) {
checkState(!destroyed, "Buffer pool is already destroyed.");
@@ -175,6 +170,11 @@ public class BatchShuffleReadBufferPool {
TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key()));
}
}
+
+ LOG.info(
+ "Batch shuffle IO buffer pool initialized: numBuffers={}, bufferSize={}.",
+ numTotalBuffers,
+ bufferSize);
}
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
index 145a775..dd34fe0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
@@ -63,6 +63,15 @@ public class PartitionedFile {
/** Path of the index file which stores all index entries in this {@link PartitionedFile}. */
private final Path indexFilePath;
+ /** Size of the data file. */
+ private final long dataFileSize;
+
+ /** Size of the index file. */
+ private final long indexFileSize;
+
+ /** Total number of buffers in the data file. */
+ private final long numBuffers;
+
/** Used to accelerate index data access. */
@Nullable private final ByteBuffer indexEntryCache;
@@ -71,6 +80,9 @@ public class PartitionedFile {
int numSubpartitions,
Path dataFilePath,
Path indexFilePath,
+ long dataFileSize,
+ long indexFileSize,
+ long numBuffers,
@Nullable ByteBuffer indexEntryCache) {
checkArgument(numRegions >= 0, "Illegal number of data regions.");
checkArgument(numSubpartitions > 0, "Illegal number of subpartitions.");
@@ -79,6 +91,9 @@ public class PartitionedFile {
this.numSubpartitions = numSubpartitions;
this.dataFilePath = checkNotNull(dataFilePath);
this.indexFilePath = checkNotNull(indexFilePath);
+ this.dataFileSize = dataFileSize;
+ this.indexFileSize = indexFileSize;
+ this.numBuffers = numBuffers;
this.indexEntryCache = indexEntryCache;
}
@@ -144,8 +159,14 @@ public class PartitionedFile {
+ dataFilePath
+ ", indexFilePath="
+ indexFilePath
- + ", indexDataCache="
- + indexEntryCache
+ + ", dataFileSize="
+ + dataFileSize
+ + ", indexFileSize="
+ + indexFileSize
+ + ", numBuffers="
+ + numBuffers
+ + ", indexDataCached="
+ + (indexEntryCache != null)
+ '}';
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
index 5090d87..e9952c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
@@ -83,6 +83,9 @@ public class PartitionedFileWriter implements AutoCloseable {
/** Number of regions written to the target {@link PartitionedFile}. */
private int numRegions;
+ /** Total number of buffers in the data file. */
+ private long numBuffers;
+
/** Current subpartition to write buffers to. */
private int currentSubpartition = -1;
@@ -215,6 +218,7 @@ public class PartitionedFileWriter implements AutoCloseable {
return;
}
+ numBuffers += bufferWithChannels.size();
long expectedBytes;
ByteBuffer[] bufferWithHeaders = new ByteBuffer[2 * bufferWithChannels.size()];
@@ -300,6 +304,8 @@ public class PartitionedFileWriter implements AutoCloseable {
flushIndexBuffer();
indexBuffer.rewind();
+ long dataFileSize = dataFileChannel.size();
+ long indexFileSize = indexFileChannel.size();
close();
ByteBuffer indexEntryCache = null;
@@ -308,7 +314,14 @@ public class PartitionedFileWriter implements AutoCloseable {
}
indexBuffer = null;
return new PartitionedFile(
- numRegions, numSubpartitions, dataFilePath, indexFilePath, indexEntryCache);
+ numRegions,
+ numSubpartitions,
+ dataFilePath,
+ indexFilePath,
+ dataFileSize,
+ indexFileSize,
+ numBuffers,
+ indexEntryCache);
}
/** Used to close and delete the failed {@link PartitionedFile} when any exception occurs. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index 0f81940..db15bca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -160,18 +160,12 @@ public class SortMergeResultPartition extends ResultPartition {
}
int numRequiredBuffer = bufferPool.getNumberOfRequiredMemorySegments();
- String errorMessage =
- String.format(
- "Too few sort buffers, please increase %s to a larger value (more than %d).",
- NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS,
- 2 * expectedWriteBuffers);
- if (numRequiredBuffer < 2 * expectedWriteBuffers) {
- LOG.warn(errorMessage);
- }
-
int numWriteBuffers = Math.min(numRequiredBuffer / 2, expectedWriteBuffers);
if (numWriteBuffers < 1) {
- throw new IOException(errorMessage);
+ throw new IOException(
+ String.format(
+ "Too few sort buffers, please increase %s.",
+ NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS));
}
numBuffersForSort = numRequiredBuffer - numWriteBuffers;
@@ -187,6 +181,12 @@ public class SortMergeResultPartition extends ResultPartition {
throw new IOException(exception);
}
}
+
+ LOG.info(
+ "Sort-merge partition {} initialized, num sort buffers: {}, num write buffers: {}.",
+ getPartitionId(),
+ numBuffersForSort,
+ numWriteBuffers);
}
@Override