wsry commented on a change in pull request #15192:
URL: https://github.com/apache/flink/pull/15192#discussion_r595705324
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
##########
@@ -202,26 +190,51 @@ private void flushIndexBuffer() throws IOException {
}
/**
-     * Writes a {@link Buffer} of the given subpartition to the this {@link PartitionedFile}. In a
-     * data region, all data of the same subpartition must be written together.
+     * Writes a list of {@link Buffer}s to this {@link PartitionedFile}. It guarantees that after
+     * the return of this method, the target buffers can be released. In a data region, all data of
+     * the same subpartition must be written together.
      *
-     * <p>Note: The caller is responsible for recycling the target buffer and releasing the failed
+     * <p>Note: The caller is responsible for recycling the target buffers and releasing the failed
      * {@link PartitionedFile} if any exception occurs.
      */
-    public void writeBuffer(Buffer target, int targetSubpartition) throws IOException {
+    public void writeBuffers(List<SortBuffer.BufferWithChannel> bufferWithChannels)
+            throws IOException {
         checkState(!isFinished, "File writer is already finished.");
         checkState(!isClosed, "File writer is already closed.");
-        if (targetSubpartition != currentSubpartition) {
-            checkState(
-                    subpartitionBuffers[targetSubpartition] == 0,
-                    "Must write data of the same channel together.");
-            subpartitionOffsets[targetSubpartition] = totalBytesWritten;
-            currentSubpartition = targetSubpartition;
+        if (bufferWithChannels.isEmpty()) {
+            return;
         }
-        totalBytesWritten += writeToByteChannel(dataFileChannel, target, writeDataCache, header);
-        ++subpartitionBuffers[targetSubpartition];
+        long expectedBytes = 0;
+        ByteBuffer[] bufferWithHeaders = new ByteBuffer[2 * bufferWithChannels.size()];
+
+        for (int i = 0; i < bufferWithChannels.size(); i++) {
+            SortBuffer.BufferWithChannel bufferWithChannel = bufferWithChannels.get(i);
+            Buffer buffer = bufferWithChannel.getBuffer();
+            int subpartitionIndex = bufferWithChannel.getChannelIndex();
+            if (subpartitionIndex != currentSubpartition) {
+                checkState(
+                        subpartitionBuffers[subpartitionIndex] == 0,
+                        "Must write data of the same channel together.");
+                subpartitionOffsets[subpartitionIndex] = totalBytesWritten;
+                currentSubpartition = subpartitionIndex;
+            }
+
+            ByteBuffer header = BufferReaderWriterUtil.allocatedHeaderBuffer();
+            BufferReaderWriterUtil.getByteChannelBufferHeader(buffer, header);
+            bufferWithHeaders[2 * i] = header;
+            bufferWithHeaders[2 * i + 1] = buffer.getNioBufferReadable();
+
+            int numBytes = header.remaining() + buffer.readableBytes();
+            expectedBytes += numBytes;
+            totalBytesWritten += numBytes;
+            ++subpartitionBuffers[subpartitionIndex];
+        }
+
+        if (dataFileChannel.write(bufferWithHeaders) < expectedBytes) {
Review comment:
The documentation of FileChannel.write does not guarantee that all data is
written out. BufferReaderWriterUtil already does the same thing, and there are
comments there explaining why. I can extract that logic and the corresponding
comments from BufferReaderWriterUtil into a method and call that method
directly here.
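To illustrate the point: `FileChannel.write(ByteBuffer[])` is only guaranteed to write *some* bytes per call, so a correct gathering write loops until every buffer is drained. The sketch below uses plain NIO; the helper name `writeCompletely` is illustrative and not Flink's actual `BufferReaderWriterUtil` API.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class GatheringWriteExample {

    // FileChannel.write(ByteBuffer[]) may return after a partial write,
    // so keep calling it until the total remaining byte count reaches zero.
    // (Illustrative helper; the real logic lives in BufferReaderWriterUtil.)
    static void writeCompletely(FileChannel channel, ByteBuffer[] buffers) throws IOException {
        long remaining = 0;
        for (ByteBuffer buffer : buffers) {
            remaining += buffer.remaining();
        }
        while (remaining > 0) {
            // write() advances each buffer's position as bytes are consumed,
            // so re-passing the same array continues where the last call stopped.
            remaining -= channel.write(buffers);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("gathering-write", ".bin");
        // A header/payload pair, mirroring the interleaved layout in the diff above.
        ByteBuffer header = ByteBuffer.wrap("HDR:".getBytes(StandardCharsets.UTF_8));
        ByteBuffer body = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
            writeCompletely(channel, new ByteBuffer[] {header, body});
        }
        System.out.println(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
        Files.delete(file);
    }
}
```

A single `if (channel.write(buffers) < expectedBytes) { throw ...; }` check, as in the diff, treats a legal partial write as an error, which is exactly why the reviewer suggests reusing the existing loop.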