sunhaibotb commented on a change in pull request #7797: [FLINK-11379] Fix
OutOfMemoryError caused by Files.readAllBytes() when TM loads a large size TDD
URL: https://github.com/apache/flink/pull/7797#discussion_r259791054
##########
File path: flink-core/src/main/java/org/apache/flink/util/FileUtils.java
##########
@@ -107,6 +120,92 @@ public static void writeFileUtf8(File file, String contents) throws IOException
writeFile(file, contents, "UTF-8");
}
+ /**
+ * Reads all the bytes from a file. The method ensures that the file is
+ * closed when all bytes have been read or an I/O error, or other runtime
+ * exception, is thrown.
+ *
+ * <p>This is an implementation that follow {@link java.nio.file.Files#readAllBytes(java.nio.file.Path)},
+ * and the difference is that it limits the size of the direct buffer to avoid
+ * direct-buffer OutOfMemoryError. When {@link java.nio.file.Files#readAllBytes(java.nio.file.Path)}
+ * or other interfaces in java API can do this in the future, we should remove it.
+ *
+ * @param path
+ * the path to the file
+ * @return a byte array containing the bytes read from the file
+ *
+ * @throws IOException
+ * if an I/O error occurs reading from the stream
+ * @throws OutOfMemoryError
+ * if an array of the required size cannot be allocated, for
+ * example the file is larger that {@code 2GB}
+ */
+ public static byte[] readAllBytes(java.nio.file.Path path) throws IOException {
+ try (SeekableByteChannel channel = Files.newByteChannel(path);
+ InputStream in = Channels.newInputStream(channel)) {
+
+ long size = channel.size();
+ if (size > (long) MAX_BUFFER_SIZE) {
+ throw new OutOfMemoryError("Required array size too large");
+ }
+
+ return read(in, (int) size, BUFFER_SIZE);
+ }
+ }
+
+ /**
+ * Reads all the bytes from an input stream. Uses {@code initialSize} as a hint
+ * about how many bytes the stream will have and uses {@code directBufferSize}
+ * to limit the size of the direct buffer used to read.
+ *
+ * @param source
+ * the input stream to read from
+ * @param initialSize
+ * the initial size of the byte array to allocate
+ * @param maxDirectBufferSize
+ * the maximum size of the direct buffer used to read
+ * @return a byte array containing the bytes read from the file
+ *
+ * @throws IOException
+ * if an I/O error occurs reading from the stream
+ * @throws OutOfMemoryError
+ * if an array of the required size cannot be allocated
+ */
+ private static byte[] read(InputStream source, int initialSize, int maxDirectBufferSize) throws IOException {
Review comment:
I added the maxDirectBufferSize parameter for extensibility, in case a
different direct buffer size is needed in the future. For now, it can be
deleted. What do you propose?
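
To make the two options concrete, here is a minimal sketch of a chunk-limited read, not the code from this PR. The names BoundedRead, readBounded, CHUNK_SIZE and MAX_ARRAY_SIZE are hypothetical, and the 4 KiB value is an assumption, since the PR's BUFFER_SIZE is not shown in this hunk. It relies on my understanding that a channel-backed stream sizes its temporary direct buffer to the length of each read request, so capping that length caps the direct buffer.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

public class BoundedRead {

    // Hypothetical default chunk size (assumption, not the PR's BUFFER_SIZE).
    private static final int CHUNK_SIZE = 4 * 1024;

    // Conservative upper bound for array allocation (assumption).
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /** Variant that keeps the chunk size as a parameter. */
    static byte[] readBounded(InputStream source, int initialSize, int maxChunkSize)
            throws IOException {
        if (maxChunkSize <= 0) {
            throw new IllegalArgumentException("maxChunkSize must be positive");
        }
        byte[] buf = new byte[Math.max(initialSize, 1)];
        int nread = 0;

        while (true) {
            if (nread == buf.length) {
                // Buffer is full: probe one byte to see whether the stream has ended.
                int next = source.read();
                if (next < 0) {
                    break;
                }
                if (buf.length >= MAX_ARRAY_SIZE) {
                    throw new OutOfMemoryError("Required array size too large");
                }
                int newCapacity = (int) Math.min((long) buf.length * 2, MAX_ARRAY_SIZE);
                buf = Arrays.copyOf(buf, newCapacity);
                buf[nread++] = (byte) next;
                continue;
            }
            // Never request more than maxChunkSize bytes in a single read call.
            int len = Math.min(buf.length - nread, maxChunkSize);
            int n = source.read(buf, nread, len);
            if (n < 0) {
                break;
            }
            nread += n;
        }
        return nread == buf.length ? buf : Arrays.copyOf(buf, nread);
    }

    /** Variant without the parameter: the chunk size is a fixed constant. */
    static byte[] readBounded(InputStream source, int initialSize) throws IOException {
        return readBounded(source, initialSize, CHUNK_SIZE);
    }
}
```

If the parameter is deleted, only the two-argument form would remain and the constant would be used directly. Keeping the three-argument form private costs little and leaves room for a different size later.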