rkhachatryan commented on code in PR #20217:
URL: https://github.com/apache/flink/pull/20217#discussion_r940264768
##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java:
##########
@@ -63,6 +66,9 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
private final AtomicInteger logIdGenerator = new AtomicInteger(0);
private final TaskChangelogRegistry changelogRegistry;
+
+ @Nullable private LocalChangelogRegistry localChangelogRegistry = null;
+
/** The configuration for local recovery. */
@Nonnull private final LocalRecoveryConfig localRecoveryConfig;
Review Comment:
nit: I'd replace this pair of a `@Nullable` registry and a `@Nonnull` config
with a no-op `LocalChangelogRegistry`.
Then, in `FsStateChangelogWriter.confirm()`, we could just check
`if (result.localStreamHandle != null) { ... }`.
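To illustrate the no-op idea, here is a minimal sketch. All names in it (`LocalRegistrySketch`, `RecordingRegistrySketch`, `WriterSketch`, the `String` handle) are hypothetical stand-ins, not the actual Flink classes or signatures; it only shows how a never-null registry removes the null check on the registry itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the registry; NOT the actual Flink API.
interface LocalRegistrySketch {
    void register(String handle, long checkpointId);

    // Shared no-op instance, to be used when local recovery is disabled.
    LocalRegistrySketch NO_OP = (handle, checkpointId) -> {};
}

// Hypothetical "real" registry that simply records what was registered.
final class RecordingRegistrySketch implements LocalRegistrySketch {
    final List<String> registered = new ArrayList<>();

    @Override
    public void register(String handle, long checkpointId) {
        registered.add(handle + "@" + checkpointId);
    }
}

// Hypothetical writer: the registry field is never null, so confirm()
// only has to check whether a local handle exists at all.
final class WriterSketch {
    private final LocalRegistrySketch registry;

    WriterSketch(LocalRegistrySketch registry) {
        this.registry = registry;
    }

    void confirm(String localStreamHandle, long checkpointId) {
        if (localStreamHandle != null) {
            registry.register(localStreamHandle, checkpointId);
        }
    }
}
```

With this shape, the storage would pass `NO_OP` when local recovery is disabled and a real registry otherwise, and the writer would not need to know which one it got.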
##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java:
##########
@@ -104,17 +126,24 @@ public FsStateChangelogStorage(
metricGroup,
changelogRegistry)),
PREEMPTIVE_PERSIST_THRESHOLD.defaultValue().getBytes(),
- changelogRegistry);
+ changelogRegistry,
+ localRecoveryConfig);
}
@VisibleForTesting
public FsStateChangelogStorage(
StateChangeUploadScheduler uploader,
long preEmptivePersistThresholdInBytes,
- TaskChangelogRegistry changelogRegistry) {
+ TaskChangelogRegistry changelogRegistry,
+ LocalRecoveryConfig localRecoveryConfig) {
this.preEmptivePersistThresholdInBytes = preEmptivePersistThresholdInBytes;
this.changelogRegistry = changelogRegistry;
this.uploader = uploader;
+ this.localRecoveryConfig = localRecoveryConfig;
+ if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+ this.localChangelogRegistry =
+ new LocalChangelogRegistry(Executors.newSingleThreadExecutor());
Review Comment:
`TaskManagerServices.ioExecutor` is also used by other jobs, so re-using it
here might break resource isolation.
It would make sense to share threads between `TaskChangelogRegistry` and
`LocalChangelogRegistry`, but creating two threads per job on the TM seems fine too.
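For reference, sharing one per-job thread between the two registries could look roughly like the sketch below. `AsyncRegistrySketch` and `SharedExecutorSketch` are hypothetical placeholders, not Flink classes; only the `java.util.concurrent` calls are real.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a registry that discards state asynchronously.
final class AsyncRegistrySketch {
    private final ExecutorService executor;
    final AtomicInteger discarded = new AtomicInteger();

    AsyncRegistrySketch(ExecutorService executor) {
        this.executor = executor;
    }

    void discardAsync() {
        // The actual discard work would go here; the sketch only counts calls.
        executor.execute(discarded::incrementAndGet);
    }
}

final class SharedExecutorSketch {
    // Runs two registries on one per-job thread and returns the total discard count.
    static int run() {
        // One thread owned by the job, so no other job's work is queued on it.
        ExecutorService perJobExecutor = Executors.newSingleThreadExecutor();
        AsyncRegistrySketch taskRegistry = new AsyncRegistrySketch(perJobExecutor);
        AsyncRegistrySketch localRegistry = new AsyncRegistrySketch(perJobExecutor);

        taskRegistry.discardAsync();
        localRegistry.discardAsync();

        perJobExecutor.shutdown();
        try {
            if (!perJobExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executor did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return taskRegistry.discarded.get() + localRegistry.discarded.get();
    }
}
```

The trade-off is the one noted above: a shared thread saves one thread per job, while separate executors keep the two registries from delaying each other.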