This is an automated email from the ASF dual-hosted git repository.

ndipiazza pushed a commit to branch TIKA-4576-add-fetcher-config-store-interface
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 357bab09a703cc4fb1eff9feb42c1e52d39745dd
Author: Nicholas DiPiazza <[email protected]>
AuthorDate: Tue Dec 16 13:29:00 2025 -0600

    TIKA-4576
    Create pluggable config storage interface for Fetcher and Emitter
    components with an in-memory implementation
---
 .../tika/pipes/core/AbstractComponentManager.java  | 40 +++++++++----
 .../apache/tika/pipes/core/config/ConfigStore.java | 66 ++++++++++++++++++++++
 .../pipes/core/config/InMemoryConfigStore.java     | 56 ++++++++++++++++++
 .../tika/pipes/core/emitter/EmitterManager.java    | 26 +++++++++
 .../tika/pipes/core/fetcher/FetcherManager.java    | 26 +++++++++
 5 files changed, 203 insertions(+), 11 deletions(-)

diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
index 02c77d4d2..059cfc839 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.tika.config.loader.TikaObjectMapperFactory;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
+import org.apache.tika.pipes.core.config.ConfigStore;
+import org.apache.tika.pipes.core.config.InMemoryConfigStore;
 import org.apache.tika.plugins.ExtensionConfig;
 import org.apache.tika.plugins.TikaExtension;
 import org.apache.tika.plugins.TikaExtensionFactory;
@@ -50,15 +52,23 @@ public abstract class AbstractComponentManager<T extends TikaExtension,
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractComponentManager.class);
 
     protected final PluginManager pluginManager;
-    private final Map<String, ExtensionConfig> componentConfigs = new 
ConcurrentHashMap<>();
+    private final ConfigStore configStore;
     private final Map<String, T> componentCache = new ConcurrentHashMap<>();
     private final boolean allowRuntimeModifications;
 
     protected AbstractComponentManager(PluginManager pluginManager,
                                       Map<String, ExtensionConfig> 
componentConfigs,
                                       boolean allowRuntimeModifications) {
+        this(pluginManager, componentConfigs, allowRuntimeModifications, new 
InMemoryConfigStore());
+    }
+
+    protected AbstractComponentManager(PluginManager pluginManager,
+                                      Map<String, ExtensionConfig> 
componentConfigs,
+                                      boolean allowRuntimeModifications,
+                                      ConfigStore configStore) {
         this.pluginManager = pluginManager;
-        this.componentConfigs.putAll(componentConfigs);
+        this.configStore = configStore;
+        componentConfigs.forEach(configStore::put);
         this.allowRuntimeModifications = allowRuntimeModifications;
     }
 
@@ -67,6 +77,14 @@ public abstract class AbstractComponentManager<T extends TikaExtension,
      */
     protected abstract String getConfigKey();
 
+    /**
+     * Returns the config store used by this manager.
+     * Useful for subclasses that need direct access to the store.
+     */
+    protected ConfigStore getConfigStore() {
+        return configStore;
+    }
+
     /**
      * Returns the factory class for this component type.
      */
@@ -194,11 +212,11 @@ public abstract class AbstractComponentManager<T extends TikaExtension,
         }
 
         // Check if config exists
-        ExtensionConfig config = componentConfigs.get(id);
+        ExtensionConfig config = configStore.get(id);
         if (config == null) {
             throw createNotFoundException(
                     "Can't find " + getComponentName() + " for id=" + id +
-                    ". Available: " + componentConfigs.keySet());
+                    ". Available: " + configStore.keySet());
         }
 
         // Synchronized block to ensure only one thread builds the component
@@ -267,7 +285,7 @@ public abstract class AbstractComponentManager<T extends TikaExtension,
         String typeName = config.name();
 
         // Check for duplicate ID
-        if (componentConfigs.containsKey(componentId)) {
+        if (configStore.containsKey(componentId)) {
             throw new TikaConfigException(getComponentName().substring(0, 
1).toUpperCase(Locale.ROOT) +
                     getComponentName().substring(1) + " with id '" + 
componentId + "' already exists");
         }
@@ -281,7 +299,7 @@ public abstract class AbstractComponentManager<T extends TikaExtension,
         }
 
         // Store config without instantiating
