xintongsong commented on code in PR #19108:
URL: https://github.com/apache/flink/pull/19108#discussion_r917502613


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java:
##########
@@ -132,6 +136,12 @@ public class StreamConfig implements Serializable {
 
     private final Configuration config;
 
+    private final transient Map<String, Object> objectSerializationConfig = 
new HashMap<>();
+    private final transient Map<Integer, CompletableFuture<StreamConfig>> 
chainedTaskFutures =
+            new HashMap<>();
+    private final transient CompletableFuture<StreamConfig> 
serializationFuture =
+            new CompletableFuture<>();

Review Comment:
   I'd suggest to add a brief explanation here:
   - We first collect all the need-to-be-serialized objects, then serialize 
them at once
   - The purpose is to make parallelization of `StreamConfig` serialization 
easier.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java:
##########
@@ -140,6 +150,67 @@ public Configuration getConfiguration() {
         return config;
     }
 
+    public CompletableFuture<StreamConfig> getSerializationFuture() {
+        return serializationFuture;
+    }
+
+    /** Trigger the object config serialization and return the completable 
future. */
+    public CompletableFuture<StreamConfig> triggerSerializationAndReturnFuture(
+            Executor ioExecutor) {
+        FutureUtils.combineAll(chainedTaskFutures.values())
+                .thenAcceptAsync(
+                        chainedConfigs -> {
+                            // Serialize all the objects to config.
+                            serializedAllConfigs();
+
+                            try {
+                                InstantiationUtil.writeObjectToConfig(
+                                        chainedConfigs.stream()
+                                                .collect(
+                                                        Collectors.toMap(
+                                                                
StreamConfig::getVertexID,
+                                                                
Function.identity())),
+                                        this.config,
+                                        CHAINED_TASK_CONFIG);
+                            } catch (IOException e) {
+                                throw new StreamTaskException(
+                                        "Could not serialize object for key 
chained task config.",
+                                        e);
+                            }
+                            serializationFuture.complete(this);
+                        },
+                        ioExecutor);
+        return serializationFuture;
+    }
+
+    /**
+     * Serialize all object configs synchronously. Only used for operators 
which need to reconstruct
+     * the StreamConfig internally or test.
+     */
+    public void serializedAllConfigs() {

Review Comment:
   ```suggestion
       public void serializeAllConfigs() {
   ```



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java:
##########
@@ -132,6 +136,12 @@ public class StreamConfig implements Serializable {
 
     private final Configuration config;
 
+    private final transient Map<String, Object> objectSerializationConfig = 
new HashMap<>();

Review Comment:
   ```suggestion
       private final transient Map<String, Object> toBeSerializedConfigObjects 
= new HashMap<>();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to