This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b1786ba952bb81e8180cac9c5c95c2fc46e44fb7
Author: 1996fanrui <[email protected]>
AuthorDate: Thu Oct 26 15:35:15 2023 +0800

    [FLINK-33354][runtime] Cache TaskInformation and JobInformation to avoid 
deserializing duplicate big objects
---
 .../deployment/TaskDeploymentDescriptor.java       |  95 +++++++++-----
 .../runtime/executiongraph/JobInformation.java     |  59 +++++++--
 .../runtime/executiongraph/TaskInformation.java    |  40 ++++++
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  21 ++--
 .../runtime/taskexecutor/TaskManagerServices.java  |  23 ++++
 .../deployment/TaskDeploymentDescriptorTest.java   | 139 ++++++++++++++++-----
 .../flink/runtime/dispatcher/JobMasterTester.java  |   4 +-
 .../DefaultExecutionGraphDeploymentTest.java       |  12 +-
 ...hDeploymentWithSmallBlobCacheSizeLimitTest.java |   6 +-
 .../taskexecutor/TaskManagerServicesBuilder.java   |   2 +
 10 files changed, 315 insertions(+), 86 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 97596e1ac9f..e92162fb14e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.util.GroupCache;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
@@ -99,10 +100,22 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
     }
 
     /** Serialized job information if non-offloaded or 
<tt>PermanentBlobKey</tt> if offloaded. */
-    private MaybeOffloaded<JobInformation> serializedJobInformation;
+    private final MaybeOffloaded<JobInformation> serializedJobInformation;
 
     /** Serialized task information if non-offloaded or 
<tt>PermanentBlobKey</tt> if offloaded. */
-    private MaybeOffloaded<TaskInformation> serializedTaskInformation;
+    private final MaybeOffloaded<TaskInformation> serializedTaskInformation;
+
+    /**
+     * The deserialized job information. It is non-null only when serializedJobInformation is
+     * offloaded and {@link #loadBigData} has been called.
+     */
+    private transient JobInformation jobInformation;
+
+    /**
+     * The deserialized task information. It is non-null only when serializedTaskInformation is
+     * offloaded and {@link #loadBigData} has been called.
+     */
+    private transient TaskInformation taskInformation;
 
     /**
      * The ID referencing the job this task belongs to.
@@ -152,39 +165,43 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
     }
 
     /**
-     * Return the sub task's serialized job information.
+     * Return the sub task's job information.
      *
-     * @return serialized job information (may throw {@link 
IllegalStateException} if {@link
-     *     #loadBigData} is not called beforehand).
+     * @return job information (may throw {@link IllegalStateException} if 
{@link #loadBigData} is
+     *     not called beforehand).
      * @throws IllegalStateException If job information is offloaded to BLOB 
store.
      */
