StephanEwen commented on a change in pull request #13595:
URL: https://github.com/apache/flink/pull/13595#discussion_r512634744



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * File writer which can write buffers and generate the {@link PartitionedFile}. Data is written region by region.
+ * Before writing any data, {@link #open} must be called and before writing a new region, {@link #startNewRegion}
+ * must be called. After writing all data, {@link #finish} must be called to close all opened files and return the
+ * target {@link PartitionedFile}.
+ */
+@NotThreadSafe
+public class PartitionedFileWriter implements AutoCloseable {
+
+       /** Used when writing data buffers. */
+       private final ByteBuffer[] header = BufferReaderWriterUtil.allocatedWriteBufferArray();
+
+       /** Buffer for writing region index. */
+       private final ByteBuffer indexBuffer;
+
+       /** Number of channels. When writing a buffer, target subpartition must be in this range. */
+       private final int numSubpartitions;
+
+       /** Data file path of the target {@link PartitionedFile}. */
+       private final Path dataFilePath;
+
+       /** Index file path of the target {@link PartitionedFile}. */
+       private final Path indexFilePath;
+
+       /** Number of bytes written for each subpartition in the current region. */
+       private final long[] subpartitionBytes;
+
+       /** Number of buffers written for each subpartition in the current region. */
+       private final int[] subpartitionBuffers;
+
+       /** Opened data file channel of the target {@link PartitionedFile}. */
+       private FileChannel dataFileChannel;
+
+       /** Opened index file channel of the target {@link PartitionedFile}. */
+       private FileChannel indexFileChannel;
+
+       /** Number of bytes written to the target {@link PartitionedFile}. */
+       private long totalBytesWritten;
+
+       /** Number of regions written to the target {@link PartitionedFile}. */
+       private int numRegions;
+
+       /** Current subpartition to write. Buffer writing must be in subpartition order within each region. */
+       private int currentSubpartition;
+
+       /** Whether all index data is cached in memory or not. */
+       private boolean allIndexDataCached = true;
+
+       /** Whether this file writer is finished. */
+       private boolean isFinished;
+
+       public PartitionedFileWriter(String basePath, int numSubpartitions, int indexBufferSize) {
+               checkArgument(basePath != null, "Base path must not be null.");
+               checkArgument(numSubpartitions > 0, "Illegal number of subpartitions.");
+               checkArgument(indexBufferSize > 0, "Illegal index buffer size.");
+
+               this.numSubpartitions = numSubpartitions;
+               this.subpartitionBytes = new long[numSubpartitions];
+               this.subpartitionBuffers = new int[numSubpartitions];
+               this.dataFilePath = new File(basePath + PartitionedFile.DATA_FILE_SUFFIX).toPath();
+               this.indexFilePath = new File(basePath + PartitionedFile.INDEX_FILE_SUFFIX).toPath();
+
+               this.indexBuffer = ByteBuffer.allocate(indexBufferSize * PartitionedFile.INDEX_ENTRY_SIZE);
+               indexBuffer.order(PartitionedFile.DEFAULT_BYTE_ORDER);
+       }
+
+       /**
+        * Opens the {@link PartitionedFile} for writing.
+        *
+        * <p>Note: The caller is responsible for releasing the failed {@link PartitionedFile} if any exception
+        * occurs.
+        */
+       public void open() throws IOException {
+               checkState(dataFileChannel == null && indexFileChannel == null, "Partitioned file is already opened.");
+
+               dataFileChannel = openFileChannel(dataFilePath);
+               indexFileChannel = openFileChannel(indexFilePath);
+       }
+
+       private FileChannel openFileChannel(Path path) throws IOException {
+               return FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+       }
+
+       /**
+        * Persists the region index of the current data region and starts a new region to write.
+        *
+        * <p>Note: The caller is responsible for releasing the failed {@link PartitionedFile} if any exception
+        * occurs.
+        */
+       public void startNewRegion() throws IOException {
+               checkState(!isFinished, "File writer is already finished.");
+               checkState(dataFileChannel != null && indexFileChannel != null, "Must open the partitioned file first.");
+
+               writeRegionIndex();
+       }
+
+       private void writeRegionIndex() throws IOException {
+               if (Arrays.stream(subpartitionBytes).sum() > 0) {
+                       for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
+                               if (!indexBuffer.hasRemaining()) {
+                                       flushIndexBuffer();
+                                       indexBuffer.clear();
+                                       allIndexDataCached = false;
+                               }
+
+                               indexBuffer.putLong(totalBytesWritten);
+                               indexBuffer.putInt(subpartitionBuffers[subpartition]);
+                               totalBytesWritten += subpartitionBytes[subpartition];
+                       }
+
+                       ++numRegions;
+                       currentSubpartition = 0;
+                       Arrays.fill(subpartitionBytes, 0);
+                       Arrays.fill(subpartitionBuffers, 0);
+               }
+       }
+
+       private void flushIndexBuffer() throws IOException {
+               indexBuffer.flip();
+               if (indexBuffer.limit() > 0) {
+                       BufferReaderWriterUtil.writeBuffer(indexFileChannel, indexBuffer);
+               }
+       }
+
+       /**
+        * Writes a {@link Buffer} of the given subpartition to this {@link PartitionedFile}.
+        *
+        * <p>Note: The caller is responsible for recycling the target buffer and releasing the failed
+        * {@link PartitionedFile} if any exception occurs.
+        */
+       public void writeBuffer(Buffer target, int targetSubpartition) throws IOException {
+               checkArgument(targetSubpartition >= currentSubpartition, "Must write in subpartition index order.");
+               checkState(!isFinished, "File writer is already finished.");
+               checkState(dataFileChannel != null && indexFileChannel != null, "Must open the partitioned file first.");
+
+               currentSubpartition = Math.max(currentSubpartition, targetSubpartition);
+               long numBytes = BufferReaderWriterUtil.writeToByteChannel(dataFileChannel, target, header);
+
+               ++subpartitionBuffers[targetSubpartition];
+               subpartitionBytes[targetSubpartition] += numBytes;
+       }
+
+       /**
+        * Finishes writing which closes the file channel and returns the corresponding {@link PartitionedFile}.
+        *
+        * <p>Note: The caller is responsible for releasing the failed {@link PartitionedFile} if any exception
+        * occurs.
+        */
+       public PartitionedFile finish() throws IOException {
+               checkState(!isFinished, "File writer is already finished.");
+               checkState(dataFileChannel != null && indexFileChannel != null, "Must open the partitioned file first.");
+
+               isFinished = true;
+
+               writeRegionIndex();
+               flushIndexBuffer();
+               indexBuffer.rewind();
+
+               close();
+               ByteBuffer indexDataCache = allIndexDataCached ? indexBuffer : null;
+               return new PartitionedFile(numRegions, numSubpartitions, dataFilePath, indexFilePath, indexDataCache);
+       }
+
+       /**
+        * Used to close and delete the failed {@link PartitionedFile} when any exception occurs.
+        */
+       public void releaseQuietly() {
+               close();
+               IOUtils.deleteFileQuietly(dataFilePath);
+               IOUtils.deleteFileQuietly(indexFilePath);
+       }
+
+       @Override
+       public void close() {
+               Throwable exception = null;
+
+               try {
+                       dataFileChannel.close();
+               } catch (Throwable throwable) {
+                       exception = throwable;
+               }
+
+               try {
+                       indexFileChannel.close();
+               } catch (Throwable throwable) {
+                       exception = throwable;

Review comment:
       This way, you'll lose the first exception: if closing the second channel also fails, the later failure overwrites the earlier one. I would either keep the first exception or use the `ExceptionUtils.firstOrSuppressed()` pattern here.
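
       For illustration, a minimal sketch of the suggested `firstOrSuppressed()` pattern applied to `close()` (a hypothetical rewrite, not the PR author's code; the quoted diff ends mid-method, so the `throws IOException` signature and the final rethrow are assumptions):

```java
@Override
public void close() throws IOException {
    IOException exception = null;

    try {
        dataFileChannel.close();
    } catch (IOException ioException) {
        exception = ioException;
    }

    try {
        indexFileChannel.close();
    } catch (IOException ioException) {
        // Keep the first failure as the primary exception and attach
        // this one as suppressed instead of overwriting it.
        exception = ExceptionUtils.firstOrSuppressed(ioException, exception);
    }

    if (exception != null) {
        throw exception;
    }
}
```

       `firstOrSuppressed(newException, previous)` returns `previous` with `newException` attached via `addSuppressed()` when `previous` is non-null, so the first failure stays visible and no later failure is silently dropped.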




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

