This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b759794e33d68b557f4ee2d7287f5fae9a80167e Author: Rui Fan <[email protected]> AuthorDate: Fri Oct 27 15:34:58 2023 +0800 [FLINK-33354][runtime] Using the InputStream instead of byte array to avoid contiguous huge memory usage --- .../apache/flink/runtime/deployment/TaskDeploymentDescriptor.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index e92162fb14e..5684066735f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -28,16 +28,17 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.util.GroupCache; -import org.apache.flink.util.FileUtils; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import javax.annotation.Nullable; +import java.io.BufferedInputStream; import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.nio.file.Files; import java.util.List; /** @@ -281,7 +282,7 @@ public final class TaskDeploymentDescriptor implements Serializable { // enters a terminal state) jobInformation = InstantiationUtil.deserializeObject( - FileUtils.readAllBytes(dataFile.toPath()), + new BufferedInputStream(Files.newInputStream(dataFile.toPath())), getClass().getClassLoader()); jobInformationCache.put(jobId, jobInfoKey, jobInformation); } @@ -303,7 +304,7 @@ public final class TaskDeploymentDescriptor implements Serializable { // enters a terminal state) taskInformation = InstantiationUtil.deserializeObject( - FileUtils.readAllBytes(dataFile.toPath()), + new BufferedInputStream(Files.newInputStream(dataFile.toPath())), getClass().getClassLoader()); taskInformationCache.put(jobId, taskInfoKey, taskInformation); }
