xintongsong commented on code in PR #19108:
URL: https://github.com/apache/flink/pull/19108#discussion_r917502613
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java:
##########
@@ -132,6 +136,12 @@ public class StreamConfig implements Serializable {
private final Configuration config;
+ private final transient Map<String, Object> objectSerializationConfig =
new HashMap<>();
+ private final transient Map<Integer, CompletableFuture<StreamConfig>>
chainedTaskFutures =
+ new HashMap<>();
+ private final transient CompletableFuture<StreamConfig>
serializationFuture =
+ new CompletableFuture<>();
Review Comment:
I'd suggest to add a brief explanation here:
- We first collect all the need-to-be-serialized objects, then serialize
them at once
- The purpose is to make parallelization of `StreamConfig` serialization
easier.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java:
##########
@@ -140,6 +150,67 @@ public Configuration getConfiguration() {
return config;
}
+ public CompletableFuture<StreamConfig> getSerializationFuture() {
+ return serializationFuture;
+ }
+
+ /** Trigger the object config serialization and return the completable
future. */
+ public CompletableFuture<StreamConfig> triggerSerializationAndReturnFuture(
+ Executor ioExecutor) {
+ FutureUtils.combineAll(chainedTaskFutures.values())
+ .thenAcceptAsync(
+ chainedConfigs -> {
+ // Serialize all the objects to config.
+ serializedAllConfigs();
+
+ try {
+ InstantiationUtil.writeObjectToConfig(
+ chainedConfigs.stream()
+ .collect(
+ Collectors.toMap(
+
StreamConfig::getVertexID,
+
Function.identity())),
+ this.config,
+ CHAINED_TASK_CONFIG);
+ } catch (IOException e) {
+ throw new StreamTaskException(
+ "Could not serialize object for key
chained task config.",
+ e);
+ }
+ serializationFuture.complete(this);
+ },
+ ioExecutor);
+ return serializationFuture;
+ }
+
+ /**
+ * Serialize all object configs synchronously. Only used for operators
which need to reconstruct
+ * the StreamConfig internally or test.
+ */
+ public void serializedAllConfigs() {
Review Comment:
```suggestion
public void serializeAllConfigs() {
```
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java:
##########
@@ -132,6 +136,12 @@ public class StreamConfig implements Serializable {
private final Configuration config;
+ private final transient Map<String, Object> objectSerializationConfig =
new HashMap<>();
Review Comment:
```suggestion
private final transient Map<String, Object> toBeSerializedConfigObjects
= new HashMap<>();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]