huwh commented on code in PR #23599:
URL: https://github.com/apache/flink/pull/23599#discussion_r1384389582
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java:
##########
@@ -253,13 +274,19 @@ public void loadBigData(
Preconditions.checkNotNull(blobService);
- final File dataFile = blobService.getFile(jobId, jobInfoKey);
- // NOTE: Do not delete the job info BLOB since it may be needed
again during recovery.
- // (it is deleted automatically on the BLOB server and cache
when the job
- // enters a terminal state)
- SerializedValue<JobInformation> serializedValue =
-
SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
- serializedJobInformation = new NonOffloaded<>(serializedValue);
+ JobInformation jobInformation = jobInformationCache.get(jobId,
jobInfoKey);
+ if (jobInformation == null) {
+ final File dataFile = blobService.getFile(jobId, jobInfoKey);
+ // NOTE: Do not delete the job info BLOB since it may be
needed again during
+ // recovery. (it is deleted automatically on the BLOB server
and cache when the job
+ // enters a terminal state)
+ jobInformation =
+ InstantiationUtil.deserializeObject(
+ new
BufferedInputStream(Files.newInputStream(dataFile.toPath())),
+ getClass().getClassLoader());
+ jobInformationCache.put(jobId, jobInfoKey, jobInformation);
+ }
+ this.jobInformation = jobInformation.deepCopy();
Review Comment:
Thanks for your explanation. Let's keep the `deepCopy`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]