This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new c2bc493  [FLINK-11379][core] Fix OutOfMemoryError caused by 
Files.readAllBytes() when TM loads a large size TDD
c2bc493 is described below

commit c2bc493cb38258281a98a80848370b3a5b5c01e8
Author: sunhaibotb <sunhaib...@163.com>
AuthorDate: Thu Feb 21 17:23:53 2019 +0800

    [FLINK-11379][core] Fix OutOfMemoryError caused by Files.readAllBytes() 
when TM loads a large size TDD
    
    This closes #7797
---
 .../main/java/org/apache/flink/util/FileUtils.java | 98 +++++++++++++++++++++-
 .../java/org/apache/flink/util/FileUtilsTest.java  | 85 +++++++++++++++++++
 .../deployment/TaskDeploymentDescriptor.java       |  6 +-
 3 files changed, 185 insertions(+), 4 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 8f32262..c4d6df7 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -28,11 +28,15 @@ import org.apache.flink.util.function.ThrowingConsumer;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.SeekableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
 import java.util.Random;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
@@ -56,6 +60,15 @@ public final class FileUtils {
        /** The length of the random part of the filename. */
        private static final int RANDOM_FILE_NAME_LENGTH = 12;
 
+       /**
+        * The maximum size of array to allocate for reading. See
+        * {@link java.nio.file.Files#MAX_BUFFER_SIZE} for more.
+        */
+       private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
+
+       /** The size of the buffer used for reading. */
+       private static final int BUFFER_SIZE = 4096;
+
        // 
------------------------------------------------------------------------
 
        public static void writeCompletely(WritableByteChannel channel, 
ByteBuffer src) throws IOException {
@@ -90,7 +103,7 @@ public final class FileUtils {
        // 
------------------------------------------------------------------------
 
        public static String readFile(File file, String charsetName) throws 
IOException {
-               byte[] bytes = Files.readAllBytes(file.toPath());
+               byte[] bytes = readAllBytes(file.toPath());
                return new String(bytes, charsetName);
        }
 
@@ -107,6 +120,89 @@ public final class FileUtils {
                writeFile(file, contents, "UTF-8");
        }
 
+       /**
+        * Reads all the bytes from a file. The method ensures that the file is
+        * closed when all bytes have been read or an I/O error, or other 
runtime
+        * exception, is thrown.
+        *
+        * <p>This is an implementation that follows {@link java.nio.file.Files#readAllBytes(java.nio.file.Path)},
+        * and the difference is that it limits the size of the direct buffer to avoid
+        * direct-buffer OutOfMemoryError. When {@link java.nio.file.Files#readAllBytes(java.nio.file.Path)}
+        * or other interfaces in the Java API can do this in the future, we should remove it.
+        *
+        * @param path
+        *        the path to the file
+        * @return a byte array containing the bytes read from the file
+        *
+        * @throws IOException
+        *         if an I/O error occurs reading from the stream
+        * @throws OutOfMemoryError
+        *         if an array of the required size cannot be allocated, for
+        *         example the file is larger than {@code 2GB}
+       public static byte[] readAllBytes(java.nio.file.Path path) throws 
IOException {
+               try (SeekableByteChannel channel = Files.newByteChannel(path);
+                       InputStream in = Channels.newInputStream(channel)) {
+
+                       long size = channel.size();
+                       if (size > (long) MAX_BUFFER_SIZE) {
+                               throw new OutOfMemoryError("Required array size 
too large");
+                       }
+
+                       return read(in, (int) size);
+               }
+       }
+
+       /**
+        * Reads all the bytes from an input stream. Uses {@code initialSize}
+        * as a hint about how many bytes the stream will have and uses the
+        * internal {@code BUFFER_SIZE} constant to limit the size of the
+        * buffer used to read.
+        *
+        * @param source
+        *        the input stream to read from
+        * @param initialSize
+        *        the initial size of the byte array to allocate
+        * @return a byte array containing the bytes read from the file
+        *
+        * @throws IOException
+        *         if an I/O error occurs reading from the stream
+        * @throws OutOfMemoryError
+        *         if an array of the required size cannot be allocated
+        */
+       private static byte[] read(InputStream source, int initialSize) throws 
IOException {
+               int capacity = initialSize;
+               byte[] buf = new byte[capacity];
+               int nread = 0;
+               int n;
+
+               for (; ;) {
+                       // read to EOF which may read more or less than initialSize (e.g. the file
+                       // is truncated while we are reading)
+                       while ((n = source.read(buf, nread, Math.min(capacity - 
nread, BUFFER_SIZE))) > 0) {
+                               nread += n;
+                       }
+
+                       // if last call to source.read() returned -1, we are 
done
+                       // otherwise, try to read one more byte; if that failed 
we're done too
+                       if (n < 0 || (n = source.read()) < 0) {
+                               break;
+                       }
+
+                       // one more byte was read; need to allocate a larger 
buffer
+                       if (capacity <= MAX_BUFFER_SIZE - capacity) {
+                               capacity = Math.max(capacity << 1, BUFFER_SIZE);
+                       } else {
+                               if (capacity == MAX_BUFFER_SIZE) {
+                                       throw new OutOfMemoryError("Required 
array size too large");
+                               }
+                               capacity = MAX_BUFFER_SIZE;
+                       }
+                       buf = Arrays.copyOf(buf, capacity);
+                       buf[nread++] = (byte) n;
+               }
+               return (capacity == nread) ? buf : Arrays.copyOf(buf, nread);
+       }
+
        // 
------------------------------------------------------------------------
        //  Deleting directories on standard File Systems
        // 
------------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java 
b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
index 76b7805..c3cd7ba 100644
--- a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.util;
 
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CheckedThread;
@@ -31,12 +32,16 @@ import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Random;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -60,6 +65,41 @@ public class FileUtilsTest extends TestLogger {
        // 
------------------------------------------------------------------------
 
        @Test
+       public void testReadAllBytes() throws Exception {
+               TemporaryFolder tmpFolder = null;
+               try {
+                       tmpFolder = new TemporaryFolder(new 
File(this.getClass().getResource("/").getPath()));
+                       tmpFolder.create();
+
+                       final int fileSize = 1024;
+                       final String testFilePath = 
tmpFolder.getRoot().getAbsolutePath() + File.separator
+                               + this.getClass().getSimpleName() + "_" + 
fileSize + ".txt";
+
+                       {
+                               String expectedMD5 = 
generateTestFile(testFilePath, 1024);
+                               final byte[] data = FileUtils.readAllBytes((new 
File(testFilePath)).toPath());
+                               assertEquals(expectedMD5, md5Hex(data));
+                       }
+
+                       {
+                               String expectedMD5 = 
generateTestFile(testFilePath, 4096);
+                               final byte[] data = FileUtils.readAllBytes((new 
File(testFilePath)).toPath());
+                               assertEquals(expectedMD5, md5Hex(data));
+                       }
+
+                       {
+                               String expectedMD5 = 
generateTestFile(testFilePath, 5120);
+                               final byte[] data = FileUtils.readAllBytes((new 
File(testFilePath)).toPath());
+                               assertEquals(expectedMD5, md5Hex(data));
+                       }
+               } finally {
+                       if (tmpFolder != null) {
+                               tmpFolder.delete();
+                       }
+               }
+       }
+
+       @Test
        public void testDeletePathIfEmpty() throws IOException {
                final FileSystem localFs = FileSystem.getLocalFileSystem();
 
@@ -272,6 +312,51 @@ public class FileUtilsTest extends TestLogger {
                }
        }
 
+       /**
+        * Generates a random content file.
+        *
+        * @param outputFile the path of the output file
+        * @param length the size of content to generate
+        *
+        * @return MD5 of the output file
+        *
+        * @throws IOException
+        * @throws NoSuchAlgorithmException
+        */
+       private static String generateTestFile(String outputFile, int length) 
throws IOException, NoSuchAlgorithmException {
+               Path outputFilePath = new Path(outputFile);
+
+               final FileSystem fileSystem = outputFilePath.getFileSystem();
+               try (final FSDataOutputStream fsDataOutputStream = 
fileSystem.create(outputFilePath, FileSystem.WriteMode.OVERWRITE)) {
+                       return writeRandomContent(fsDataOutputStream, length);
+               }
+       }
+
+       private static String writeRandomContent(OutputStream out, int length) 
throws IOException, NoSuchAlgorithmException {
+               MessageDigest messageDigest = MessageDigest.getInstance("MD5");
+
+               Random random = new Random();
+               char startChar = 32, endChar = 127;
+               for (int i = 0; i < length; i++) {
+                       int rnd = random.nextInt(endChar - startChar);
+                       byte b = (byte) (startChar + rnd);
+
+                       out.write(b);
+                       messageDigest.update(b);
+               }
+
+               byte[] b = messageDigest.digest();
+               return org.apache.flink.util.StringUtils.byteToHexString(b);
+       }
+
+       private static String md5Hex(byte[] data) throws 
NoSuchAlgorithmException {
+               MessageDigest messageDigest = MessageDigest.getInstance("MD5");
+               messageDigest.update(data);
+
+               byte[] b = messageDigest.digest();
+               return org.apache.flink.util.StringUtils.byteToHexString(b);
+       }
+
        // 
------------------------------------------------------------------------
 
        private static class Deleter extends CheckedThread {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index bb038eb..11afe50 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
@@ -34,7 +35,6 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.nio.file.Files;
 import java.util.Collection;
 
 /**
@@ -300,7 +300,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                        //       (it is deleted automatically on the BLOB 
server and cache when the job
                        //       enters a terminal state)
                        SerializedValue<JobInformation> serializedValue =
-                               
SerializedValue.fromBytes(Files.readAllBytes(dataFile.toPath()));
+                               
SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
                        serializedJobInformation = new 
NonOffloaded<>(serializedValue);
                }
 
@@ -315,7 +315,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                        //       (it is deleted automatically on the BLOB 
server and cache when the job
                        //       enters a terminal state)
                        SerializedValue<TaskInformation> serializedValue =
-                               
SerializedValue.fromBytes(Files.readAllBytes(dataFile.toPath()));
+                               
SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
                        serializedTaskInformation = new 
NonOffloaded<>(serializedValue);
                }
 

Reply via email to