This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b759794e33d68b557f4ee2d7287f5fae9a80167e
Author: Rui Fan <[email protected]>
AuthorDate: Fri Oct 27 15:34:58 2023 +0800

    [FLINK-33354][runtime] Using the InputStream instead of byte array to avoid 
contiguous huge memory usage
---
 .../apache/flink/runtime/deployment/TaskDeploymentDescriptor.java  | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index e92162fb14e..5684066735f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -28,16 +28,17 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.util.GroupCache;
-import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
 
+import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.file.Files;
 import java.util.List;
 
 /**
@@ -281,7 +282,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                 // enters a terminal state)
                 jobInformation =
                         InstantiationUtil.deserializeObject(
-                                FileUtils.readAllBytes(dataFile.toPath()),
+                                new 
BufferedInputStream(Files.newInputStream(dataFile.toPath())),
                                 getClass().getClassLoader());
                 jobInformationCache.put(jobId, jobInfoKey, jobInformation);
             }
@@ -303,7 +304,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                 // enters a terminal state)
                 taskInformation =
                         InstantiationUtil.deserializeObject(
-                                FileUtils.readAllBytes(dataFile.toPath()),
+                                new 
BufferedInputStream(Files.newInputStream(dataFile.toPath())),
                                 getClass().getClassLoader());
                 taskInformationCache.put(jobId, taskInfoKey, taskInformation);
             }

Reply via email to