Zakelly commented on a change in pull request #16341:
URL: https://github.com/apache/flink/pull/16341#discussion_r665011729
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
##########
@@ -28,16 +34,63 @@
/** A thin wrapper around {@link PluginManager} to load {@link StateChangelogStorage}. */
@Internal
public class StateChangelogStorageLoader {
- private final PluginManager pluginManager;
- public StateChangelogStorageLoader(PluginManager pluginManager) {
- this.pluginManager = pluginManager;
+ private static final Logger LOG = LoggerFactory.getLogger(StateChangelogStorageLoader.class);
+
+ /**
+ * Mapping of state changelog storage identifier to the corresponding storage factories,
+ * populated in {@link StateChangelogStorageLoader#initialize(PluginManager)}.
+ */
+ private static final HashMap<String, StateChangelogStorageFactory>
+ STATE_CHANGELOG_STORAGE_FACTORIES = new HashMap<>();
+
+ static {
+ // Guarantee to trigger once.
+ initialize(null);
}
- @SuppressWarnings({"rawtypes"})
- public Iterator<StateChangelogStorage> load() {
- return concat(
- pluginManager.load(StateChangelogStorage.class),
- ServiceLoader.load(StateChangelogStorage.class).iterator());
+ public static void initialize(PluginManager pluginManager) {
+ STATE_CHANGELOG_STORAGE_FACTORIES.clear();
+ Iterator<StateChangelogStorageFactory> iterator =
+ pluginManager == null
+ ? ServiceLoader.load(StateChangelogStorageFactory.class).iterator()
+ : concat(
+ pluginManager.load(StateChangelogStorageFactory.class),
+ ServiceLoader.load(StateChangelogStorageFactory.class).iterator());
+ iterator.forEachRemaining(
+ factory -> {
+ String identifier = factory.getIdentifier().toLowerCase();
+ StateChangelogStorageFactory prev =
+ STATE_CHANGELOG_STORAGE_FACTORIES.get(identifier);
+ if (prev == null) {
+ STATE_CHANGELOG_STORAGE_FACTORIES.put(identifier, factory);
+ } else {
+ LOG.warn(
+ "StateChangelogStorageLoader found duplicated factory,"
+ + " using {} instead of {} for name {}.",
+ prev.getClass().getName(),
+ factory.getClass().getName(),
+ identifier);
+ }
+ });
+ LOG.info(
+ "StateChangelogStorageLoader initialized with shortcut names {{}}.",
+ String.join(",", STATE_CHANGELOG_STORAGE_FACTORIES.keySet()));
+ }
+
+ public static StateChangelogStorage<?> load(Configuration configuration) {
+ final String identifier =
+ configuration
+ .getString(CheckpointingOptions.STATE_CHANGE_LOG_STORAGE)
+ .toLowerCase();
+
+ StateChangelogStorageFactory factory = STATE_CHANGELOG_STORAGE_FACTORIES.get(identifier);
+ if (factory == null) {
+ LOG.warn("Cannot find a factory for changelog storage with name '{}'.", identifier);
+ return null;
Review comment:
1. Currently there is a default value for ```STATE_CHANGE_LOG_STORAGE```, so we
won't run into this case. However, a ```@Nullable``` annotation here makes
sense. Will do.
2. I'd rather keep returning null here. A user may disable the changelog but
still configure a wrong value for ```STATE_CHANGE_LOG_STORAGE``` (through a
copy-and-paste mistake, or because the jar containing the factory is missing).
Since they have disabled the changelog, changelog-related configuration should
not make the job fail. WDYT?
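
To make point 2 concrete, here is a minimal, self-contained sketch of the behavior I have in mind. It does not use Flink classes: `ChangelogLookupSketch`, `resolve`, and the `Supplier<String>` stand-in for a storage factory are all hypothetical names for illustration only. The lookup returns null for an unknown identifier, and only a caller that actually has the changelog enabled treats that as an error.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for the loader; none of these names are Flink APIs. */
public class ChangelogLookupSketch {

    // Identifier -> factory. A Supplier<String> stands in for a real storage factory.
    private static final Map<String, Supplier<String>> FACTORIES = new HashMap<>();

    static {
        FACTORIES.put("memory", () -> "in-memory changelog storage");
    }

    /** Returns the storage, or null when no factory matches (the real method would be @Nullable). */
    static String load(String identifier) {
        Supplier<String> factory = FACTORIES.get(identifier.toLowerCase(Locale.ROOT));
        return factory == null ? null : factory.get();
    }

    /** Assumed caller: a missing storage is fatal only when the changelog is enabled. */
    static String resolve(boolean changelogEnabled, String identifier) {
        String storage = load(identifier); // never throws for an unknown identifier
        if (!changelogEnabled) {
            return null; // a wrong identifier is irrelevant while the changelog is off
        }
        if (storage == null) {
            throw new IllegalStateException(
                    "No changelog storage factory for '" + identifier + "'");
        }
        return storage;
    }

    public static void main(String[] args) {
        // Misspelled identifier, changelog disabled: the job keeps running.
        System.out.println(resolve(false, "memroy"));
        // Valid identifier, changelog enabled: lookup is case-insensitive.
        System.out.println(resolve(true, "Memory"));
    }
}
```

With this split, deciding whether a missing factory is fatal stays with the code that knows whether the changelog is enabled, rather than with the loader.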