This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push:
new c2bc493 [FLINK-11379][core] Fix OutOfMemoryError caused by Files.readAllBytes() when TM loads a large-sized TDD
c2bc493 is described below
commit c2bc493cb38258281a98a80848370b3a5b5c01e8
Author: sunhaibotb <[email protected]>
AuthorDate: Thu Feb 21 17:23:53 2019 +0800
[FLINK-11379][core] Fix OutOfMemoryError caused by Files.readAllBytes() when TM loads a large-sized TDD
This closes #7797
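Background: Files.readAllBytes() issues a single read request for the whole file, and on OpenJDK a FileChannel-backed stream services a heap-array read by copying through a temporary direct ByteBuffer sized to the requested length. Loading a multi-gigabyte TDD blob this way therefore needs a multi-gigabyte direct buffer and can fail with "OutOfMemoryError: Direct buffer memory". A minimal sketch of the failing pattern (the path is hypothetical; the direct-buffer copy is a JDK implementation detail):

    import java.io.InputStream;
    import java.nio.channels.Channels;
    import java.nio.channels.SeekableByteChannel;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class SingleLargeRead {
        public static void main(String[] args) throws Exception {
            try (SeekableByteChannel ch = Files.newByteChannel(Paths.get("/tmp/huge.bin"));
                    InputStream in = Channels.newInputStream(ch)) {
                byte[] buf = new byte[(int) ch.size()];
                // One read request for the whole file: the channel copies it
                // through a temporary direct ByteBuffer sized to the request,
                // which is what exhausts direct memory for multi-GB files.
                int n = in.read(buf, 0, buf.length);
                System.out.println("read " + n + " bytes in one request");
            }
        }
    }

The fix below caps every channel read at 4 KiB, so the temporary direct buffer stays small regardless of file size.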
---
.../main/java/org/apache/flink/util/FileUtils.java | 98 +++++++++++++++++++++-
.../java/org/apache/flink/util/FileUtilsTest.java | 85 +++++++++++++++++++
.../deployment/TaskDeploymentDescriptor.java | 6 +-
3 files changed, 185 insertions(+), 4 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 8f32262..c4d6df7 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -28,11 +28,15 @@ import org.apache.flink.util.function.ThrowingConsumer;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
import java.util.Random;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
@@ -56,6 +60,15 @@ public final class FileUtils {
/** The length of the random part of the filename. */
private static final int RANDOM_FILE_NAME_LENGTH = 12;
+ /**
+ * The maximum size of array to allocate for reading. See
+ * {@link java.nio.file.Files#MAX_BUFFER_SIZE} for more.
+ */
+ private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
+
+ /** The size of the buffer used for reading. */
+ private static final int BUFFER_SIZE = 4096;
+
// ------------------------------------------------------------------------
public static void writeCompletely(WritableByteChannel channel, ByteBuffer src) throws IOException {
@@ -90,7 +103,7 @@ public final class FileUtils {
// ------------------------------------------------------------------------
public static String readFile(File file, String charsetName) throws IOException {
- byte[] bytes = Files.readAllBytes(file.toPath());
+ byte[] bytes = readAllBytes(file.toPath());
return new String(bytes, charsetName);
}
@@ -107,6 +120,89 @@ public final class FileUtils {
writeFile(file, contents, "UTF-8");
}
+ /**
+  * Reads all the bytes from a file. The method ensures that the file is
+  * closed when all bytes have been read or an I/O error, or other runtime
+  * exception, is thrown.
+  *
+  * <p>This is an implementation that follows {@link java.nio.file.Files#readAllBytes(java.nio.file.Path)},
+  * and the difference is that it limits the size of the direct buffer to avoid
+  * direct-buffer OutOfMemoryError. When {@link java.nio.file.Files#readAllBytes(java.nio.file.Path)}
+  * or other interfaces in the Java API can do this in the future, we should remove it.
+  *
+  * @param path
+  *        the path to the file
+  * @return a byte array containing the bytes read from the file
+  *
+  * @throws IOException
+  *         if an I/O error occurs reading from the stream
+  * @throws OutOfMemoryError
+  *         if an array of the required size cannot be allocated, for
+  *         example the file is larger than {@code 2GB}
+  */
+ public static byte[] readAllBytes(java.nio.file.Path path) throws IOException {
+     try (SeekableByteChannel channel = Files.newByteChannel(path);
+             InputStream in = Channels.newInputStream(channel)) {
+
+         long size = channel.size();
+         if (size > (long) MAX_BUFFER_SIZE) {
+             throw new OutOfMemoryError("Required array size too large");
+         }
+
+         return read(in, (int) size);
+     }
+ }
+
+ /**
+  * Reads all the bytes from an input stream. Uses {@code initialSize} as a hint
+  * about how many bytes the stream will have and uses {@code BUFFER_SIZE}
+  * to limit the size of the direct buffer used to read.
+  *
+  * @param source
+  *        the input stream to read from
+  * @param initialSize
+  *        the initial size of the byte array to allocate
+  * @return a byte array containing the bytes read from the file
+  *
+  * @throws IOException
+  *         if an I/O error occurs reading from the stream
+  * @throws OutOfMemoryError
+  *         if an array of the required size cannot be allocated
+  */
+ private static byte[] read(InputStream source, int initialSize) throws IOException {
+     int capacity = initialSize;
+     byte[] buf = new byte[capacity];
+     int nread = 0;
+     int n;
+
+     for (;;) {
+         // read to EOF which may read more or less than initialSize (eg: file
+         // is truncated while we are reading)
+         while ((n = source.read(buf, nread, Math.min(capacity - nread, BUFFER_SIZE))) > 0) {
+             nread += n;
+         }
+
+         // if last call to source.read() returned -1, we are done
+         // otherwise, try to read one more byte; if that failed we're done too
+         if (n < 0 || (n = source.read()) < 0) {
+             break;
+         }
+
+         // one more byte was read; need to allocate a larger buffer
+         if (capacity <= MAX_BUFFER_SIZE - capacity) {
+             capacity = Math.max(capacity << 1, BUFFER_SIZE);
+         } else {
+             if (capacity == MAX_BUFFER_SIZE) {
+                 throw new OutOfMemoryError("Required array size too large");
+             }
+             capacity = MAX_BUFFER_SIZE;
+         }
+         buf = Arrays.copyOf(buf, capacity);
+         buf[nread++] = (byte) n;
+     }
+     return (capacity == nread) ? buf : Arrays.copyOf(buf, nread);
+ }
+
// ------------------------------------------------------------------------
//  Deleting directories on standard File Systems
// ------------------------------------------------------------------------
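For callers, the new helper is a drop-in replacement for java.nio.file.Files.readAllBytes(Path): it opens the same SeekableByteChannel/InputStream pair, but caps every channel read at BUFFER_SIZE (4 KiB), so the temporary direct buffer allocated per read stays small. A minimal usage sketch (the path is hypothetical):

    import org.apache.flink.util.FileUtils;

    import java.io.File;
    import java.io.IOException;

    public class ReadAllBytesUsage {
        public static void main(String[] args) throws IOException {
            // Same semantics as Files.readAllBytes(Path), but every underlying
            // channel read is capped at 4 KiB, so this stays safe for very
            // large files (up to the Integer.MAX_VALUE - 8 array limit).
            byte[] bytes = FileUtils.readAllBytes(new File("/tmp/blob.bin").toPath());
            System.out.println("read " + bytes.length + " bytes");
        }
    }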
diff --git a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
index 76b7805..c3cd7ba 100644
--- a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.util;
+import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CheckedThread;
@@ -31,12 +32,16 @@ import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.Comparator;
import java.util.List;
+import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -60,6 +65,41 @@ public class FileUtilsTest extends TestLogger {
// ------------------------------------------------------------------------
@Test
+ public void testReadAllBytes() throws Exception {
+     TemporaryFolder tmpFolder = null;
+     try {
+         tmpFolder = new TemporaryFolder(new File(this.getClass().getResource("/").getPath()));
+         tmpFolder.create();
+
+         final int fileSize = 1024;
+         final String testFilePath = tmpFolder.getRoot().getAbsolutePath() + File.separator
+             + this.getClass().getSimpleName() + "_" + fileSize + ".txt";
+
+         {
+             String expectedMD5 = generateTestFile(testFilePath, 1024);
+             final byte[] data = FileUtils.readAllBytes((new File(testFilePath)).toPath());
+             assertEquals(expectedMD5, md5Hex(data));
+         }
+
+         {
+             String expectedMD5 = generateTestFile(testFilePath, 4096);
+             final byte[] data = FileUtils.readAllBytes((new File(testFilePath)).toPath());
+             assertEquals(expectedMD5, md5Hex(data));
+         }
+
+         {
+             String expectedMD5 = generateTestFile(testFilePath, 5120);
+             final byte[] data = FileUtils.readAllBytes((new File(testFilePath)).toPath());
+             assertEquals(expectedMD5, md5Hex(data));
+         }
+     } finally {
+         if (tmpFolder != null) {
+             tmpFolder.delete();
+         }
+     }
+ }
+
+ @Test
public void testDeletePathIfEmpty() throws IOException {
final FileSystem localFs = FileSystem.getLocalFileSystem();
@@ -272,6 +312,51 @@ public class FileUtilsTest extends TestLogger {
}
}
+ /**
+  * Generates a file with random content.
+  *
+  * @param outputFile the path of the output file
+  * @param length the number of bytes of content to generate
+  *
+  * @return the MD5 hex string of the output file's content
+  *
+  * @throws IOException
+  * @throws NoSuchAlgorithmException
+  */
+ private static String generateTestFile(String outputFile, int length) throws IOException, NoSuchAlgorithmException {
+     Path outputFilePath = new Path(outputFile);
+
+     final FileSystem fileSystem = outputFilePath.getFileSystem();
+     try (final FSDataOutputStream fsDataOutputStream = fileSystem.create(outputFilePath, FileSystem.WriteMode.OVERWRITE)) {
+         return writeRandomContent(fsDataOutputStream, length);
+     }
+ }
+
+ private static String writeRandomContent(OutputStream out, int length) throws IOException, NoSuchAlgorithmException {
+     MessageDigest messageDigest = MessageDigest.getInstance("MD5");
+
+     Random random = new Random();
+     char startChar = 32, endChar = 127;
+     for (int i = 0; i < length; i++) {
+         int rnd = random.nextInt(endChar - startChar);
+         byte b = (byte) (startChar + rnd);
+
+         out.write(b);
+         messageDigest.update(b);
+     }
+
+     byte[] b = messageDigest.digest();
+     return org.apache.flink.util.StringUtils.byteToHexString(b);
+ }
+
+ private static String md5Hex(byte[] data) throws NoSuchAlgorithmException {
+     MessageDigest messageDigest = MessageDigest.getInstance("MD5");
+     messageDigest.update(data);
+
+     byte[] b = messageDigest.digest();
+     return org.apache.flink.util.StringUtils.byteToHexString(b);
+ }
+
// ------------------------------------------------------------------------
private static class Deleter extends CheckedThread {
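Note on coverage: the three sizes the test passes to generateTestFile (1024, 4096, and 5120 bytes) fall below, exactly at, and above the new 4 KiB BUFFER_SIZE, so the chunked read loop is exercised for partial-chunk, exact-chunk, and multi-chunk files. For reference, a self-contained equivalent of the test's md5Hex() helper (using plain JDK hex encoding instead of Flink's StringUtils.byteToHexString):

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    final class Md5 {
        // Hex-encodes the MD5 digest of the given bytes.
        static String md5Hex(byte[] data) throws NoSuchAlgorithmException {
            StringBuilder hex = new StringBuilder();
            for (byte b : MessageDigest.getInstance("MD5").digest(data)) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        }
    }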
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index bb038eb..11afe50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
@@ -34,7 +35,6 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
-import java.nio.file.Files;
import java.util.Collection;
/**
@@ -300,7 +300,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
                // (it is deleted automatically on the BLOB server and cache when the job
                // enters a terminal state)
                SerializedValue<JobInformation> serializedValue =
-                       SerializedValue.fromBytes(Files.readAllBytes(dataFile.toPath()));
+                       SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
                serializedJobInformation = new NonOffloaded<>(serializedValue);
            }
@@ -315,7 +315,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
                // (it is deleted automatically on the BLOB server and cache when the job
                // enters a terminal state)
                SerializedValue<TaskInformation> serializedValue =
-                       SerializedValue.fromBytes(Files.readAllBytes(dataFile.toPath()));
+                       SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
                serializedTaskInformation = new NonOffloaded<>(serializedValue);
            }