-    public SerializedValue<JobInformation> getSerializedJobInformation() {
+    public JobInformation getJobInformation() throws IOException, 
ClassNotFoundException {
+        if (jobInformation != null) {
+            return jobInformation;
+        }
         if (serializedJobInformation instanceof NonOffloaded) {
             NonOffloaded<JobInformation> jobInformation =
                     (NonOffloaded<JobInformation>) serializedJobInformation;
-            return jobInformation.serializedValue;
-        } else {
-            throw new IllegalStateException(
-                    "Trying to work with offloaded serialized job 
information.");
+            return 
jobInformation.serializedValue.deserializeValue(getClass().getClassLoader());
         }
+        throw new IllegalStateException(
+                "Trying to work with offloaded serialized job information.");
     }
 
     /**
-     * Return the sub task's serialized task information.
+     * Return the sub task's task information.
      *
-     * @return serialized task information (may throw {@link 
IllegalStateException} if {@link
-     *     #loadBigData} is not called beforehand)).
+     * @return task information (may throw {@link IllegalStateException} if {@link #loadBigData} is
+     *     not called beforehand).
      * @throws IllegalStateException If job information is offloaded to BLOB 
store.
      */
-    public SerializedValue<TaskInformation> getSerializedTaskInformation() {
+    public TaskInformation getTaskInformation() throws IOException, 
ClassNotFoundException {
+        if (taskInformation != null) {
+            return taskInformation;
+        }
         if (serializedTaskInformation instanceof NonOffloaded) {
             NonOffloaded<TaskInformation> taskInformation =
                     (NonOffloaded<TaskInformation>) serializedTaskInformation;
-            return taskInformation.serializedValue;
-        } else {
-            throw new IllegalStateException(
-                    "Trying to work with offloaded serialized job 
information.");
+            return 
taskInformation.serializedValue.deserializeValue(getClass().getClassLoader());
         }
+        throw new IllegalStateException(
+                "Trying to work with offloaded serialized task information.");
     }
 
     /**
@@ -243,6 +260,8 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
      */
     public void loadBigData(
             @Nullable PermanentBlobService blobService,
+            GroupCache<JobID, PermanentBlobKey, JobInformation> 
jobInformationCache,
+            GroupCache<JobID, PermanentBlobKey, TaskInformation> 
taskInformationCache,
             GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> 
shuffleDescriptorsCache)
             throws IOException, ClassNotFoundException {
 
@@ -254,13 +273,19 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
 
             Preconditions.checkNotNull(blobService);
 
-            final File dataFile = blobService.getFile(jobId, jobInfoKey);
-            // NOTE: Do not delete the job info BLOB since it may be needed 
again during recovery.
-            //       (it is deleted automatically on the BLOB server and cache 
when the job
-            //       enters a terminal state)
-            SerializedValue<JobInformation> serializedValue =
-                    
SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
-            serializedJobInformation = new NonOffloaded<>(serializedValue);
+            JobInformation jobInformation = jobInformationCache.get(jobId, 
jobInfoKey);
+            if (jobInformation == null) {
+                final File dataFile = blobService.getFile(jobId, jobInfoKey);
+                // NOTE: Do not delete the job info BLOB since it may be 
needed again during
+                // recovery. (it is deleted automatically on the BLOB server 
and cache when the job
+                // enters a terminal state)
+                jobInformation =
+                        InstantiationUtil.deserializeObject(
+                                FileUtils.readAllBytes(dataFile.toPath()),
+                                getClass().getClassLoader());
+                jobInformationCache.put(jobId, jobInfoKey, jobInformation);
+            }
+            this.jobInformation = jobInformation.deepCopy();
         }
 
         // re-integrate offloaded task info from blob
@@ -270,13 +295,19 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
 
             Preconditions.checkNotNull(blobService);
 
-            final File dataFile = blobService.getFile(jobId, taskInfoKey);
-            // NOTE: Do not delete the task info BLOB since it may be needed 
again during recovery.
-            //       (it is deleted automatically on the BLOB server and cache 
when the job
-            //       enters a terminal state)
-            SerializedValue<TaskInformation> serializedValue =
-                    
SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
-            serializedTaskInformation = new NonOffloaded<>(serializedValue);
+            TaskInformation taskInformation = taskInformationCache.get(jobId, 
taskInfoKey);
+            if (taskInformation == null) {
+                final File dataFile = blobService.getFile(jobId, taskInfoKey);
+                // NOTE: Do not delete the task info BLOB since it may be 
needed again during
+                // recovery. (it is deleted automatically on the BLOB server 
and cache when the job
+                // enters a terminal state)
+                taskInformation =
+                        InstantiationUtil.deserializeObject(
+                                FileUtils.readAllBytes(dataFile.toPath()),
+                                getClass().getClassLoader());
+                taskInformationCache.put(jobId, taskInfoKey, taskInformation);
+            }
+            this.taskInformation = taskInformation.deepCopy();
         }
 
         for (InputGateDeploymentDescriptor inputGate : inputGates) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
index f30bf6f32d2..5792caa2711 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
@@ -21,13 +21,18 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
+import 
org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableCollection;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
 import java.io.Serializable;
 import java.net.URL;
 import java.util.Collection;
+import java.util.Objects;
 
 /** Container class for job information which is stored in the {@link 
ExecutionGraph}. */
 public class JobInformation implements Serializable {
@@ -44,13 +49,13 @@ public class JobInformation implements Serializable {
     private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
     /** Configuration of the job. */
-    private final Configuration jobConfiguration;
+    private final UnmodifiableConfiguration jobConfiguration;
 
     /** Blob keys for the required jar files. */
-    private final Collection<PermanentBlobKey> requiredJarFileBlobKeys;
+    private final ImmutableCollection<PermanentBlobKey> 
requiredJarFileBlobKeys;
 
     /** URLs specifying the classpath to add to the class loader. */
-    private final Collection<URL> requiredClasspathURLs;
+    private final ImmutableCollection<URL> requiredClasspathURLs;
 
     public JobInformation(
             JobID jobId,
@@ -62,9 +67,12 @@ public class JobInformation implements Serializable {
         this.jobId = Preconditions.checkNotNull(jobId);
         this.jobName = Preconditions.checkNotNull(jobName);
         this.serializedExecutionConfig = 
Preconditions.checkNotNull(serializedExecutionConfig);
-        this.jobConfiguration = Preconditions.checkNotNull(jobConfiguration);
-        this.requiredJarFileBlobKeys = 
Preconditions.checkNotNull(requiredJarFileBlobKeys);
-        this.requiredClasspathURLs = 
Preconditions.checkNotNull(requiredClasspathURLs);
+        this.jobConfiguration =
+                new 
UnmodifiableConfiguration(Preconditions.checkNotNull(jobConfiguration));
+        this.requiredJarFileBlobKeys =
+                
ImmutableList.copyOf(Preconditions.checkNotNull(requiredJarFileBlobKeys));
+        this.requiredClasspathURLs =
+                
ImmutableList.copyOf(Preconditions.checkNotNull(requiredClasspathURLs));
     }
 
     public JobID getJobId() {
@@ -79,18 +87,51 @@ public class JobInformation implements Serializable {
         return serializedExecutionConfig;
     }
 
-    public Configuration getJobConfiguration() {
+    public UnmodifiableConfiguration getJobConfiguration() {
         return jobConfiguration;
     }
 
-    public Collection<PermanentBlobKey> getRequiredJarFileBlobKeys() {
+    public ImmutableCollection<PermanentBlobKey> getRequiredJarFileBlobKeys() {
         return requiredJarFileBlobKeys;
     }
 
-    public Collection<URL> getRequiredClasspathURLs() {
+    public ImmutableCollection<URL> getRequiredClasspathURLs() {
         return requiredClasspathURLs;
     }
 
+    // All fields are immutable, so return this directly.
+    public JobInformation deepCopy() {
+        return this;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        JobInformation that = (JobInformation) o;
+        return Objects.equals(jobId, that.jobId)
+                && Objects.equals(jobName, that.jobName)
+                && Objects.equals(serializedExecutionConfig, 
that.serializedExecutionConfig)
+                && Objects.equals(jobConfiguration, that.jobConfiguration)
+                && Objects.equals(requiredJarFileBlobKeys, 
that.requiredJarFileBlobKeys)
+                && Objects.equals(requiredClasspathURLs, 
that.requiredClasspathURLs);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                jobId,
+                jobName,
+                serializedExecutionConfig,
+                jobConfiguration,
+                requiredJarFileBlobKeys,
+                requiredClasspathURLs);
+    }
+
     // ------------------------------------------------------------------------
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
index 6a3b1a26e7f..e1b59d4bfb1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * Container class for operator/task specific information which are stored at 
the {@link
@@ -88,4 +89,43 @@ public class TaskInformation implements Serializable {
     public Configuration getTaskConfiguration() {
         return taskConfiguration;
     }
+
+    public TaskInformation deepCopy() {
+        return new TaskInformation(
+                getJobVertexId(),
+                getTaskName(),
+                getNumberOfSubtasks(),
+                getMaxNumberOfSubtasks(),
+                getInvokableClassName(),
+                // Copy the Configuration so tasks cannot mutate the shared, cached instance.
+                new Configuration(getTaskConfiguration()));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TaskInformation that = (TaskInformation) o;
+        return numberOfSubtasks == that.numberOfSubtasks
+                && maxNumberOfSubtasks == that.maxNumberOfSubtasks
+                && Objects.equals(jobVertexId, that.jobVertexId)
+                && Objects.equals(taskName, that.taskName)
+                && Objects.equals(invokableClassName, that.invokableClassName)
+                && Objects.equals(taskConfiguration, that.taskConfiguration);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                jobVertexId,
+                taskName,
+                numberOfSubtasks,
+                maxNumberOfSubtasks,
+                invokableClassName,
+                taskConfiguration);
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 7146c2befa9..c3a705e0e0e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -300,6 +300,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
     private final ThreadInfoSampleService threadInfoSampleService;
 
+    private final GroupCache<JobID, PermanentBlobKey, JobInformation> 
jobInformationCache;
+    private final GroupCache<JobID, PermanentBlobKey, TaskInformation> 
taskInformationCache;
     private final GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup>
             shuffleDescriptorsCache;
 
@@ -378,6 +380,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                 
taskExecutorServices.getSlotAllocationSnapshotPersistenceService();
 
         this.sharedResources = taskExecutorServices.getSharedResources();
+        this.jobInformationCache = 
taskExecutorServices.getJobInformationCache();
+        this.taskInformationCache = 
taskExecutorServices.getTaskInformationCache();
         this.shuffleDescriptorsCache = 
taskExecutorServices.getShuffleDescriptorCache();
     }
 
@@ -508,6 +512,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
         changelogStoragesManager.shutdown();
         channelStateExecutorFactoryManager.shutdown();
 
+        jobInformationCache.clear();
+        taskInformationCache.clear();
         shuffleDescriptorsCache.clear();
 
         Preconditions.checkState(jobTable.isEmpty());
@@ -672,7 +678,10 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
             // re-integrate offloaded data and deserialize shuffle descriptors
             try {
                 tdd.loadBigData(
-                        taskExecutorBlobService.getPermanentBlobService(), 
shuffleDescriptorsCache);
+                        taskExecutorBlobService.getPermanentBlobService(),
+                        jobInformationCache,
+                        taskInformationCache,
+                        shuffleDescriptorsCache);
             } catch (IOException | ClassNotFoundException e) {
                 throw new TaskSubmissionException(
                         "Could not re-integrate offloaded 
TaskDeploymentDescriptor data.", e);
@@ -682,12 +691,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
             final JobInformation jobInformation;
             final TaskInformation taskInformation;
             try {
-                jobInformation =
-                        tdd.getSerializedJobInformation()
-                                .deserializeValue(getClass().getClassLoader());
-                taskInformation =
-                        tdd.getSerializedTaskInformation()
-                                .deserializeValue(getClass().getClassLoader());
+                jobInformation = tdd.getJobInformation();
+                taskInformation = tdd.getTaskInformation();
             } catch (IOException | ClassNotFoundException e) {
                 throw new TaskSubmissionException(
                         "Could not deserialize the job or task information.", 
e);
@@ -1907,6 +1912,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
         changelogStoragesManager.releaseResourcesForJob(jobId);
         currentSlotOfferPerJob.remove(jobId);
         channelStateExecutorFactoryManager.releaseResourcesForJob(jobId);
+        jobInformationCache.clearCacheForGroup(jobId);
+        taskInformationCache.clearCacheForGroup(jobId);
         shuffleDescriptorsCache.clearCacheForGroup(jobId);
         fileMergingManager.releaseMergingSnapshotManagerForJob(jobId);
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ae78575410e..586d8291a0f 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -29,6 +29,8 @@ import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.Shuff
 import org.apache.flink.runtime.entrypoint.WorkingDirectory;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -95,6 +97,8 @@ public class TaskManagerServices {
     private final LibraryCacheManager libraryCacheManager;
     private final SlotAllocationSnapshotPersistenceService 
slotAllocationSnapshotPersistenceService;
     private final SharedResources sharedResources;
+    private final GroupCache<JobID, PermanentBlobKey, JobInformation> 
jobInformationCache;
+    private final GroupCache<JobID, PermanentBlobKey, TaskInformation> 
taskInformationCache;
     private final GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup>
             shuffleDescriptorsCache;
 
@@ -117,6 +121,8 @@ public class TaskManagerServices {
             LibraryCacheManager libraryCacheManager,
             SlotAllocationSnapshotPersistenceService 
slotAllocationSnapshotPersistenceService,
             SharedResources sharedResources,
+            GroupCache<JobID, PermanentBlobKey, JobInformation> 
jobInformationCache,
+            GroupCache<JobID, PermanentBlobKey, TaskInformation> 
taskInformationCache,
             GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> 
shuffleDescriptorsCache) {
 
         this.unresolvedTaskManagerLocation =
@@ -139,6 +145,8 @@ public class TaskManagerServices {
         this.libraryCacheManager = 
Preconditions.checkNotNull(libraryCacheManager);
         this.slotAllocationSnapshotPersistenceService = 
slotAllocationSnapshotPersistenceService;
         this.sharedResources = Preconditions.checkNotNull(sharedResources);
+        this.jobInformationCache = jobInformationCache;
+        this.taskInformationCache = taskInformationCache;
         this.shuffleDescriptorsCache = 
Preconditions.checkNotNull(shuffleDescriptorsCache);
     }
 
@@ -214,6 +222,14 @@ public class TaskManagerServices {
         return sharedResources;
     }
 
+    public GroupCache<JobID, PermanentBlobKey, JobInformation> 
getJobInformationCache() {
+        return jobInformationCache;
+    }
+
+    public GroupCache<JobID, PermanentBlobKey, TaskInformation> 
getTaskInformationCache() {
+        return taskInformationCache;
+    }
+
     public GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> 
getShuffleDescriptorCache() {
         return shuffleDescriptorsCache;
     }
@@ -415,6 +431,11 @@ public class TaskManagerServices {
                     NoOpSlotAllocationSnapshotPersistenceService.INSTANCE;
         }
 
+        final GroupCache<JobID, PermanentBlobKey, JobInformation> 
jobInformationCache =
+                new DefaultGroupCache.Factory<JobID, PermanentBlobKey, 
JobInformation>().create();
+        final GroupCache<JobID, PermanentBlobKey, TaskInformation> 
taskInformationCache =
+                new DefaultGroupCache.Factory<JobID, PermanentBlobKey, 
TaskInformation>().create();
+
         final GroupCache<JobID, PermanentBlobKey, ShuffleDescriptorGroup> 
shuffleDescriptorsCache =
                 new DefaultGroupCache.Factory<JobID, PermanentBlobKey, 
ShuffleDescriptorGroup>()
                         .create();
@@ -438,6 +459,8 @@ public class TaskManagerServices {
                 libraryCacheManager,
                 slotAllocationSnapshotPersistenceService,
                 new SharedResources(),
+                jobInformationCache,
+                taskInformationCache,
                 shuffleDescriptorsCache);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 6191b19767d..ac003665584 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -20,9 +20,13 @@ package org.apache.flink.runtime.deployment;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -32,14 +36,21 @@ import 
org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.util.DefaultGroupCache;
+import org.apache.flink.runtime.util.GroupCache;
+import org.apache.flink.runtime.util.NoOpGroupCache;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.types.Either;
 import org.apache.flink.util.SerializedValue;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.net.URL;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -50,6 +61,8 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 /** Tests for the {@link TaskDeploymentDescriptor}. */
 class TaskDeploymentDescriptorTest {
 
+    @TempDir Path temporaryFolder;
+
     private static final JobID jobID = new JobID();
     private static final JobVertexID vertexID = new JobVertexID();
     private static final ExecutionAttemptID execId = 
createExecutionAttemptId(vertexID);
@@ -73,24 +86,28 @@ class TaskDeploymentDescriptorTest {
 
     private final SerializedValue<ExecutionConfig> executionConfig =
             new SerializedValue<>(new ExecutionConfig());
+
+    private final JobInformation jobInformation =
+            new JobInformation(
+                    jobID,
+                    jobName,
+                    executionConfig,
+                    jobConfiguration,
+                    requiredJars,
+                    requiredClasspaths);
     private final SerializedValue<JobInformation> serializedJobInformation =
-            new SerializedValue<>(
-                    new JobInformation(
-                            jobID,
-                            jobName,
-                            executionConfig,
-                            jobConfiguration,
-                            requiredJars,
-                            requiredClasspaths));
+            new SerializedValue<>(jobInformation);
+
+    private final TaskInformation taskInformation =
+            new TaskInformation(
+                    vertexID,
+                    taskName,
+                    currentNumberOfSubtasks,
+                    numberOfKeyGroups,
+                    invokableClass.getName(),
+                    taskConfiguration);
     private final SerializedValue<TaskInformation> 
serializedJobVertexInformation =
-            new SerializedValue<>(
-                    new TaskInformation(
-                            vertexID,
-                            taskName,
-                            currentNumberOfSubtasks,
-                            numberOfKeyGroups,
-                            invokableClass.getName(),
-                            taskConfiguration));
+            new SerializedValue<>(taskInformation);
 
     TaskDeploymentDescriptorTest() throws IOException {}
 
@@ -104,19 +121,15 @@ class TaskDeploymentDescriptorTest {
 
         final TaskDeploymentDescriptor copy = 
CommonTestUtils.createCopySerializable(orig);
 
-        assertThat(orig.getSerializedJobInformation())
-                .isNotSameAs(copy.getSerializedJobInformation());
-        assertThat(orig.getSerializedTaskInformation())
-                .isNotSameAs(copy.getSerializedTaskInformation());
+        
assertThat(orig.getJobInformation()).isNotSameAs(copy.getJobInformation());
+        
assertThat(orig.getTaskInformation()).isNotSameAs(copy.getTaskInformation());
         
assertThat(orig.getExecutionAttemptId()).isNotSameAs(copy.getExecutionAttemptId());
         assertThat(orig.getTaskRestore()).isNotSameAs(copy.getTaskRestore());
         
assertThat(orig.getProducedPartitions()).isNotSameAs(copy.getProducedPartitions());
         assertThat(orig.getInputGates()).isNotSameAs(copy.getInputGates());
 
-        assertThat(orig.getSerializedJobInformation())
-                .isEqualTo(copy.getSerializedJobInformation());
-        assertThat(orig.getSerializedTaskInformation())
-                .isEqualTo(copy.getSerializedTaskInformation());
+        
assertThat(orig.getJobInformation()).isEqualTo(copy.getJobInformation());
+        
assertThat(orig.getTaskInformation()).isEqualTo(copy.getTaskInformation());
         
assertThat(orig.getExecutionAttemptId()).isEqualTo(copy.getExecutionAttemptId());
         assertThat(orig.getAllocationId()).isEqualTo(copy.getAllocationId());
         assertThat(orig.getSubtaskIndex()).isEqualTo(copy.getSubtaskIndex());
@@ -130,20 +143,88 @@ class TaskDeploymentDescriptorTest {
     }
 
     @Test
-    void testOffLoadedAndNonOffLoadedPayload() {
+    void testOffLoadedAndNonOffLoadedPayload() throws IOException, 
ClassNotFoundException {
         final TaskDeploymentDescriptor taskDeploymentDescriptor =
                 createTaskDeploymentDescriptor(
                         new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
                         new TaskDeploymentDescriptor.Offloaded<>(new 
PermanentBlobKey()));
 
-        SerializedValue<JobInformation> actualSerializedJobInformation =
-                taskDeploymentDescriptor.getSerializedJobInformation();
-        
assertThat(actualSerializedJobInformation).isSameAs(serializedJobInformation);
+        JobInformation actualJobInformation = 
taskDeploymentDescriptor.getJobInformation();
+        assertThat(actualJobInformation).isEqualTo(jobInformation);
 
-        
assertThatThrownBy(taskDeploymentDescriptor::getSerializedTaskInformation)
+        assertThatThrownBy(taskDeploymentDescriptor::getTaskInformation)
                 .isInstanceOf(IllegalStateException.class);
     }
 
+    @Test
+    void testTaskInformationCache() throws IOException, ClassNotFoundException 
{
+        try (BlobServer blobServer = setupBlobServer()) {
+            // Serialize taskInformation to the blobServer and get its PermanentBlobKey
+            Either<SerializedValue<TaskInformation>, PermanentBlobKey> 
taskInformationOrBlobKey =
+                    BlobWriter.serializeAndTryOffload(taskInformation, jobID, 
blobServer);
+            assertThat(taskInformationOrBlobKey.isRight()).isTrue();
+            PermanentBlobKey permanentBlobKey = 
taskInformationOrBlobKey.right();
+
+            GroupCache<JobID, PermanentBlobKey, TaskInformation> 
taskInformationCache =
+                    new DefaultGroupCache.Factory<JobID, PermanentBlobKey, 
TaskInformation>()
+                            .create();
+            // Test for tdd1
+            final TaskDeploymentDescriptor tdd1 =
+                    createTaskDeploymentDescriptor(
+                            new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+                            new 
TaskDeploymentDescriptor.Offloaded<>(permanentBlobKey));
+            assertThat(taskInformationCache.get(jobID, 
permanentBlobKey)).isNull();
+            tdd1.loadBigData(
+                    blobServer,
+                    new NoOpGroupCache<>(),
+                    taskInformationCache,
+                    new NoOpGroupCache<>());
+            TaskInformation taskInformation1 = tdd1.getTaskInformation();
+            assertThat(taskInformation1).isEqualTo(taskInformation);
+            // The TaskInformation is cached in taskInformationCache, and it is equal to
+            // taskInformation1.
+            assertThat(taskInformationCache.get(jobID, permanentBlobKey))
+                    .isNotNull()
+                    .isEqualTo(taskInformation1);
+
+            // Test for tdd2
+            final TaskDeploymentDescriptor tdd2 =
+                    createTaskDeploymentDescriptor(
+                            new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+                            new 
TaskDeploymentDescriptor.Offloaded<>(permanentBlobKey));
+            tdd2.loadBigData(
+                    blobServer,
+                    new NoOpGroupCache<>(),
+                    taskInformationCache,
+                    new NoOpGroupCache<>());
+            TaskInformation taskInformation2 = tdd2.getTaskInformation();
+            // taskInformation2 is equal to both taskInformation1 and the original taskInformation,
+            // but they are not the same instance.
+            assertThat(taskInformation2)
+                    .isNotNull()
+                    .isEqualTo(taskInformation1)
+                    .isNotSameAs(taskInformation1)
+                    .isEqualTo(taskInformation);
+            // Subtasks may modify the configuration, so it must not be the same instance.
+            assertThat(taskInformation2.getTaskConfiguration())
+                    .isNotNull()
+                    .isEqualTo(taskInformation1.getTaskConfiguration())
+                    .isNotSameAs(taskInformation1.getTaskConfiguration());
+        }
+    }
+
+    private BlobServer setupBlobServer() throws IOException {
+
+        Configuration config = new Configuration();
+        // always offload the serialized job and task information
+        config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
+        BlobServer blobServer =
+                new BlobServer(
+                        config, TempDirUtils.newFolder(temporaryFolder), new 
VoidBlobStore());
+        blobServer.start();
+        return blobServer;
+    }
+
     @Nonnull
     private TaskDeploymentDescriptor createTaskDeploymentDescriptor(
             TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> 
jobInformation,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java
index a5508f9698e..08783509a9f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java
@@ -204,9 +204,7 @@ public class JobMasterTester implements Closeable {
                                                                     "Task 
descriptor for %s not found.",
                                                                     
executionAttemptId)));
                     try {
-                        return descriptor
-                                .getSerializedTaskInformation()
-                                
.deserializeValue(Thread.currentThread().getContextClassLoader());
+                        return descriptor.getTaskInformation();
                     } catch (Exception e) {
                         throw new IllegalStateException(
                                 String.format(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
index 54d8fc30964..d354b49fedd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
@@ -179,7 +179,11 @@ class DefaultExecutionGraphDeploymentTest {
         taskManagerGateway.setSubmitConsumer(
                 FunctionUtils.uncheckedConsumer(
                         taskDeploymentDescriptor -> {
-                            taskDeploymentDescriptor.loadBigData(blobCache, 
new NoOpGroupCache<>());
+                            taskDeploymentDescriptor.loadBigData(
+                                    blobCache,
+                                    new NoOpGroupCache<>(),
+                                    new NoOpGroupCache<>(),
+                                    new NoOpGroupCache<>());
                             tdd.complete(taskDeploymentDescriptor);
                         }));
 
@@ -202,10 +206,8 @@ class DefaultExecutionGraphDeploymentTest {
         TaskDeploymentDescriptor descr = tdd.get();
         assertThat(descr).isNotNull();
 
-        JobInformation jobInformation =
-                
descr.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
-        TaskInformation taskInformation =
-                
descr.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
+        JobInformation jobInformation = descr.getJobInformation();
+        TaskInformation taskInformation = descr.getTaskInformation();
 
         assertThat(descr.getJobId()).isEqualTo(jobId);
         assertThat(jobInformation.getJobId()).isEqualTo(jobId);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
index 1848faa3959..fae6a754646 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
@@ -120,7 +120,11 @@ class 
DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest
         taskManagerGateway.setSubmitConsumer(
                 FunctionUtils.uncheckedConsumer(
                         taskDeploymentDescriptor -> {
-                            taskDeploymentDescriptor.loadBigData(blobCache, 
new NoOpGroupCache<>());
+                            taskDeploymentDescriptor.loadBigData(
+                                    blobCache,
+                                    new NoOpGroupCache<>(),
+                                    new NoOpGroupCache<>(),
+                                    new NoOpGroupCache<>());
                             tdds.offer(taskDeploymentDescriptor);
                         }));
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
index 193e56e20c3..68d9d5a275d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
@@ -201,6 +201,8 @@ public class TaskManagerServicesBuilder {
                 libraryCacheManager,
                 slotAllocationSnapshotPersistenceService,
                 sharedResources,
+                new NoOpGroupCache<>(),
+                new NoOpGroupCache<>(),
                 new NoOpGroupCache<>());
     }
 }


Reply via email to