rkhachatryan commented on a change in pull request #16341:
URL: https://github.com/apache/flink/pull/16341#discussion_r663058564
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -1088,6 +1091,7 @@ private TaskExecutorJobServices registerNewJobAndCreateServices(
         return TaskExecutorJobServices.create(
                 libraryCacheManager.registerClassLoaderLease(jobId),
+                StateChangelogStorageLoader.load(taskManagerConfiguration.getConfiguration()),
Review comment:
This uses the TM configuration, not the job configuration, right?
Changelog storage can be configured per job, so the storage should be created from the job config (falling back to the cluster config).
I think the right place for this call is `TaskExecutor.submitTask`, where we have access to the job configuration (from `TaskDeploymentDescriptor`).
Since `submitTask` runs once per task, we'd need some kind of per-job cache there:
```
changelogStorageMap.computeIfAbsent(
        jobId,
        unused -> changelogStorageLoader.load(jobInformation.getJobConfiguration()));
```
That would also eliminate the need to add it to the `JobTable...` classes.
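To make the cache idea a bit more concrete, here is a minimal standalone sketch. It is not Flink code: `PerJobStorageCache`, `getOrLoad` and `release` are hypothetical names, the generic parameters stand in for the job ID, the job configuration and the storage, and the `ConcurrentHashMap` is only a defensive assumption about threading.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical per-job cache (not part of Flink).
 * K = job ID, C = job configuration, S = changelog storage.
 */
final class PerJobStorageCache<K, C, S> {

    private final Map<K, S> storages = new ConcurrentHashMap<>();

    /** Stand-in for the loader call; assumed to never return null. */
    private final Function<C, S> loader;

    PerJobStorageCache(Function<C, S> loader) {
        this.loader = loader;
    }

    /** Loads the storage for the first task of a job and reuses it for later tasks. */
    S getOrLoad(K jobId, C jobConfiguration) {
        return storages.computeIfAbsent(jobId, unused -> loader.apply(jobConfiguration));
    }

    /** Drops the cached entry for a job; returns the removed storage or null. */
    S release(K jobId) {
        return storages.remove(jobId);
    }
}
```

With something like this, the second task of the same job gets the already-created storage, and the configuration passed on later calls is ignored. The entry would have to be released once the job no longer has tasks on this TM, otherwise the map grows with every job.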
WDYT?
cc: @tillrohrmann
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
##########
@@ -18,26 +18,81 @@
 package org.apache.flink.runtime.state.changelog;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.plugin.PluginManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.ServiceLoader;
+import java.util.concurrent.locks.ReentrantLock;
 import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterators.concat;
 /** A thin wrapper around {@link PluginManager} to load {@link StateChangelogStorage}. */
 @Internal
 public class StateChangelogStorageLoader {
-    private final PluginManager pluginManager;
-    public StateChangelogStorageLoader(PluginManager pluginManager) {
-        this.pluginManager = pluginManager;
+    private static final Logger LOG = LoggerFactory.getLogger(StateChangelogStorageLoader.class);
+
+    /** Object used to protect calls to specific methods. */
+    private static final ReentrantLock LOCK = new ReentrantLock(true);
+
+    /**
+     * Mapping of state changelog storage identifier to the corresponding storage factories,
+     * populated in {@link StateChangelogStorageLoader#initialize(PluginManager)}.
+     */
+    private static final HashMap<String, StateChangelogStorageFactory>
+            STATE_CHANGELOG_STORAGE_FACTORIES = new HashMap<>();
+
+    static {
+        // Guarantee to trigger once.
+        initialize(null);
+    }
+
+    public static void initialize(PluginManager pluginManager) {
+        LOCK.lock();
+        try {
+            STATE_CHANGELOG_STORAGE_FACTORIES.clear();
+            Iterator<StateChangelogStorageFactory> iterator =
+                    pluginManager == null
+                            ? ServiceLoader.load(StateChangelogStorageFactory.class).iterator()
+                            : concat(
+                                    pluginManager.load(StateChangelogStorageFactory.class),
+                                    ServiceLoader.load(StateChangelogStorageFactory.class)
+                                            .iterator());
+            iterator.forEachRemaining(
+                    factory ->
+                            STATE_CHANGELOG_STORAGE_FACTORIES.putIfAbsent(
+                                    factory.getIdentifier(), factory));
+            LOG.info(
+                    "StateChangelogStorageLoader initialized with shortcut names {{}}.",
+                    String.join(",", STATE_CHANGELOG_STORAGE_FACTORIES.keySet()));
+        } finally {
+            LOCK.unlock();
+        }
     }
     @SuppressWarnings({"rawtypes"})
-    public Iterator<StateChangelogStorage> load() {
-        return concat(
-                pluginManager.load(StateChangelogStorage.class),
-                ServiceLoader.load(StateChangelogStorage.class).iterator());
+    public static StateChangelogStorage load(Configuration configuration) {
Review comment:
Is it possible to avoid `static` and instead create an instance of `StateChangelogStorageLoader`?
I [tried](https://github.com/rkhachatryan/flink/tree/f21804-non-static) to do so and it seems possible.
(The advantages I see are immutable state, no possibility of uninitialized access, and easier testing.)
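Roughly, a non-static variant could look like the standalone sketch below. This is not the code from the linked branch and not Flink's API: `StorageFactorySketch` and `StorageLoaderSketch` are hypothetical stand-ins, and the constructor takes a plain iterator where the real loader would combine the plugin manager and `ServiceLoader` results.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical minimal factory interface; the real StateChangelogStorageFactory has more methods. */
interface StorageFactorySketch {
    String getIdentifier();
}

/** Instance-based loader sketch: factories are collected once, in the constructor. */
final class StorageLoaderSketch {

    private final Map<String, StorageFactorySketch> factories;

    StorageLoaderSketch(Iterator<? extends StorageFactorySketch> discovered) {
        Map<String, StorageFactorySketch> byId = new LinkedHashMap<>();
        // The first factory seen for an identifier wins, as with putIfAbsent in the diff.
        discovered.forEachRemaining(factory -> byId.putIfAbsent(factory.getIdentifier(), factory));
        // No lock and no clear(): the map can never change after construction.
        this.factories = Collections.unmodifiableMap(byId);
    }

    /** Looks up a factory by identifier; empty if none was discovered. */
    Optional<StorageFactorySketch> find(String identifier) {
        return Optional.ofNullable(factories.get(identifier));
    }

    /** Read-only view of the known identifiers. */
    Set<String> identifiers() {
        return factories.keySet();
    }
}
```

Because the factories are fixed at construction time, there is no window in which the loader can be used before it is initialized, and a test can pass in whatever factories it needs instead of relying on the classpath.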