zhuzhurk commented on a change in pull request #15192:
URL: https://github.com/apache/flink/pull/15192#discussion_r594963510
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
##########
@@ -202,26 +190,51 @@ private void flushIndexBuffer() throws IOException {
}
/**
- * Writes a {@link Buffer} of the given subpartition to the this {@link PartitionedFile}. In a
- * data region, all data of the same subpartition must be written together.
+ * Writes a list of {@link Buffer}s to this {@link PartitionedFile}. It guarantees that after
+ * the return of this method, the target buffers can be released. In a data region, all data of
+ * the same subpartition must be written together.
*
- * <p>Note: The caller is responsible for recycling the target buffer and releasing the failed
+ * <p>Note: The caller is responsible for recycling the target buffers and releasing the failed
* {@link PartitionedFile} if any exception occurs.
*/
- public void writeBuffer(Buffer target, int targetSubpartition) throws IOException {
+ public void writeBuffers(List<SortBuffer.BufferWithChannel> bufferWithChannels)
+ throws IOException {
checkState(!isFinished, "File writer is already finished.");
checkState(!isClosed, "File writer is already closed.");
- if (targetSubpartition != currentSubpartition) {
- checkState(
- subpartitionBuffers[targetSubpartition] == 0,
- "Must write data of the same channel together.");
- subpartitionOffsets[targetSubpartition] = totalBytesWritten;
- currentSubpartition = targetSubpartition;
+ if (bufferWithChannels.isEmpty()) {
+ return;
}
- totalBytesWritten += writeToByteChannel(dataFileChannel, target, writeDataCache, header);
- ++subpartitionBuffers[targetSubpartition];
+ long expectedBytes = 0;
+ ByteBuffer[] bufferWithHeaders = new ByteBuffer[2 * bufferWithChannels.size()];
+
+ for (int i = 0; i < bufferWithChannels.size(); i++) {
+ SortBuffer.BufferWithChannel bufferWithChannel = bufferWithChannels.get(i);
+ Buffer buffer = bufferWithChannel.getBuffer();
+ int subpartitionIndex = bufferWithChannel.getChannelIndex();
+ if (subpartitionIndex != currentSubpartition) {
+ checkState(
+ subpartitionBuffers[subpartitionIndex] == 0,
+ "Must write data of the same channel together.");
+ subpartitionOffsets[subpartitionIndex] = totalBytesWritten;
+ currentSubpartition = subpartitionIndex;
+ }
+
+ ByteBuffer header = BufferReaderWriterUtil.allocatedHeaderBuffer();
+ BufferReaderWriterUtil.getByteChannelBufferHeader(buffer, header);
+ bufferWithHeaders[2 * i] = header;
+ bufferWithHeaders[2 * i + 1] = buffer.getNioBufferReadable();
+
+ int numBytes = header.remaining() + buffer.readableBytes();
+ expectedBytes += numBytes;
+ totalBytesWritten += numBytes;
+ ++subpartitionBuffers[subpartitionIndex];
+ }
+
+ if (dataFileChannel.write(bufferWithHeaders) < expectedBytes) {
Review comment:
Is it possible that the buffers are only partially written to the file?
I'm also wondering whether this check is needed, because we did not do it for `FileChannel.write(buffer)`.
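For context on the partial-write question: `FileChannel#write(ByteBuffer[])` is a gathering write and is allowed to transfer fewer bytes than requested in a single call. Below is a minimal sketch of a retry loop that drains all buffers; the helper name `writeFully` is illustrative and not part of the PR.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class GatheringWriteUtil {

    /** Keeps calling the gathering write until every buffer in the array is fully drained. */
    static void writeFully(FileChannel channel, ByteBuffer[] buffers) throws IOException {
        long remaining = 0;
        for (ByteBuffer buffer : buffers) {
            remaining += buffer.remaining();
        }
        while (remaining > 0) {
            // write() advances the buffers' positions, so retrying with the same array is safe.
            remaining -= channel.write(buffers);
        }
    }
}
```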
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -212,31 +269,46 @@ private void flushCurrentSortBuffer() throws IOException {
if (currentSortBuffer.hasRemaining()) {
fileWriter.startNewRegion();
+ List<BufferWithChannel> toWrite = new ArrayList<>();
+ Queue<MemorySegment> segments = getWriteBuffers();
+
while (currentSortBuffer.hasRemaining()) {
- BufferWithChannel bufferWithChannel =
- currentSortBuffer.copyIntoSegment(writeBuffer);
- Buffer buffer = bufferWithChannel.getBuffer();
- int subpartitionIndex = bufferWithChannel.getChannelIndex();
+ if (segments.isEmpty()) {
+ fileWriter.writeBuffers(toWrite);
+ toWrite.clear();
+ segments = getWriteBuffers();
+ }
- writeCompressedBufferIfPossible(buffer, subpartitionIndex);
+ BufferWithChannel bufferWithChannel =
+ currentSortBuffer.copyIntoSegment(checkNotNull(segments.poll()));
+ toWrite.add(compressBufferIfPossible(bufferWithChannel));
}
+
+ fileWriter.writeBuffers(toWrite);
}
currentSortBuffer.release();
}
- private void writeCompressedBufferIfPossible(Buffer buffer, int targetSubpartition)
- throws IOException {
- updateStatistics(buffer, targetSubpartition);
+ private Queue<MemorySegment> getWriteBuffers() {
+ synchronized (lock) {
+ checkState(!writeBuffers.isEmpty(), "Task has been canceled.");
+ return new ArrayDeque<>(writeBuffers);
+ }
+ }
- try {
- if (canBeCompressed(buffer)) {
- buffer = bufferCompressor.compressToIntermediateBuffer(buffer);
- }
- fileWriter.writeBuffer(buffer, targetSubpartition);
- } finally {
- buffer.recycleBuffer();
+ private BufferWithChannel compressBufferIfPossible(BufferWithChannel bufferWithChannel) {
+ Buffer buffer = bufferWithChannel.getBuffer();
+ int channelIndex = bufferWithChannel.getChannelIndex();
+
+ updateStatistics(buffer, channelIndex);
+
+ if (!canBeCompressed(buffer)) {
+ return bufferWithChannel;
}
+
+ buffer = checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer);
Review comment:
Why do we need to change `compressToIntermediateBuffer` to `compressToOriginalBuffer`?
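One plausible concern behind this question (an assumption on my side, not something stated in the PR): if the compressor writes into a single reusable intermediate buffer, batching several compressed buffers in `toWrite` before the gathering write would let a later compression overwrite an earlier one's bytes. A tiny self-contained sketch of that hazard, using made-up names rather than Flink's `BufferCompressor` API:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class ReusedIntermediateBufferHazard {
    // Stand-in for a compressor that always writes into the same reusable scratch buffer.
    private static final byte[] scratch = new byte[64];

    static byte[] compressToIntermediate(String payload) {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        System.arraycopy(bytes, 0, scratch, 0, bytes.length);
        return scratch; // every call hands back the very same array
    }

    public static void main(String[] args) {
        List<byte[]> batch = new ArrayList<>();
        batch.add(compressToIntermediate("buffer-1"));
        batch.add(compressToIntermediate("buffer-2")); // overwrites the bytes of "buffer-1"
        // Both batch entries now alias the same scratch array, so a batched write would
        // persist "buffer-2" twice; compressing into each buffer's own memory avoids this.
        System.out.println(new String(batch.get(0), 0, 8, StandardCharsets.UTF_8)); // prints buffer-2
    }
}
```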
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]