huwh commented on code in PR #23599: URL: https://github.com/apache/flink/pull/23599#discussion_r1382863857
########## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java: ########## @@ -253,13 +274,19 @@ public void loadBigData( Preconditions.checkNotNull(blobService); - final File dataFile = blobService.getFile(jobId, jobInfoKey); - // NOTE: Do not delete the job info BLOB since it may be needed again during recovery. - // (it is deleted automatically on the BLOB server and cache when the job - // enters a terminal state) - SerializedValue<JobInformation> serializedValue = - SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath())); - serializedJobInformation = new NonOffloaded<>(serializedValue); + JobInformation jobInformation = jobInformationCache.get(jobId, jobInfoKey); + if (jobInformation == null) { + final File dataFile = blobService.getFile(jobId, jobInfoKey); + // NOTE: Do not delete the job info BLOB since it may be needed again during + // recovery. (it is deleted automatically on the BLOB server and cache when the job + // enters a terminal state) + jobInformation = + InstantiationUtil.deserializeObject( + new BufferedInputStream(Files.newInputStream(dataFile.toPath())), + getClass().getClassLoader()); + jobInformationCache.put(jobId, jobInfoKey, jobInformation); + } + this.jobInformation = jobInformation.deepCopy(); Review Comment: Can we use `this.jobInformation = jobInformation` here ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org