rkhachatryan commented on code in PR #20217:
URL: https://github.com/apache/flink/pull/20217#discussion_r940264768


##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java:
##########
@@ -63,6 +66,9 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
     private final AtomicInteger logIdGenerator = new AtomicInteger(0);
 
     private final TaskChangelogRegistry changelogRegistry;
+
+    @Nullable private LocalChangelogRegistry localChangelogRegistry = null;
+
     /** The configuration for local recovery. */
     @Nonnull private final LocalRecoveryConfig localRecoveryConfig;

Review Comment:
   nit: I'd replace this pair of a `@Nullable` registry and a `@Nonnull` config 
with a no-op `LocalChangelogRegistry`.
   Then, in `FsStateChangelogWriter.confirm()`, we could just check 
`if (result.localStreamHandle != null) { ... }`.
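
   To illustrate the suggestion, here is a minimal sketch of the no-op (null 
object) approach. All type and method names below (`LocalRegistrySketch`, 
`TrackingRegistrySketch`, `StorageSketch`, `register`) are hypothetical 
stand-ins and not the actual Flink API; the point is only that the field is 
never null, so callers check the handle and nothing else.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a local changelog registry (not the Flink interface).
interface LocalRegistrySketch {
    // Shared no-op instance, used when local recovery is disabled.
    LocalRegistrySketch NO_OP = new LocalRegistrySketch() {
        @Override
        public void register(String localHandle, long checkpointId) {
            // Intentionally does nothing.
        }
    };

    void register(String localHandle, long checkpointId);
}

// Hypothetical "real" implementation that records what was registered.
final class TrackingRegistrySketch implements LocalRegistrySketch {
    final List<String> registered = new ArrayList<>();

    @Override
    public void register(String localHandle, long checkpointId) {
        registered.add(localHandle);
    }
}

// Hypothetical storage: the registry field is always non-null.
final class StorageSketch {
    private final LocalRegistrySketch localRegistry;

    StorageSketch(boolean localRecoveryEnabled) {
        this.localRegistry =
                localRecoveryEnabled ? new TrackingRegistrySketch() : LocalRegistrySketch.NO_OP;
    }

    LocalRegistrySketch registry() {
        return localRegistry;
    }

    // Mirrors the suggested check: only the handle is tested, never the registry.
    void confirm(String localHandle, long checkpointId) {
        if (localHandle != null) {
            localRegistry.register(localHandle, checkpointId);
        }
    }
}
```

   With this shape, the constructor decides once which implementation to use, 
and the `@Nullable` annotation on the field is no longer needed.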



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java:
##########
@@ -104,17 +126,24 @@ public FsStateChangelogStorage(
                                 metricGroup,
                                 changelogRegistry)),
                 PREEMPTIVE_PERSIST_THRESHOLD.defaultValue().getBytes(),
-                changelogRegistry);
+                changelogRegistry,
+                localRecoveryConfig);
     }
 
     @VisibleForTesting
     public FsStateChangelogStorage(
             StateChangeUploadScheduler uploader,
             long preEmptivePersistThresholdInBytes,
-            TaskChangelogRegistry changelogRegistry) {
+            TaskChangelogRegistry changelogRegistry,
+            LocalRecoveryConfig localRecoveryConfig) {
         this.preEmptivePersistThresholdInBytes = preEmptivePersistThresholdInBytes;
         this.changelogRegistry = changelogRegistry;
         this.uploader = uploader;
+        this.localRecoveryConfig = localRecoveryConfig;
+        if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+            this.localChangelogRegistry =
+                    new LocalChangelogRegistry(Executors.newSingleThreadExecutor());

Review Comment:
   `TaskManagerServices.ioExecutor` is used for other jobs as well; re-using it 
might break resource isolation.
   It would make sense to share the threads between `TaskChangelogRegistry` and 
`LocalChangelogRegistry`, but creating two threads per job on TM seems fine too.
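
   For the thread-sharing option, a rough sketch could look like the following. 
It assumes both registries accept an executor in their constructor; the class 
names (`DiscardingRegistrySketch`, `SharedExecutorStorageSketch`) and the 
`discardAsync` method are hypothetical and only illustrate one per-storage 
executor being handed to both registries instead of the TM-wide I/O executor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical registry that performs discards asynchronously on a given executor.
final class DiscardingRegistrySketch {
    private final ExecutorService executor;
    private final AtomicInteger discarded = new AtomicInteger();

    DiscardingRegistrySketch(ExecutorService executor) {
        this.executor = executor;
    }

    void discardAsync(String handle) {
        // A real registry would delete the state behind the handle; here we only count.
        executor.execute(discarded::incrementAndGet);
    }

    int discardedCount() {
        return discarded.get();
    }
}

// Hypothetical storage that owns one dedicated thread and shares it between both registries.
final class SharedExecutorStorageSketch implements AutoCloseable {
    // Declared first so it is initialized before the registries that use it.
    private final ExecutorService discardExecutor = Executors.newSingleThreadExecutor();

    final DiscardingRegistrySketch taskRegistry = new DiscardingRegistrySketch(discardExecutor);
    final DiscardingRegistrySketch localRegistry = new DiscardingRegistrySketch(discardExecutor);

    @Override
    public void close() {
        // Stop accepting new work and wait for queued discards to finish.
        discardExecutor.shutdown();
        try {
            discardExecutor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

   This keeps the work isolated from other jobs while using one thread per 
storage instead of two; whether that saving matters is a judgment call.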


