rkhachatryan commented on a change in pull request #16341:
URL: https://github.com/apache/flink/pull/16341#discussion_r665112724
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
##########
@@ -28,16 +34,63 @@
/** A thin wrapper around {@link PluginManager} to load {@link
StateChangelogStorage}. */
@Internal
public class StateChangelogStorageLoader {
- private final PluginManager pluginManager;
- public StateChangelogStorageLoader(PluginManager pluginManager) {
- this.pluginManager = pluginManager;
+ private static final Logger LOG =
LoggerFactory.getLogger(StateChangelogStorageLoader.class);
+
+ /**
+ * Mapping of state changelog storage identifier to the corresponding
storage factories,
+ * populated in {@link
StateChangelogStorageLoader#initialize(PluginManager)}.
+ */
+ private static final HashMap<String, StateChangelogStorageFactory>
+ STATE_CHANGELOG_STORAGE_FACTORIES = new HashMap<>();
+
+ static {
+ // Guarantee to trigger once.
+ initialize(null);
}
- @SuppressWarnings({"rawtypes"})
- public Iterator<StateChangelogStorage> load() {
- return concat(
- pluginManager.load(StateChangelogStorage.class),
- ServiceLoader.load(StateChangelogStorage.class).iterator());
+ public static void initialize(PluginManager pluginManager) {
+ STATE_CHANGELOG_STORAGE_FACTORIES.clear();
+ Iterator<StateChangelogStorageFactory> iterator =
+ pluginManager == null
+ ?
ServiceLoader.load(StateChangelogStorageFactory.class).iterator()
+ : concat(
+
pluginManager.load(StateChangelogStorageFactory.class),
+
ServiceLoader.load(StateChangelogStorageFactory.class).iterator());
+ iterator.forEachRemaining(
+ factory -> {
+ String identifier = factory.getIdentifier().toLowerCase();
+ StateChangelogStorageFactory prev =
+ STATE_CHANGELOG_STORAGE_FACTORIES.get(identifier);
+ if (prev == null) {
+ STATE_CHANGELOG_STORAGE_FACTORIES.put(identifier,
factory);
+ } else {
+ LOG.warn(
+ "StateChangelogStorageLoader found duplicated
factory,"
+ + " using {} instead of {} for name
{}.",
+ prev.getClass().getName(),
+ factory.getClass().getName(),
+ identifier);
+ }
+ });
+ LOG.info(
+ "StateChangelogStorageLoader initialized with shortcut names
{{}}.",
+ String.join(",", STATE_CHANGELOG_STORAGE_FACTORIES.keySet()));
+ }
+
+ public static StateChangelogStorage<?> load(Configuration configuration) {
+ final String identifier =
+ configuration
+
.getString(CheckpointingOptions.STATE_CHANGE_LOG_STORAGE)
+ .toLowerCase();
+
+ StateChangelogStorageFactory factory =
STATE_CHANGELOG_STORAGE_FACTORIES.get(identifier);
+ if (factory == null) {
+ LOG.warn("Cannot find a factory for changelog storage with name
'{}'.", identifier);
+ return null;
Review comment:
Fair enough, removing jars and disabling changelog while keeping some
value `state.backend.changelog.storage` seems a valid operation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]