-        componentConfigs.put(componentId, config);
+        configStore.put(componentId, config);
         LOG.debug("Saved {} config: id={}, type={}", getComponentName(), 
componentId, typeName);
     }
 
@@ -289,7 +307,7 @@ public abstract class AbstractComponentManager<T extends TikaExtension,
      * Returns the set of supported component IDs.
      */
     public Set<String> getSupported() {
-        return componentConfigs.keySet();
+        return configStore.keySet();
     }
 
     /**
@@ -299,15 +317,15 @@ public abstract class AbstractComponentManager<T extends TikaExtension,
      * @return the single configured component
      */
     public T getComponent() throws IOException, TikaException {
-        if (componentConfigs.size() != 1) {
+        if (configStore.size() != 1) {
             throw new IllegalArgumentException(
                     "No-arg get" + getComponentName().substring(0, 
1).toUpperCase(Locale.ROOT) +
                     getComponentName().substring(1) + "() requires exactly 1 
configured " +
-                    getComponentName() + ". Found: " + componentConfigs.size() 
+
-                    " (" + componentConfigs.keySet() + ")");
+                    getComponentName() + ". Found: " + configStore.size() +
+                    " (" + configStore.keySet() + ")");
         }
         // Get the single component id and use getComponent(id) for lazy 
loading
-        String componentId = componentConfigs.keySet().iterator().next();
+        String componentId = configStore.keySet().iterator().next();
         return getComponent(componentId);
     }
 }
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java
new file mode 100644
index 000000000..854734783
--- /dev/null
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.config;
+
+import java.util.Set;
+
+import org.apache.tika.plugins.ExtensionConfig;
+
+/**
+ * Interface for storing and retrieving component configurations.
+ * Implementations can provide different storage backends (in-memory, 
database, distributed cache, etc.).
+ */
+public interface ConfigStore {
+
+    /**
+     * Stores a configuration.
+     *
+     * @param id the configuration ID
+     * @param config the configuration to store
+     */
+    void put(String id, ExtensionConfig config);
+
+    /**
+     * Retrieves a configuration by ID.
+     *
+     * @param id the configuration ID
+     * @return the configuration, or null if not found
+     */
+    ExtensionConfig get(String id);
+
+    /**
+     * Checks if a configuration exists.
+     *
+     * @param id the configuration ID
+     * @return true if the configuration exists
+     */
+    boolean containsKey(String id);
+
+    /**
+     * Returns all configuration IDs.
+     *
+     * @return set of all configuration IDs
+     */
+    Set<String> keySet();
+
+    /**
+     * Returns the number of stored configurations.
+     *
+     * @return the number of configurations
+     */
+    int size();
+}
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java
new file mode 100644
index 000000000..e1531e4b8
--- /dev/null
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.config;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.tika.plugins.ExtensionConfig;
+
+/**
+ * Default in-memory implementation of {@link ConfigStore} using a {@link 
ConcurrentHashMap}.
+ * Thread-safe and suitable for single-instance deployments.
+ */
+public class InMemoryConfigStore implements ConfigStore {
+
+    private final ConcurrentHashMap<String, ExtensionConfig> store = new 
ConcurrentHashMap<>();
+
+    @Override
+    public void put(String id, ExtensionConfig config) {
+        store.put(id, config);
+    }
+
+    @Override
+    public ExtensionConfig get(String id) {
+        return store.get(id);
+    }
+
+    @Override
+    public boolean containsKey(String id) {
+        return store.containsKey(id);
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return store.keySet();
+    }
+
+    @Override
+    public int size() {
+        return store.size();
+    }
+}
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
index 101de53d6..05551c112 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
@@ -67,12 +67,32 @@ public class EmitterManager extends AbstractComponentManager<Emitter, EmitterFac
     public static EmitterManager load(PluginManager pluginManager, 
TikaJsonConfig tikaJsonConfig,
                                      boolean allowRuntimeModifications)
             throws IOException, TikaConfigException {
+        return load(pluginManager, tikaJsonConfig, allowRuntimeModifications, 
null);
+    }
+
+    /**
+     * Loads an EmitterManager with optional support for runtime modifications 
and a custom config store.
+     *
+     * @param pluginManager the plugin manager
+     * @param tikaJsonConfig the configuration
+     * @param allowRuntimeModifications if true, allows calling {@link 
#saveEmitter(ExtensionConfig)}
+     *                                  to add emitters at runtime
+     * @param configStore custom config store implementation, or null to use 
default in-memory store
+     * @return an EmitterManager
+     */
+    public static EmitterManager load(PluginManager pluginManager, 
TikaJsonConfig tikaJsonConfig,
+                                     boolean allowRuntimeModifications,
+                                     
org.apache.tika.pipes.core.config.ConfigStore configStore)
+            throws IOException, TikaConfigException {
         EmitterManager manager = new EmitterManager(pluginManager, 
allowRuntimeModifications);
         JsonNode emittersNode = tikaJsonConfig.getRootNode().get(CONFIG_KEY);
 
         // Validate configuration and collect emitter configs without 
instantiating
         Map<String, ExtensionConfig> configs = 
manager.validateAndCollectConfigs(pluginManager, emittersNode);
 
+        if (configStore != null) {
+            return new EmitterManager(pluginManager, configs, 
allowRuntimeModifications, configStore);
+        }
         return new EmitterManager(pluginManager, configs, 
allowRuntimeModifications);
     }
 
