rkhachatryan commented on a change in pull request #16341:
URL: https://github.com/apache/flink/pull/16341#discussion_r665112724



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
##########
@@ -28,16 +34,63 @@
 /** A thin wrapper around {@link PluginManager} to load {@link 
StateChangelogStorage}. */
 @Internal
 public class StateChangelogStorageLoader {
-    private final PluginManager pluginManager;
 
-    public StateChangelogStorageLoader(PluginManager pluginManager) {
-        this.pluginManager = pluginManager;
+    private static final Logger LOG = 
LoggerFactory.getLogger(StateChangelogStorageLoader.class);
+
+    /**
+     * Mapping of state changelog storage identifier to the corresponding 
storage factories,
+     * populated in {@link 
StateChangelogStorageLoader#initialize(PluginManager)}.
+     */
+    private static final HashMap<String, StateChangelogStorageFactory>
+            STATE_CHANGELOG_STORAGE_FACTORIES = new HashMap<>();
+
+    static {
+        // Guarantee to trigger once.
+        initialize(null);
     }
 
-    @SuppressWarnings({"rawtypes"})
-    public Iterator<StateChangelogStorage> load() {
-        return concat(
-                pluginManager.load(StateChangelogStorage.class),
-                ServiceLoader.load(StateChangelogStorage.class).iterator());
+    public static void initialize(PluginManager pluginManager) {
+        STATE_CHANGELOG_STORAGE_FACTORIES.clear();
+        Iterator<StateChangelogStorageFactory> iterator =
+                pluginManager == null
+                        ? 
ServiceLoader.load(StateChangelogStorageFactory.class).iterator()
+                        : concat(
+                                
pluginManager.load(StateChangelogStorageFactory.class),
+                                
ServiceLoader.load(StateChangelogStorageFactory.class).iterator());
+        iterator.forEachRemaining(
+                factory -> {
+                    String identifier = factory.getIdentifier().toLowerCase();
+                    StateChangelogStorageFactory prev =
+                            STATE_CHANGELOG_STORAGE_FACTORIES.get(identifier);
+                    if (prev == null) {
+                        STATE_CHANGELOG_STORAGE_FACTORIES.put(identifier, 
factory);
+                    } else {
+                        LOG.warn(
+                                "StateChangelogStorageLoader found duplicated 
factory,"
+                                        + " using {} instead of {} for name 
{}.",
+                                prev.getClass().getName(),
+                                factory.getClass().getName(),
+                                identifier);
+                    }
+                });
+        LOG.info(
+                "StateChangelogStorageLoader initialized with shortcut names 
{{}}.",
+                String.join(",", STATE_CHANGELOG_STORAGE_FACTORIES.keySet()));
+    }
+
+    public static StateChangelogStorage<?> load(Configuration configuration) {
+        final String identifier =
+                configuration
+                        
.getString(CheckpointingOptions.STATE_CHANGE_LOG_STORAGE)
+                        .toLowerCase();
+
+        StateChangelogStorageFactory factory = 
STATE_CHANGELOG_STORAGE_FACTORIES.get(identifier);
+        if (factory == null) {
+            LOG.warn("Cannot find a factory for changelog storage with name 
'{}'.", identifier);
+            return null;

Review comment:
       Fair enough, removing jars and disabling changelog while keeping some 
value `state.backend.changelog.storage` seems a valid operation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to