Zakelly commented on a change in pull request #16341:
URL: https://github.com/apache/flink/pull/16341#discussion_r665011729



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
##########
@@ -28,16 +34,63 @@
 /** A thin wrapper around {@link PluginManager} to load {@link StateChangelogStorage}. */
 @Internal
 public class StateChangelogStorageLoader {
-    private final PluginManager pluginManager;
 
-    public StateChangelogStorageLoader(PluginManager pluginManager) {
-        this.pluginManager = pluginManager;
+    private static final Logger LOG = LoggerFactory.getLogger(StateChangelogStorageLoader.class);
+
+    /**
+     * Mapping of state changelog storage identifier to the corresponding storage factories,
+     * populated in {@link StateChangelogStorageLoader#initialize(PluginManager)}.
+     */
+    private static final HashMap<String, StateChangelogStorageFactory>
+            STATE_CHANGELOG_STORAGE_FACTORIES = new HashMap<>();
+
+    static {
+        // Guarantee to trigger once.
+        initialize(null);
     }
 
-    @SuppressWarnings({"rawtypes"})
-    public Iterator<StateChangelogStorage> load() {
-        return concat(
-                pluginManager.load(StateChangelogStorage.class),
-                ServiceLoader.load(StateChangelogStorage.class).iterator());
+    public static void initialize(PluginManager pluginManager) {
+        STATE_CHANGELOG_STORAGE_FACTORIES.clear();
+        Iterator<StateChangelogStorageFactory> iterator =
+                pluginManager == null
+                        ? ServiceLoader.load(StateChangelogStorageFactory.class).iterator()
+                        : concat(
+                                pluginManager.load(StateChangelogStorageFactory.class),
+                                ServiceLoader.load(StateChangelogStorageFactory.class).iterator());
+        iterator.forEachRemaining(
+                factory -> {
+                    String identifier = factory.getIdentifier().toLowerCase();
+                    StateChangelogStorageFactory prev =
+                            STATE_CHANGELOG_STORAGE_FACTORIES.get(identifier);
+                    if (prev == null) {
+                        STATE_CHANGELOG_STORAGE_FACTORIES.put(identifier, factory);
+                    } else {
+                        LOG.warn(
+                                "StateChangelogStorageLoader found duplicated factory,"
+                                        + " using {} instead of {} for name {}.",
+                                prev.getClass().getName(),
+                                factory.getClass().getName(),
+                                identifier);
+                    }
+                });
+        LOG.info(
+                "StateChangelogStorageLoader initialized with shortcut names {{}}.",
+                String.join(",", STATE_CHANGELOG_STORAGE_FACTORIES.keySet()));
+    }
+
+    public static StateChangelogStorage<?> load(Configuration configuration) {
+        final String identifier =
+                configuration
+                        .getString(CheckpointingOptions.STATE_CHANGE_LOG_STORAGE)
+                        .toLowerCase();
+
+        StateChangelogStorageFactory factory = STATE_CHANGELOG_STORAGE_FACTORIES.get(identifier);
+        if (factory == null) {
+            LOG.warn("Cannot find a factory for changelog storage with name '{}'.", identifier);
+            return null;

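The registry behaviour in this hunk can be sketched without any Flink dependencies. Each factory is keyed by its lower-cased identifier. When two factories share an identifier, the first one seen is kept. When a `PluginManager` is supplied, plugin factories are iterated before the classpath `ServiceLoader` ones. A lookup for an unknown name yields null.

This is a minimal sketch under stated assumptions:
- `StorageFactory`, `NamedFactory`, `FactoryRegistrySketch` and `skippedDuplicates` are hypothetical names, not Flink classes.
- Factories arrive as a plain iterator rather than from `PluginManager`/`ServiceLoader`.
- The identifiers are illustrative only.
- The sketch lower-cases with `Locale.ROOT`, whereas the hunk calls `toLowerCase()` with the default locale.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical, dependency-free model of the identifier -> factory registry in the hunk. */
public class FactoryRegistrySketch {

    /** Hypothetical stand-in for StateChangelogStorageFactory. */
    interface StorageFactory {
        String getIdentifier();
    }

    /** Illustrative factory that only carries an identifier and a label. */
    static final class NamedFactory implements StorageFactory {
        private final String identifier;
        private final String label;

        NamedFactory(String identifier, String label) {
            this.identifier = identifier;
            this.label = label;
        }

        @Override
        public String getIdentifier() {
            return identifier;
        }

        @Override
        public String toString() {
            return label;
        }
    }

    private final Map<String, StorageFactory> factories = new HashMap<>();
    private final List<String> skipped = new ArrayList<>();

    /** Registers factories in iteration order; the first factory seen for an identifier wins. */
    void initialize(Iterator<? extends StorageFactory> iterator) {
        // Re-initializing starts from scratch, like the clear() call in the hunk.
        factories.clear();
        skipped.clear();
        iterator.forEachRemaining(
                factory -> {
                    String id = factory.getIdentifier().toLowerCase(Locale.ROOT);
                    // putIfAbsent keeps the earlier factory and returns it if one exists.
                    if (factories.putIfAbsent(id, factory) != null) {
                        // The hunk logs a warning here; the sketch just records the identifier.
                        skipped.add(id);
                    }
                });
    }

    /** Case-insensitive lookup; returns null when no factory is registered for the name. */
    StorageFactory lookup(String configuredName) {
        return factories.get(configuredName.toLowerCase(Locale.ROOT));
    }

    /** Identifiers for which a later, duplicate factory was ignored. */
    List<String> skippedDuplicates() {
        return skipped;
    }

    public static void main(String[] args) {
        FactoryRegistrySketch registry = new FactoryRegistrySketch();
        registry.initialize(
                Arrays.asList(
                                new NamedFactory("memory", "plugin-memory"),
                                new NamedFactory("MEMORY", "classpath-memory"))
                        .iterator());
        System.out.println(registry.lookup("Memory")); // plugin-memory
        System.out.println(registry.lookup("dfs")); // null
        System.out.println(registry.skippedDuplicates()); // [memory]
    }
}
```

`putIfAbsent` only inserts when the key is missing, so the earlier (plugin) factory survives. This matches the "using {} instead of {}" warning in the hunk, where the previously registered factory is the one kept.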
Review comment:
   1. Currently there is a default value for `STATE_CHANGE_LOG_STORAGE`, so we won't run into this case. However, a `@Nullable` annotation here makes sense. Will do.
   2. I'd rather keep returning null here. A user may disable the changelog while `STATE_CHANGE_LOG_STORAGE` still names a factory that cannot be found (e.g., a value left over from a copy-and-paste, or a factory whose jar is missing). Since they have disabled the changelog, changelog-related configuration should not make the job fail. WDYT?
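Point 2 can be illustrated from the caller's side. This is a minimal sketch, assuming a caller that resolves the storage unconditionally and then checks whether the changelog is enabled. A null result is tolerated while the changelog is disabled and only becomes an error once it is enabled.

The following are hypothetical stand-ins, not Flink APIs:
- `NullableStorageCaller`, `register`, `load` and `resolve`.
- The `Supplier<String>` that represents a storage.

In the PR, the storage would come from `StateChangelogStorageLoader.load(Configuration)`. Throwing `IllegalStateException` when the changelog is enabled is this sketch's own assumption; the comment does not specify it.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical caller showing why load() returning null is tolerable while the changelog is off. */
public class NullableStorageCaller {

    /** Hypothetical registry: lower-cased identifier -> storage supplier. */
    private final Map<String, Supplier<String>> registry = new HashMap<>();

    void register(String identifier, Supplier<String> storage) {
        registry.putIfAbsent(identifier.toLowerCase(Locale.ROOT), storage);
    }

    /** Mirrors a @Nullable load(): returns null for an unknown or unavailable storage name. */
    Supplier<String> load(String configuredName) {
        return registry.get(configuredName.toLowerCase(Locale.ROOT));
    }

    /** Resolves the storage, failing only if the changelog is actually enabled. */
    String resolve(boolean changelogEnabled, String configuredName) {
        // Loaded unconditionally, so a bad name shows up here as null rather than an exception.
        Supplier<String> storage = load(configuredName);
        if (!changelogEnabled) {
            // Changelog disabled: the storage is never used, so a wrong name must not fail the job.
            return null;
        }
        if (storage == null) {
            // Assumption of this sketch: an enabled changelog without a storage is a hard error.
            throw new IllegalStateException(
                    "Changelog is enabled but no storage factory is registered for '"
                            + configuredName
                            + "'");
        }
        return storage.get();
    }

    public static void main(String[] args) {
        NullableStorageCaller caller = new NullableStorageCaller();
        caller.register("memory", () -> "in-memory storage");
        System.out.println(caller.resolve(true, "MEMORY")); // in-memory storage
        System.out.println(caller.resolve(false, "memroy")); // null: typo tolerated while disabled
        try {
            caller.resolve(true, "memroy");
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

Keeping the null at the loader level leaves the "is this fatal?" decision to the code that knows whether the changelog is enabled. The alternative is to throw inside `load`, which would force every caller to catch even when the storage is never used.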




-- 
This is an automated message from the Apache Git Service.