This is an automated email from the ASF dual-hosted git repository. ndipiazza pushed a commit to branch TIKA-4576-add-fetcher-config-store-interface in repository https://gitbox.apache.org/repos/asf/tika.git
commit 357bab09a703cc4fb1eff9feb42c1e52d39745dd Author: Nicholas DiPiazza <[email protected]> AuthorDate: Tue Dec 16 13:29:00 2025 -0600 TIKA-4576 Create pluggable storage interface for Fetcher components with in-memory implementation --- .../tika/pipes/core/AbstractComponentManager.java | 40 +++++++++---- .../apache/tika/pipes/core/config/ConfigStore.java | 66 ++++++++++++++++++++++ .../pipes/core/config/InMemoryConfigStore.java | 56 ++++++++++++++++++ .../tika/pipes/core/emitter/EmitterManager.java | 26 +++++++++ .../tika/pipes/core/fetcher/FetcherManager.java | 26 +++++++++ 5 files changed, 203 insertions(+), 11 deletions(-) diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java index 02c77d4d2..059cfc839 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java @@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory; import org.apache.tika.config.loader.TikaObjectMapperFactory; import org.apache.tika.exception.TikaConfigException; import org.apache.tika.exception.TikaException; +import org.apache.tika.pipes.core.config.ConfigStore; +import org.apache.tika.pipes.core.config.InMemoryConfigStore; import org.apache.tika.plugins.ExtensionConfig; import org.apache.tika.plugins.TikaExtension; import org.apache.tika.plugins.TikaExtensionFactory; @@ -50,15 +52,23 @@ public abstract class AbstractComponentManager<T extends TikaExtension, private static final Logger LOG = LoggerFactory.getLogger(AbstractComponentManager.class); protected final PluginManager pluginManager; - private final Map<String, ExtensionConfig> componentConfigs = new ConcurrentHashMap<>(); + private final ConfigStore configStore; private final Map<String, T> componentCache = new 
ConcurrentHashMap<>(); private final boolean allowRuntimeModifications; protected AbstractComponentManager(PluginManager pluginManager, Map<String, ExtensionConfig> componentConfigs, boolean allowRuntimeModifications) { + this(pluginManager, componentConfigs, allowRuntimeModifications, new InMemoryConfigStore()); + } + + protected AbstractComponentManager(PluginManager pluginManager, + Map<String, ExtensionConfig> componentConfigs, + boolean allowRuntimeModifications, + ConfigStore configStore) { this.pluginManager = pluginManager; - this.componentConfigs.putAll(componentConfigs); + this.configStore = configStore; + componentConfigs.forEach(configStore::put); this.allowRuntimeModifications = allowRuntimeModifications; } @@ -67,6 +77,14 @@ public abstract class AbstractComponentManager<T extends TikaExtension, */ protected abstract String getConfigKey(); + /** + * Returns the config store used by this manager. + * Useful for subclasses that need direct access to the store. + */ + protected ConfigStore getConfigStore() { + return configStore; + } + /** * Returns the factory class for this component type. */ @@ -194,11 +212,11 @@ public abstract class AbstractComponentManager<T extends TikaExtension, } // Check if config exists - ExtensionConfig config = componentConfigs.get(id); + ExtensionConfig config = configStore.get(id); if (config == null) { throw createNotFoundException( "Can't find " + getComponentName() + " for id=" + id + - ". Available: " + componentConfigs.keySet()); + ". 
Available: " + configStore.keySet()); } // Synchronized block to ensure only one thread builds the component @@ -267,7 +285,7 @@ public abstract class AbstractComponentManager<T extends TikaExtension, String typeName = config.name(); // Check for duplicate ID - if (componentConfigs.containsKey(componentId)) { + if (configStore.containsKey(componentId)) { throw new TikaConfigException(getComponentName().substring(0, 1).toUpperCase(Locale.ROOT) + getComponentName().substring(1) + " with id '" + componentId + "' already exists"); } @@ -281,7 +299,7 @@ public abstract class AbstractComponentManager<T extends TikaExtension, } // Store config without instantiating - componentConfigs.put(componentId, config); + configStore.put(componentId, config); LOG.debug("Saved {} config: id={}, type={}", getComponentName(), componentId, typeName); } @@ -289,7 +307,7 @@ public abstract class AbstractComponentManager<T extends TikaExtension, * Returns the set of supported component IDs. */ public Set<String> getSupported() { - return componentConfigs.keySet(); + return configStore.keySet(); } /** @@ -299,15 +317,15 @@ public abstract class AbstractComponentManager<T extends TikaExtension, * @return the single configured component */ public T getComponent() throws IOException, TikaException { - if (componentConfigs.size() != 1) { + if (configStore.size() != 1) { throw new IllegalArgumentException( "No-arg get" + getComponentName().substring(0, 1).toUpperCase(Locale.ROOT) + getComponentName().substring(1) + "() requires exactly 1 configured " + - getComponentName() + ". Found: " + componentConfigs.size() + - " (" + componentConfigs.keySet() + ")"); + getComponentName() + ". 
Found: " + configStore.size() + + " (" + configStore.keySet() + ")"); } // Get the single component id and use getComponent(id) for lazy loading - String componentId = componentConfigs.keySet().iterator().next(); + String componentId = configStore.keySet().iterator().next(); return getComponent(componentId); } } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java new file mode 100644 index 000000000..854734783 --- /dev/null +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.core.config; + +import java.util.Set; + +import org.apache.tika.plugins.ExtensionConfig; + +/** + * Interface for storing and retrieving component configurations. + * Implementations can provide different storage backends (in-memory, database, distributed cache, etc.). + */ +public interface ConfigStore { + + /** + * Stores a configuration. 
+ * + * @param id the configuration ID + * @param config the configuration to store + */ + void put(String id, ExtensionConfig config); + + /** + * Retrieves a configuration by ID. + * + * @param id the configuration ID + * @return the configuration, or null if not found + */ + ExtensionConfig get(String id); + + /** + * Checks if a configuration exists. + * + * @param id the configuration ID + * @return true if the configuration exists + */ + boolean containsKey(String id); + + /** + * Returns all configuration IDs. + * + * @return set of all configuration IDs + */ + Set<String> keySet(); + + /** + * Returns the number of stored configurations. + * + * @return the number of configurations + */ + int size(); +} diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java new file mode 100644 index 000000000..e1531e4b8 --- /dev/null +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tika.pipes.core.config; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.tika.plugins.ExtensionConfig; + +/** + * Default in-memory implementation of {@link ConfigStore} using a {@link ConcurrentHashMap}. + * Thread-safe and suitable for single-instance deployments. + */ +public class InMemoryConfigStore implements ConfigStore { + + private final ConcurrentHashMap<String, ExtensionConfig> store = new ConcurrentHashMap<>(); + + @Override + public void put(String id, ExtensionConfig config) { + store.put(id, config); + } + + @Override + public ExtensionConfig get(String id) { + return store.get(id); + } + + @Override + public boolean containsKey(String id) { + return store.containsKey(id); + } + + @Override + public Set<String> keySet() { + return store.keySet(); + } + + @Override + public int size() { + return store.size(); + } +} diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java index 101de53d6..05551c112 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java @@ -67,12 +67,32 @@ public class EmitterManager extends AbstractComponentManager<Emitter, EmitterFac public static EmitterManager load(PluginManager pluginManager, TikaJsonConfig tikaJsonConfig, boolean allowRuntimeModifications) throws IOException, TikaConfigException { + return load(pluginManager, tikaJsonConfig, allowRuntimeModifications, null); + } + + /** + * Loads an EmitterManager with optional support for runtime modifications and a custom config store. 
+ * + * @param pluginManager the plugin manager + * @param tikaJsonConfig the configuration + * @param allowRuntimeModifications if true, allows calling {@link #saveEmitter(ExtensionConfig)} + * to add emitters at runtime + * @param configStore custom config store implementation, or null to use default in-memory store + * @return an EmitterManager + */ + public static EmitterManager load(PluginManager pluginManager, TikaJsonConfig tikaJsonConfig, + boolean allowRuntimeModifications, + org.apache.tika.pipes.core.config.ConfigStore configStore) + throws IOException, TikaConfigException { EmitterManager manager = new EmitterManager(pluginManager, allowRuntimeModifications); JsonNode emittersNode = tikaJsonConfig.getRootNode().get(CONFIG_KEY); // Validate configuration and collect emitter configs without instantiating Map<String, ExtensionConfig> configs = manager.validateAndCollectConfigs(pluginManager, emittersNode); + if (configStore != null) { + return new EmitterManager(pluginManager, configs, allowRuntimeModifications, configStore); + } return new EmitterManager(pluginManager, configs, allowRuntimeModifications); } @@ -85,6 +105,12 @@ public class EmitterManager extends AbstractComponentManager<Emitter, EmitterFac super(pluginManager, emitterConfigs, allowRuntimeModifications); } + private EmitterManager(PluginManager pluginManager, Map<String, ExtensionConfig> emitterConfigs, + boolean allowRuntimeModifications, + org.apache.tika.pipes.core.config.ConfigStore configStore) { + super(pluginManager, emitterConfigs, allowRuntimeModifications, configStore); + } + @Override protected String getConfigKey() { return CONFIG_KEY; diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java index af43914ac..c9ed4c3bf 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java +++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java @@ -66,12 +66,32 @@ public class FetcherManager extends AbstractComponentManager<Fetcher, FetcherFac public static FetcherManager load(PluginManager pluginManager, TikaJsonConfig tikaJsonConfig, boolean allowRuntimeModifications) throws TikaConfigException, IOException { + return load(pluginManager, tikaJsonConfig, allowRuntimeModifications, null); + } + + /** + * Loads a FetcherManager with optional support for runtime modifications and a custom config store. + * + * @param pluginManager the plugin manager + * @param tikaJsonConfig the configuration + * @param allowRuntimeModifications if true, allows calling {@link #saveFetcher(ExtensionConfig)} + * to add fetchers at runtime + * @param configStore custom config store implementation, or null to use default in-memory store + * @return a FetcherManager + */ + public static FetcherManager load(PluginManager pluginManager, TikaJsonConfig tikaJsonConfig, + boolean allowRuntimeModifications, + org.apache.tika.pipes.core.config.ConfigStore configStore) + throws TikaConfigException, IOException { FetcherManager manager = new FetcherManager(pluginManager, allowRuntimeModifications); JsonNode fetchersNode = tikaJsonConfig.getRootNode().get(CONFIG_KEY); // Validate configuration and collect fetcher configs without instantiating Map<String, ExtensionConfig> configs = manager.validateAndCollectConfigs(pluginManager, fetchersNode); + if (configStore != null) { + return new FetcherManager(pluginManager, configs, allowRuntimeModifications, configStore); + } return new FetcherManager(pluginManager, configs, allowRuntimeModifications); } @@ -84,6 +104,12 @@ public class FetcherManager extends AbstractComponentManager<Fetcher, FetcherFac super(pluginManager, fetcherConfigs, allowRuntimeModifications); } + private FetcherManager(PluginManager pluginManager, Map<String, ExtensionConfig> fetcherConfigs, + boolean 
allowRuntimeModifications, + org.apache.tika.pipes.core.config.ConfigStore configStore) { + super(pluginManager, fetcherConfigs, allowRuntimeModifications, configStore); + } + @Override protected String getConfigKey() { return CONFIG_KEY;