@@ -85,6 +105,12 @@ public class EmitterManager extends AbstractComponentManager<Emitter, EmitterFac
         super(pluginManager, emitterConfigs, allowRuntimeModifications);
     }
 
+    private EmitterManager(PluginManager pluginManager, Map<String, 
ExtensionConfig> emitterConfigs,
+                          boolean allowRuntimeModifications,
+                          org.apache.tika.pipes.core.config.ConfigStore 
configStore) {
+        super(pluginManager, emitterConfigs, allowRuntimeModifications, 
configStore);
+    }
+
     @Override
     protected String getConfigKey() {
         return CONFIG_KEY;
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
index af43914ac..c9ed4c3bf 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
@@ -66,12 +66,32 @@ public class FetcherManager extends AbstractComponentManager<Fetcher, FetcherFac
     public static FetcherManager load(PluginManager pluginManager, 
TikaJsonConfig tikaJsonConfig,
                                      boolean allowRuntimeModifications)
             throws TikaConfigException, IOException {
+        return load(pluginManager, tikaJsonConfig, allowRuntimeModifications, 
null);
+    }
+
+    /**
+     * Loads a FetcherManager with optional support for runtime modifications 
and a custom config store.
+     *
+     * @param pluginManager the plugin manager
+     * @param tikaJsonConfig the configuration
+     * @param allowRuntimeModifications if true, allows calling {@link 
#saveFetcher(ExtensionConfig)}
+     *                                  to add fetchers at runtime
+     * @param configStore custom config store implementation, or null to use 
default in-memory store
+     * @return a FetcherManager
+     */
+    public static FetcherManager load(PluginManager pluginManager, 
TikaJsonConfig tikaJsonConfig,
+                                     boolean allowRuntimeModifications,
+                                     
org.apache.tika.pipes.core.config.ConfigStore configStore)
+            throws TikaConfigException, IOException {
         FetcherManager manager = new FetcherManager(pluginManager, 
allowRuntimeModifications);
         JsonNode fetchersNode = tikaJsonConfig.getRootNode().get(CONFIG_KEY);
 
         // Validate configuration and collect fetcher configs without 
instantiating
         Map<String, ExtensionConfig> configs = 
manager.validateAndCollectConfigs(pluginManager, fetchersNode);
 
+        if (configStore != null) {
+            return new FetcherManager(pluginManager, configs, 
allowRuntimeModifications, configStore);
+        }
         return new FetcherManager(pluginManager, configs, 
allowRuntimeModifications);
     }
 
@@ -84,6 +104,12 @@ public class FetcherManager extends AbstractComponentManager<Fetcher, FetcherFac
         super(pluginManager, fetcherConfigs, allowRuntimeModifications);
     }
 
+    private FetcherManager(PluginManager pluginManager, Map<String, 
ExtensionConfig> fetcherConfigs,
+                          boolean allowRuntimeModifications,
+                          org.apache.tika.pipes.core.config.ConfigStore 
configStore) {
+        super(pluginManager, fetcherConfigs, allowRuntimeModifications, 
configStore);
+    }
+
     @Override
     protected String getConfigKey() {
         return CONFIG_KEY;

Reply via email to