This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 612fb428b TIKA-4558 -- add lazy loading to FetcherManager (#2426)
612fb428b is described below

commit 612fb428b1c00428870a776377cf260cecb19fbe
Author: Tim Allison <[email protected]>
AuthorDate: Tue Dec 9 09:16:05 2025 -0500

    TIKA-4558 -- add lazy loading to FetcherManager (#2426)
---
 .../org/apache/tika/pipes/api/PipesResult.java     |   7 +-
 .../api/emitter/EmitterNotFoundException.java      |  33 ++
 .../api/fetcher/FetcherNotFoundException.java      |  33 ++
 .../tika/pipes/core/AbstractComponentManager.java  | 294 ++++++++++++
 .../apache/tika/pipes/core/async/AsyncEmitter.java |   9 +-
 .../tika/pipes/core/emitter/EmitterManager.java    | 137 ++++--
 .../EmittingEmbeddedDocumentBytesHandler.java      |   3 +-
 .../tika/pipes/core/fetcher/FetcherManager.java    | 139 ++++--
 .../apache/tika/pipes/core/server/EmitHandler.java |   5 +-
 .../apache/tika/pipes/core/server/PipesWorker.java |  12 +-
 .../apache/tika/pipes/core/PipesClientTest.java    |   5 +-
 .../apache/tika/pipes/core/PluginsTestHelper.java  |  11 +
 .../pipes/core/emitter/EmitterManagerTest.java     | 516 +++++++++++++++++++++
 .../pipes/core/fetcher/FetcherManagerTest.java     | 495 ++++++++++++++++++++
 .../tika/pipes/emitter/fs/FileSystemEmitter.java   |   4 +-
 .../pipes/reporter/jdbc/TestJDBCPipesReporter.java |   2 +-
 16 files changed, 1608 insertions(+), 97 deletions(-)

diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/PipesResult.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/PipesResult.java
index 86d84ad5b..9418bac0b 100644
--- 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/PipesResult.java
+++ 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/PipesResult.java
@@ -77,10 +77,13 @@ public record PipesResult(RESULT_STATUS status, EmitData 
emitData, String messag
         // Emit failure
         EMIT_EXCEPTION(CATEGORY.APPLICATION_ERROR),
 
+        // Emitter failures
+        EMITTER_INITIALIZATION_EXCEPTION(CATEGORY.APPLICATION_ERROR),
+        EMITTER_NOT_FOUND(CATEGORY.APPLICATION_ERROR),
+
         // Other errors
         INTERRUPTED_EXCEPTION(CATEGORY.APPLICATION_ERROR),
-        FETCHER_NOT_FOUND(CATEGORY.APPLICATION_ERROR),
-        EMITTER_NOT_FOUND(CATEGORY.APPLICATION_ERROR);
+        FETCHER_NOT_FOUND(CATEGORY.APPLICATION_ERROR);
 
 
         private final CATEGORY category;
diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitterNotFoundException.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitterNotFoundException.java
new file mode 100644
index 000000000..5f063d249
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitterNotFoundException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.api.emitter;
+
+import org.apache.tika.exception.TikaException;
+
+/**
+ * Signals that an emitter lookup named an id for which no emitter
+ * configuration exists.
+ */
+public class EmitterNotFoundException extends TikaException {
+
+    /**
+     * @param message human-readable description of the failed lookup
+     * @param cause   the underlying failure, if any
+     */
+    public EmitterNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param message human-readable description of the failed lookup
+     */
+    public EmitterNotFoundException(String message) {
+        super(message);
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/FetcherNotFoundException.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/FetcherNotFoundException.java
new file mode 100644
index 000000000..d05d4335c
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/FetcherNotFoundException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.api.fetcher;
+
+import org.apache.tika.exception.TikaException;
+
+/**
+ * Signals that a fetcher lookup named an id for which no fetcher
+ * configuration exists.
+ */
+public class FetcherNotFoundException extends TikaException {
+
+    /**
+     * @param message human-readable description of the failed lookup
+     * @param cause   the underlying failure, if any
+     */
+    public FetcherNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param message human-readable description of the failed lookup
+     */
+    public FetcherNotFoundException(String message) {
+        super(message);
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
new file mode 100644
index 000000000..7dab511e0
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.pf4j.PluginManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.loader.PolymorphicObjectMapperFactory;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.plugins.ExtensionConfig;
+import org.apache.tika.plugins.TikaExtension;
+import org.apache.tika.plugins.TikaExtensionFactory;
+
+/**
+ * Abstract base class for managing Tika components (Fetchers, Emitters, etc.).
+ * Provides lazy instantiation, early validation, and optional runtime 
modifications.
+ *
+ * @param <T> the component type (e.g., Fetcher, Emitter)
+ * @param <F> the factory type for creating components
+ */
+public abstract class AbstractComponentManager<T extends TikaExtension,
+                                                F extends 
TikaExtensionFactory<T>> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractComponentManager.class);
+
+    protected final PluginManager pluginManager;
+    private final Map<String, ExtensionConfig> componentConfigs = new 
ConcurrentHashMap<>();
+    private final Map<String, T> componentCache = new ConcurrentHashMap<>();
+    private final boolean allowRuntimeModifications;
+
+    protected AbstractComponentManager(PluginManager pluginManager,
+                                      Map<String, ExtensionConfig> 
componentConfigs,
+                                      boolean allowRuntimeModifications) {
+        this.pluginManager = pluginManager;
+        this.componentConfigs.putAll(componentConfigs);
+        this.allowRuntimeModifications = allowRuntimeModifications;
+    }
+
+    /**
+     * Returns the JSON configuration key for this component type (e.g., 
"fetchers", "emitters").
+     */
+    protected abstract String getConfigKey();
+
+    /**
+     * Returns the factory class for this component type.
+     */
+    protected abstract Class<F> getFactoryClass();
+
+    /**
+     * Returns the component name for error messages (e.g., "fetcher", 
"emitter").
+     */
+    protected abstract String getComponentName();
+
+    /**
+     * Creates a not-found exception for this component type.
+     */
+    protected abstract TikaException createNotFoundException(String message);
+
+    /**
+     * Validates the configuration and collects component configs without 
instantiating.
+     */
+    protected Map<String, ExtensionConfig> validateAndCollectConfigs(
+            PluginManager pluginManager, JsonNode configNode) throws 
TikaConfigException, IOException {
+
+        Map<String, F> factories = getFactories(pluginManager);
+        Map<String, ExtensionConfig> configs = new HashMap<>();
+
+        if (configNode != null && !configNode.isNull()) {
+            // Outer loop: iterate over type names
+            Iterator<Map.Entry<String, JsonNode>> typeFields = 
configNode.fields();
+            while (typeFields.hasNext()) {
+                Map.Entry<String, JsonNode> typeEntry = typeFields.next();
+                String typeName = typeEntry.getKey();
+                JsonNode instancesNode = typeEntry.getValue();
+
+                // Validate that factory exists
+                F factory = factories.get(typeName);
+                if (factory == null) {
+                    throw new TikaConfigException(
+                            "Unknown " + getComponentName() + " type: " + 
typeName +
+                            ". Available: " + factories.keySet());
+                }
+
+                // Inner loop: iterate over instances of this type
+                Iterator<Map.Entry<String, JsonNode>> instanceFields = 
instancesNode.fields();
+                while (instanceFields.hasNext()) {
+                    Map.Entry<String, JsonNode> instanceEntry = 
instanceFields.next();
+                    String instanceId = instanceEntry.getKey();
+                    JsonNode config = instanceEntry.getValue();
+
+                    if (configs.containsKey(instanceId)) {
+                        throw new TikaConfigException("Duplicate " + 
getComponentName() +
+                                " id: " + instanceId);
+                    }
+
+                    configs.put(instanceId, new ExtensionConfig(instanceId, 
typeName,
+                            toJsonString(config)));
+                }
+            }
+        }
+
+        return configs;
+    }
+
+    protected Map<String, F> getFactories(PluginManager pluginManager) throws 
TikaConfigException {
+        if (pluginManager.getStartedPlugins().isEmpty()) {
+            pluginManager.loadPlugins();
+            pluginManager.startPlugins();
+        }
+
+        Map<String, F> factories = new HashMap<>();
+        for (F factory : pluginManager.getExtensions(getFactoryClass())) {
+            String name = factory.getName();
+            ClassLoader cl = factory.getClass().getClassLoader();
+            boolean isFromPlugin = cl instanceof org.pf4j.PluginClassLoader;
+
+            F existing = factories.get(name);
+            if (existing != null) {
+                boolean existingIsFromPlugin = 
existing.getClass().getClassLoader()
+                        instanceof org.pf4j.PluginClassLoader;
+                if (isFromPlugin && !existingIsFromPlugin) {
+                    // Replace classpath version with plugin version
+                    factories.put(name, factory);
+                }
+                // Otherwise skip duplicate (keep existing)
+                continue;
+            }
+            factories.put(name, factory);
+        }
+        return factories;
+    }
+
+    private static String toJsonString(final JsonNode node) throws 
TikaConfigException {
+        try {
+            return 
PolymorphicObjectMapperFactory.getMapper().writeValueAsString(node);
+        } catch (JsonProcessingException e) {
+            throw new TikaConfigException("Failed to serialize config to JSON 
string", e);
+        }
+    }
+
+    /**
+     * Gets a component by ID, lazily instantiating it if needed.
+     */
+    public T getComponent(String id) throws IOException, TikaException {
+        // Check cache first (fast path, no synchronization)
+        T component = componentCache.get(id);
+        if (component != null) {
+            return component;
+        }
+
+        // Check if config exists
+        ExtensionConfig config = componentConfigs.get(id);
+        if (config == null) {
+            throw createNotFoundException(
+                    "Can't find " + getComponentName() + " for id=" + id +
+                    ". Available: " + componentConfigs.keySet());
+        }
+
+        // Synchronized block to ensure only one thread builds the component
+        synchronized (this) {
+            // Double-check in case another thread built it while we were 
waiting
+            component = componentCache.get(id);
+            if (component != null) {
+                return component;
+            }
+
+            // Build the component
+            try {
+                component = buildComponent(config);
+                componentCache.put(id, component);
+                LOG.debug("Lazily instantiated {}: {}", getComponentName(), 
id);
+                return component;
+            } catch (TikaConfigException e) {
+                throw new IOException("Failed to build " + getComponentName() 
+ ": " + id, e);
+            }
+        }
+    }
+
+    /**
+     * Builds a component instance from its configuration.
+     */
+    private T buildComponent(ExtensionConfig config) throws 
TikaConfigException, IOException {
+        Map<String, F> factories = getFactories(pluginManager);
+        F factory = factories.get(config.name());
+
+        if (factory == null) {
+            // This shouldn't happen since we validated in load(), but check 
anyway
+            throw new TikaConfigException(
+                    "Unknown " + getComponentName() + " type: " + 
config.name() +
+                    ". Available: " + factories.keySet());
+        }
+
+        return factory.buildExtension(config);
+    }
+
+    /**
+     * Dynamically adds a component configuration at runtime.
+     * The component will not be instantiated until it is first requested via 
{@link #getComponent(String)}.
+     * <p>
+     * This method is only available if the manager was loaded with 
allowRuntimeModifications=true.
+     * <p>
+     * Only authorized/authenticated users should be allowed to modify 
components. BE CAREFUL.
+     *
+     * @param config the extension configuration for the component
+     * @throws TikaConfigException if the component type is unknown, if a 
component with the same ID already exists,
+     *         or if runtime modifications are not allowed
+     * @throws IOException if there is an error accessing the plugin manager
+     */
+    public synchronized void saveComponent(ExtensionConfig config) throws 
TikaConfigException, IOException {
+        if (!allowRuntimeModifications) {
+            throw new TikaConfigException(
+                    "Runtime modifications are not allowed. " + 
getClass().getSimpleName() +
+                    " must be loaded with allowRuntimeModifications=true to 
use save" +
+                    getComponentName().substring(0, 
1).toUpperCase(Locale.ROOT) + getComponentName().substring(1) + "()");
+        }
+
+        if (config == null) {
+            throw new IllegalArgumentException("ExtensionConfig cannot be 
null");
+        }
+
+        String componentId = config.id();
+        String typeName = config.name();
+
+        // Check for duplicate ID
+        if (componentConfigs.containsKey(componentId)) {
+            throw new TikaConfigException(getComponentName().substring(0, 
1).toUpperCase(Locale.ROOT) +
+                    getComponentName().substring(1) + " with id '" + 
componentId + "' already exists");
+        }
+
+        // Validate that factory exists for this type
+        Map<String, F> factories = getFactories(pluginManager);
+        if (!factories.containsKey(typeName)) {
+            throw new TikaConfigException(
+                    "Unknown " + getComponentName() + " type: " + typeName +
+                    ". Available: " + factories.keySet());
+        }
+
+        // Store config without instantiating
+        componentConfigs.put(componentId, config);
+        LOG.debug("Saved {} config: id={}, type={}", getComponentName(), 
componentId, typeName);
+    }
+
+    /**
+     * Returns the set of supported component IDs.
+     */
+    public Set<String> getSupported() {
+        return componentConfigs.keySet();
+    }
+
+    /**
+     * Convenience method that returns a component if only one component
+     * is configured. If 0 or > 1 components are configured, this throws an 
IllegalArgumentException.
+     *
+     * @return the single configured component
+     */
+    public T getComponent() throws IOException, TikaException {
+        if (componentConfigs.size() != 1) {
+            throw new IllegalArgumentException(
+                    "No-arg get" + getComponentName().substring(0, 
1).toUpperCase(Locale.ROOT) +
+                    getComponentName().substring(1) + "() requires exactly 1 
configured " +
+                    getComponentName() + ". Found: " + componentConfigs.size() 
+
+                    " (" + componentConfigs.keySet() + ")");
+        }
+        // Get the single component id and use getComponent(id) for lazy 
loading
+        String componentId = componentConfigs.keySet().iterator().next();
+        return getComponent(componentId);
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java
index ea084aa65..136bda109 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.tika.exception.TikaException;
 import org.apache.tika.pipes.api.emitter.EmitData;
 import org.apache.tika.pipes.api.emitter.Emitter;
 import org.apache.tika.pipes.core.PipesConfig;
@@ -119,7 +120,13 @@ public class AsyncEmitter implements Callable<Integer> {
             int emitted = 0;
             LOG.debug("about to emit {} files, {} estimated bytes", size, 
estimatedSize);
             for (Map.Entry<String, List<EmitData>> e : map.entrySet()) {
-                Emitter emitter = emitterManager.getEmitter(e.getKey());
+                Emitter emitter = null;
+                try {
+                    emitter = emitterManager.getEmitter(e.getKey());
+                } catch (IOException | TikaException ex) {
+                    LOG.warn("emitter id={} failed on instantiation", 
e.getKey(), ex);
+                    return;
+                }
                 tryToEmit(emitter, e.getValue());
                 emitted += e.getValue().size();
             }
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
index ee5ac55a4..101de53d6 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
@@ -18,79 +18,134 @@ package org.apache.tika.pipes.core.emitter;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import org.pf4j.PluginManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.tika.config.loader.TikaJsonConfig;
 import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaException;
 import org.apache.tika.pipes.api.emitter.Emitter;
 import org.apache.tika.pipes.api.emitter.EmitterFactory;
-import org.apache.tika.plugins.PluginComponentLoader;
+import org.apache.tika.pipes.api.emitter.EmitterNotFoundException;
+import org.apache.tika.pipes.core.AbstractComponentManager;
+import org.apache.tika.plugins.ExtensionConfig;
 
 /**
  * Utility class that will apply the appropriate emitter
  * to the emitterString based on the prefix.
  * <p>
  * This does not allow multiple emitters supporting the same prefix.
+ * Emitters are instantiated lazily on first use.
+ * Emitter ids must be unique across all emitter types; a duplicate id is rejected with a
+ * TikaConfigException when the configuration is loaded or an emitter is saved.
  */
-public class EmitterManager {
-    public static final String CONFIG_KEY = "emitters";
+public class EmitterManager extends AbstractComponentManager<Emitter, 
EmitterFactory> {

-    private static final Logger LOG = 
LoggerFactory.getLogger(EmitterManager.class);
+    // Top-level key in the Tika JSON config whose node holds the emitter definitions.
+    private static final String CONFIG_KEY = "emitters";

-    private final Map<String, Emitter> emitterMap = new ConcurrentHashMap<>();
+    /**
+     * Loads an EmitterManager without allowing runtime modifications.
+     * Use {@link #load(PluginManager, TikaJsonConfig, boolean)} to enable 
runtime emitter additions.
+     *
+     * @param pluginManager the plugin manager
+     * @param tikaJsonConfig the configuration
+     * @return an EmitterManager that does not allow runtime modifications
+     */
+    public static EmitterManager load(PluginManager pluginManager, 
TikaJsonConfig tikaJsonConfig)
+            throws IOException, TikaConfigException {
+        return load(pluginManager, tikaJsonConfig, false);
+    }

-    public static EmitterManager load(PluginManager pluginManager, 
TikaJsonConfig tikaJsonConfig) throws IOException, TikaConfigException {
-        JsonNode fetchersNode = tikaJsonConfig.getRootNode().get(CONFIG_KEY);
-        Map<String, Emitter> fetchers =
-                PluginComponentLoader.loadInstances(pluginManager, 
EmitterFactory.class, fetchersNode);
-        return new EmitterManager(fetchers);
+    /**
+     * Loads an EmitterManager with optional support for runtime modifications.
+     *
+     * @param pluginManager the plugin manager
+     * @param tikaJsonConfig the configuration
+     * @param allowRuntimeModifications if true, allows calling {@link 
#saveEmitter(ExtensionConfig)}
+     *                                  to add emitters at runtime
+     * @return an EmitterManager
+     * @throws TikaConfigException e.g. if the "emitters" config names an unknown emitter type
+     *                             or repeats an emitter id
+     */
+    public static EmitterManager load(PluginManager pluginManager, 
TikaJsonConfig tikaJsonConfig,
+                                     boolean allowRuntimeModifications)
+            throws IOException, TikaConfigException {
+        // This instance exists only to call the instance method validateAndCollectConfigs;
+        // the manager actually returned is constructed below from the validated configs.
+        EmitterManager manager = new EmitterManager(pluginManager, 
allowRuntimeModifications);
+        JsonNode emittersNode = tikaJsonConfig.getRootNode().get(CONFIG_KEY);
+
+        // Validate configuration and collect emitter configs without 
instantiating
+        Map<String, ExtensionConfig> configs = 
manager.validateAndCollectConfigs(pluginManager, emittersNode);
+
+        return new EmitterManager(pluginManager, configs, 
allowRuntimeModifications);
     }

-    private EmitterManager() {
+    // Creates a manager with no emitter configs; load() uses it only to run validation.
+    private EmitterManager(PluginManager pluginManager, boolean 
allowRuntimeModifications) {
+        super(pluginManager, Map.of(), allowRuntimeModifications);
+    }

+    // Creates a manager backed by the emitter configs that load() collected.
+    private EmitterManager(PluginManager pluginManager, Map<String, 
ExtensionConfig> emitterConfigs,
+                          boolean allowRuntimeModifications) {
+        super(pluginManager, emitterConfigs, allowRuntimeModifications);
     }

-    private EmitterManager(Map<String, Emitter> emitters) {
-        emitterMap.putAll(emitters);
+    @Override
+    protected String getConfigKey() {
+        return CONFIG_KEY;
     }

-    public Set<String> getSupported() {
-        return emitterMap.keySet();
+    @Override
+    protected Class<EmitterFactory> getFactoryClass() {
+        return EmitterFactory.class;
     }

+    @Override
+    protected String getComponentName() {
+        return "emitter";
+    }

-    public Emitter getEmitter(String emitterName) {
-        Emitter emitter = emitterMap.get(emitterName);
-        if (emitter == null) {
-            throw new IllegalArgumentException("Can't find emitter for prefix: 
" + emitterName);
-        }
-        return emitter;
+    @Override
+    protected TikaException createNotFoundException(String message) {
+        return new EmitterNotFoundException(message);
+    }
+
+    /**
+     * Gets an emitter by ID, lazily instantiating it if needed.
+     *
+     * @param emitterName the emitter ID
+     * @return the emitter
+     * @throws EmitterNotFoundException if no emitter with the given ID exists
+     * @throws IOException if there's an error building the emitter
+     * @throws TikaException if there's a configuration error
+     */
+    public Emitter getEmitter(String emitterName) throws IOException, 
TikaException {
+        return getComponent(emitterName);
     }

     /**
      * Convenience method that returns an emitter if only one emitter
-     * is specified in the tika-config file.  If 0 or > 1 emitters
-     * are specified, this throws an IllegalArgumentException.
-     * @return
+     * is configured. If 0 or > 1 emitters are configured, this throws an 
IllegalArgumentException.
+     *
+     * @return the single configured emitter
+     * @throws IOException if there's an error building the emitter
+     * @throws TikaException if there's a configuration error
+     */
+    public Emitter getEmitter() throws IOException, TikaException {
+        return getComponent();
+    }
+
+    /**
+     * Dynamically adds an emitter configuration at runtime.
+     * The emitter will not be instantiated until it is first requested via 
{@link #getEmitter(String)}.
+     * This allows for dynamic configuration without the overhead of immediate 
instantiation.
+     * <p>
+     * This method is only available if the EmitterManager was loaded with
+     * {@link #load(PluginManager, TikaJsonConfig, boolean)} with 
allowRuntimeModifications=true.
+     * <p>
+     * Only authorized/authenticated users should be allowed to modify 
emitters. BE CAREFUL.
+     *
+     * @param config the extension configuration for the emitter
+     * @throws TikaConfigException if the emitter type is unknown, if an 
emitter with the same ID
+     *                             already exists, or if runtime modifications 
are not allowed
+     * @throws IOException if there is an error accessing the plugin manager
      */
-    public Emitter getEmitter() {
-        if (emitterMap.isEmpty()) {
-            throw new IllegalArgumentException("emitters size must == 1 for 
the no arg call");
-        }
-        if (emitterMap.size() > 1) {
-            throw new IllegalArgumentException("need to specify 'emitterId' if 
> 1 emitters are" +
-                    " available");
-        }
-        for (Emitter emitter : emitterMap.values()) {
-            return emitter;
-        }
-        //this should be unreachable?!
-        throw new IllegalArgumentException("emitters size must == 0");
+    public void saveEmitter(ExtensionConfig config) throws 
TikaConfigException, IOException {
+        saveComponent(config);
     }
 }
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java
index f1ead0b51..5d74c49ef 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.InputStream;
 
 import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.pipes.api.FetchEmitTuple;
@@ -39,7 +40,7 @@ public class EmittingEmbeddedDocumentBytesHandler extends 
AbstractEmbeddedDocume
     private static final ParseContext PARSE_CONTEXT = new ParseContext();
 
     public EmittingEmbeddedDocumentBytesHandler(FetchEmitTuple fetchEmitTuple,
-                                                EmitterManager emitterManager) 
throws TikaConfigException {
+                                                EmitterManager emitterManager) 
throws TikaException, IOException {
 
         this.containerEmitKey = fetchEmitTuple.getEmitKey();
         this.embeddedDocumentBytesConfig = 
fetchEmitTuple.getParseContext().get(EmbeddedDocumentBytesConfig.class);
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
index e30a833fa..af43914ac 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
@@ -18,78 +18,133 @@ package org.apache.tika.pipes.core.fetcher;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import org.pf4j.PluginManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.tika.config.loader.TikaJsonConfig;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.pipes.api.fetcher.Fetcher;
 import org.apache.tika.pipes.api.fetcher.FetcherFactory;
-import org.apache.tika.plugins.PluginComponentLoader;
+import org.apache.tika.pipes.api.fetcher.FetcherNotFoundException;
+import org.apache.tika.pipes.core.AbstractComponentManager;
+import org.apache.tika.plugins.ExtensionConfig;
 
 /**
  * Utility class to hold multiple fetchers.
  * <p>
- * This forbids multiple fetchers with the same pluginId
+ * This forbids multiple fetchers with the same pluginId.
+ * Fetchers are instantiated lazily on first use.
  */
-public class FetcherManager {
+public class FetcherManager extends AbstractComponentManager<Fetcher, 
FetcherFactory> {
 
-    public static final String CONFIG_KEY = "fetchers";
-    private static final Logger LOG = 
LoggerFactory.getLogger(FetcherManager.class);
+    private static final String CONFIG_KEY = "fetchers";
 
+    /**
+     * Loads a FetcherManager without allowing runtime modifications.
+     * Use {@link #load(PluginManager, TikaJsonConfig, boolean)} to enable 
runtime fetcher additions.
+     *
+     * @param pluginManager the plugin manager
+     * @param tikaJsonConfig the configuration
+     * @return a FetcherManager that does not allow runtime modifications
+     */
+    public static FetcherManager load(PluginManager pluginManager, 
TikaJsonConfig tikaJsonConfig)
+            throws TikaConfigException, IOException {
+        return load(pluginManager, tikaJsonConfig, false);
+    }
 
-    public static FetcherManager load(PluginManager pluginManager, 
TikaJsonConfig tikaJsonConfig) throws TikaConfigException, IOException {
+    /**
+     * Loads a FetcherManager with optional support for runtime modifications.
+     *
+     * @param pluginManager the plugin manager
+     * @param tikaJsonConfig the configuration
+     * @param allowRuntimeModifications if true, allows calling {@link 
#saveFetcher(ExtensionConfig)}
+     *                                  to add fetchers at runtime
+     * @return a FetcherManager
+     */
+    public static FetcherManager load(PluginManager pluginManager, 
TikaJsonConfig tikaJsonConfig,
+                                     boolean allowRuntimeModifications)
+            throws TikaConfigException, IOException {
+        FetcherManager manager = new FetcherManager(pluginManager, 
allowRuntimeModifications);
         JsonNode fetchersNode = tikaJsonConfig.getRootNode().get(CONFIG_KEY);
-        Map<String, Fetcher> fetchers =
-                PluginComponentLoader.loadInstances(pluginManager, 
FetcherFactory.class, fetchersNode);
-        return new FetcherManager(fetchers);
+
+        // Validate configuration and collect fetcher configs without 
instantiating
+        Map<String, ExtensionConfig> configs = 
manager.validateAndCollectConfigs(pluginManager, fetchersNode);
+
+        return new FetcherManager(pluginManager, configs, 
allowRuntimeModifications);
     }
 
-    private final Map<String, Fetcher> fetcherMap = new ConcurrentHashMap<>();
+    private FetcherManager(PluginManager pluginManager, boolean 
allowRuntimeModifications) {
+        super(pluginManager, Map.of(), allowRuntimeModifications);
+    }
 
-    private FetcherManager(Map<String, Fetcher> fetcherMap) throws 
TikaConfigException {
-        this.fetcherMap.putAll(fetcherMap);
+    private FetcherManager(PluginManager pluginManager, Map<String, 
ExtensionConfig> fetcherConfigs,
+                          boolean allowRuntimeModifications) {
+        super(pluginManager, fetcherConfigs, allowRuntimeModifications);
     }
 
+    @Override
+    protected String getConfigKey() {
+        return CONFIG_KEY;
+    }
 
-    public Fetcher getFetcher(String id) throws IOException, TikaException {
-        Fetcher fetcher = fetcherMap.get(id);
-        if (fetcher == null) {
-            throw new IllegalArgumentException(
-                    "Can't find fetcher for id=" + id + ". I've loaded: " +
-                            fetcherMap.keySet());
-        }
-        return fetcher;
+    @Override
+    protected Class<FetcherFactory> getFactoryClass() {
+        return FetcherFactory.class;
     }
 
-    public Set<String> getSupported() {
-        return fetcherMap.keySet();
+    @Override
+    protected String getComponentName() {
+        return "fetcher";
+    }
+
+    @Override
+    protected TikaException createNotFoundException(String message) {
+        return new FetcherNotFoundException(message);
+    }
+
+    /**
+     * Gets a fetcher by ID, lazily instantiating it if needed.
+     *
+     * @param id the fetcher ID
+     * @return the fetcher
+     * @throws FetcherNotFoundException if no fetcher with the given ID exists
+     * @throws IOException if there's an error building the fetcher
+     * @throws TikaException if there's a configuration error
+     */
+    public Fetcher getFetcher(String id) throws IOException, TikaException {
+        return getComponent(id);
     }
 
     /**
      * Convenience method that returns a fetcher if only one fetcher
-     * is specified in the tika-config file.  If 0 or > 1 fetchers
-     * are specified, this throws an IllegalArgumentException.
-     * @return
+     * is configured. If 0 or > 1 fetchers are configured, this throws an 
IllegalArgumentException.
+     *
+     * @return the single configured fetcher
+     * @throws IOException if there's an error building the fetcher
+     * @throws TikaException if there's a configuration error
+     */
+    public Fetcher getFetcher() throws IOException, TikaException {
+        return getComponent();
+    }
+
+    /**
+     * Dynamically adds a fetcher configuration at runtime.
+     * The fetcher will not be instantiated until it is first requested via 
{@link #getFetcher(String)}.
+     * This allows for dynamic configuration without the overhead of immediate 
instantiation.
+     * <p>
+     * This method is only available if the FetcherManager was loaded with
+     * {@link #load(PluginManager, TikaJsonConfig, boolean)} with 
allowRuntimeModifications=true.
+     * <p>
+     * Only authorized/authenticated users should be allowed to modify 
fetchers. BE CAREFUL.
+     *
+     * @param config the extension configuration for the fetcher
+     * @throws TikaConfigException if the fetcher type is unknown, if a 
fetcher with the same ID
+     *                             already exists, or if runtime modifications 
are not allowed
+     * @throws IOException if there is an error accessing the plugin manager
      */
-    public Fetcher getFetcher() {
-        if (fetcherMap.isEmpty()) {
-            throw new IllegalArgumentException("fetchers size must == 1 for 
the no arg call");
-        }
-        if (fetcherMap.size() > 1) {
-            throw new IllegalArgumentException("need to specify 'fetcherId' if 
> 1 fetchers are" +
-                    " available");
-        }
-        for (Fetcher fetcher : fetcherMap.values()) {
-            return fetcher;
-        }
-        //this should be unreachable?!
-        throw new IllegalArgumentException("fetchers size must == 0");
+    public void saveFetcher(ExtensionConfig config) throws 
TikaConfigException, IOException {
+        saveComponent(config);
     }
 }
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/EmitHandler.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/EmitHandler.java
index 37c504ff4..687269b2c 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/EmitHandler.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/EmitHandler.java
@@ -99,10 +99,13 @@ class EmitHandler {
 
         try {
             emitter = emitterManager.getEmitter(emitKey.getEmitterId());
-        } catch (IllegalArgumentException e) {
+        } catch (org.apache.tika.pipes.api.emitter.EmitterNotFoundException e) 
{
             String noEmitterMsg = getNoEmitterMsg(taskId);
             LOG.warn(noEmitterMsg);
             return new 
PipesResult(PipesResult.RESULT_STATUS.EMITTER_NOT_FOUND, noEmitterMsg);
+        } catch (IOException | TikaException e) {
+            LOG.warn("Couldn't initialize emitter for task id '" + taskId + 
"'", e);
+            return new 
PipesResult(PipesResult.RESULT_STATUS.EMITTER_INITIALIZATION_EXCEPTION, 
ExceptionUtils.getStackTrace(e));
         }
         try {
             if (isExtractEmbeddedBytes &&
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesWorker.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesWorker.java
index 61fa43c68..daf364990 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesWorker.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesWorker.java
@@ -131,7 +131,14 @@ class PipesWorker implements Callable<PipesResult> {
             return new ParseDataOrPipesResult(null, tisOrResult.pipesResult());
         }
 
-        ParseContext parseContext = setupParseContext(fetchEmitTuple);
+        ParseContext parseContext = null;
+        try {
+            parseContext = setupParseContext(fetchEmitTuple);
+        } catch (IOException e) {
+            LOG.warn("fetcher initialization exception id={}", 
fetchEmitTuple.getId(), e);
+            return new ParseDataOrPipesResult(null,
+                    new 
PipesResult(PipesResult.RESULT_STATUS.FETCHER_INITIALIZATION_EXCEPTION, 
ExceptionUtils.getStackTrace(e)));
+        }
         try (TikaInputStream tis = tisOrResult.tis()) {
             return parseHandler.parseWithStream(fetchEmitTuple, tis, metadata, 
parseContext);
         } catch (SecurityException e) {
@@ -146,8 +153,7 @@ class PipesWorker implements Callable<PipesResult> {
 
 
 
-    private ParseContext setupParseContext(FetchEmitTuple fetchEmitTuple)
-            throws TikaConfigException {
+    private ParseContext setupParseContext(FetchEmitTuple fetchEmitTuple) 
throws TikaException, IOException {
         ParseContext parseContext = fetchEmitTuple.getParseContext();
         if (parseContext.get(HandlerConfig.class) == null) {
             parseContext.set(HandlerConfig.class, DEFAULT_HANDLER_CONFIG);
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
index 74e6e1a78..b482200e1 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
@@ -490,9 +490,8 @@ public class PipesClientTest {
 
             PipesResult pipesResult = pipesClient.process(tuple);
 
-            // Should be FETCHER_NOT_FOUND
-            assertEquals(PipesResult.RESULT_STATUS.FETCHER_NOT_FOUND, 
pipesResult.status(),
-                    "Should return FETCHER_NOT_FOUND when fetcher name is 
invalid");
+            
assertEquals(PipesResult.RESULT_STATUS.FETCHER_INITIALIZATION_EXCEPTION, 
pipesResult.status(),
+                    "Should return FETCHER_INITIALIZATION_EXCEPTION when 
fetcher name is invalid");
 
             // Verify it's categorized as APPLICATION_ERROR
             assertTrue(pipesResult.isApplicationError(),
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PluginsTestHelper.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PluginsTestHelper.java
index 834a389be..5e8676094 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PluginsTestHelper.java
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PluginsTestHelper.java
@@ -84,4 +84,15 @@ public class PluginsTestHelper {
             
Files.copy(PipesServerTest.class.getResourceAsStream("/test-documents/" + 
testDoc), inputDir.resolve(testDoc));
         }
     }
+
+    /**
+     * Converts a Path to a JSON-safe string with forward slashes.
+     * This ensures paths work correctly in JSON configs on both Windows and 
Unix systems.
+     *
+     * @param path the path to convert
+     * @return a string representation with forward slashes
+     */
+    public static String toJsonPath(Path path) {
+        return path.toString().replace("\\", "/");
+    }
 }
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/emitter/EmitterManagerTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/emitter/EmitterManagerTest.java
new file mode 100644
index 000000000..8d98865c9
--- /dev/null
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/emitter/EmitterManagerTest.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.emitter;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.tika.config.loader.TikaJsonConfig;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.api.emitter.Emitter;
+import org.apache.tika.pipes.api.emitter.EmitterNotFoundException;
+import org.apache.tika.pipes.core.PluginsTestHelper;
+import org.apache.tika.plugins.ExtensionConfig;
+import org.apache.tika.plugins.TikaPluginManager;
+
+public class EmitterManagerTest {
+
+    @Test
+    public void testBasicLoad(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = 
TikaPluginManager.load(tikaJsonConfig);
+
+        EmitterManager emitterManager = EmitterManager.load(pluginManager, 
tikaJsonConfig);
+
+        assertNotNull(emitterManager);
+        assertEquals(1, emitterManager.getSupported().size());
+        assertTrue(emitterManager.getSupported().contains("fse"));
+    }
+
+    @Test
+    public void testEmptyConfig(@TempDir Path tmpDir) throws Exception {
+        // Create config with no emitters
+        String configJson = """
+                {
+                  "plugin-roots": "target/plugins"
+                }
+                """;
+
+        Path configPath = tmpDir.resolve("config.json");
+        Files.writeString(configPath, configJson, StandardCharsets.UTF_8);
+
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(configPath);
+        TikaPluginManager pluginManager = 
TikaPluginManager.load(tikaJsonConfig);
+
+        EmitterManager emitterManager = EmitterManager.load(pluginManager, 
tikaJsonConfig);
+
+        assertNotNull(emitterManager);
+        assertEquals(0, emitterManager.getSupported().size());
+
+        // Try to get an emitter when none are configured
+        EmitterNotFoundException exception = 
assertThrows(EmitterNotFoundException.class, () -> {
+            emitterManager.getEmitter("any-id");
+        });
+
+        assertTrue(exception.getMessage().contains("any-id"));
+        assertTrue(exception.getMessage().contains("Available: []"));
+    }
+
+    @Test
+    public void testLazyInstantiation(@TempDir Path tmpDir) throws Exception {
+        // Create config with multiple emitters
+        String configJson = String.format(Locale.ROOT, """
+                {
+                  "emitters": {
+                    "file-system-emitter": {
+                      "fse1": {
+                        "basePath": "%s",
+                        "onExists": "REPLACE"
+                      },
+                      "fse2": {
+                        "basePath": "%s",
+                        "onExists": "REPLACE"
+                      }
+                    }
+                  },
+                  "plugin-roots": "target/plugins"
+                }
+                """, PluginsTestHelper.toJsonPath(tmpDir.resolve("output1")),
+                     PluginsTestHelper.toJsonPath(tmpDir.resolve("output2")));
+
+        Path configPath = tmpDir.resolve("config.json");
+        Files.writeString(configPath, configJson, StandardCharsets.UTF_8);
+
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(configPath);
+        TikaPluginManager pluginManager = 
TikaPluginManager.load(tikaJsonConfig);
+
+        EmitterManager emitterManager = EmitterManager.load(pluginManager, 
tikaJsonConfig);
+
+        // After load, both emitters should be in supported list but not 
instantiated yet
+        assertEquals(2, emitterManager.getSupported().size());
+
+        // Request only fse1 - only it should be instantiated
+        Emitter emitter1 = emitterManager.getEmitter("fse1");
+        assertNotNull(emitter1);
+        assertEquals("fse1", emitter1.getExtensionConfig().id());
+
+        // fse2 has not been requested yet - verify it exists in config
+        assertTrue(emitterManager.getSupported().contains("fse2"));
+
+        // Now request fse2
+        Emitter emitter2 = emitterManager.getEmitter("fse2");
+        assertNotNull(emitter2);
+        assertEquals("fse2", emitter2.getExtensionConfig().id());
+    }
+
+    @Test
+    public void testCaching(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = 
TikaPluginManager.load(tikaJsonConfig);
+
+        EmitterManager emitterManager = EmitterManager.load(pluginManager, 
tikaJsonConfig);
+
+        // Get the same emitter multiple times
+        Emitter emitter1 = emitterManager.getEmitter("fse");
+        Emitter emitter2 = emitterManager.getEmitter("fse");
+        Emitter emitter3 = emitterManager.getEmitter("fse");
+
+        // Should be the exact same instance (reference equality)
+        assertSame(emitter1, emitter2);
+        assertSame(emitter2, emitter3);
+    }
+
+    @Test
+    public void testThreadSafety(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = 
TikaPluginManager.load(tikaJsonConfig);
+
+        EmitterManager emitterManager = EmitterManager.load(pluginManager, 
tikaJsonConfig);
+
+        int threadCount = 10;
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch doneLatch = new CountDownLatch(threadCount);
+        List<Future<Emitter>> futures = new ArrayList<>();
+
+        // Start multiple threads that all request the same emitter 
simultaneously
+        for (int i = 0; i < threadCount; i++) {
+            futures.add(executor.submit(() -> {
+                try {
+                    // Wait for all threads to be ready
+                    startLatch.await();
+
+                    // All threads try to get the emitter at once
+                    return emitterManager.getEmitter("fse");
+                } finally {
+                    doneLatch.countDown();
+                }
+            }));
+        }
+
+        // Start all threads at once
+        startLatch.countDown();
+
+        // Wait for all threads to complete
+        assertTrue(doneLatch.await(10, TimeUnit.SECONDS));
+
+        // Collect all emitters
+        List<Emitter> emitters = new ArrayList<>();
+        for (Future<Emitter> future : futures) {
+            emitters.add(future.get());
+        }
+
+        executor.shutdown();
+
+        // All threads should have gotten the same instance
+        Emitter first = emitters.get(0);
+        for (Emitter emitter : emitters) {
+            assertSame(first, emitter, "All threads should get the same 
emitter instance");
+        }
+    }
+
+    @Test
+    public void testUnknownEmitterId(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = 
TikaPluginManager.load(tikaJsonConfig);
+
+        EmitterManager emitterManager = EmitterManager.load(pluginManager, 
tikaJsonConfig);
+
+        EmitterNotFoundException exception = 
assertThrows(EmitterNotFoundException.class, () -> {
+            emitterManager.getEmitter("non-existent-emitter");
+        });
+
+        assertTrue(exception.getMessage().contains("non-existent-emitter"));
+        assertTrue(exception.getMessage().contains("Available:"));
+    }
+
+    @Test
+    public void testUnknownEmitterType(@TempDir Path tmpDir) throws Exception {
+        String configJson = """
+                {
+                  "emitters": {
+                    "non-existent-emitter-type": {
+                      "emitter1": {
+                        "someProp": "value"
+                      }
+                    }
+                  },
+                  "plugin-roots": "target/plugins"
+                }
+                """;
+
+        Path configPath = tmpDir.resolve("config.json");
+        Files.writeString(configPath, configJson, StandardCharsets.UTF_8);
+
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(configPath);
+        TikaPluginManager pluginManager = 
TikaPluginManager.load(tikaJsonConfig);
+
+        // Should fail during load (early validation)
+        TikaConfigException exception = 
assertThrows(TikaConfigException.class, () -> {
+            EmitterManager.load(pluginManager, tikaJsonConfig);
+        });
+
+        assertTrue(exception.getMessage().contains("Unknown emitter type"));
+        
assertTrue(exception.getMessage().contains("non-existent-emitter-type"));
+    }
+
+    @Test
+    public void testDuplicateEmitterId(@TempDir Path tmpDir) throws Exception {
+        String configJson = String.format(Locale.ROOT, """
+                {
+                  "emitters": {
+                    "file-system-emitter": {
+                      "fse1": {
+                        "basePath": "%s",
+                        "onExists": "REPLACE"
+                      },
+                      "fse1": {
+                        "basePath": "%s",
+                        "onExists": "REPLACE"
+                      }
+                    }
+                  },
+                  "plugin-roots": "target/plugins"
+                }
+                """, PluginsTestHelper.toJsonPath(tmpDir.resolve("output1")),
+                     PluginsTestHelper.toJsonPath(tmpDir.resolve("output2")));
+
+        Path configPath = tmpDir.resolve("config.json");
+        Files.writeString(configPath, configJson, StandardCharsets.UTF_8);
+
+        // PolymorphicObjectMapperFactory has FAIL_ON_READING_DUP_TREE_KEY 
enabled
+        // so duplicate keys are caught during JSON parsing
+        TikaConfigException exception = 
assertThrows(TikaConfigException.class, () -> {
+            TikaJsonConfig.load(configPath);
+        });
+
+        assertTrue(exception.getMessage().contains("Failed to parse JSON") &&
+                exception.getCause() != null &&
+                exception.getCause().getMessage().contains("Duplicate field"));
+    }
+
+    @Test
+    public void testGetSingleEmitter(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = 
TikaPluginManager.load(tikaJsonConfig);
+
+        EmitterManager emitterManager = EmitterManager.load(pluginManager, 
tikaJsonConfig);
+
+        // When only one emitter exists, no-arg getEmitter() should work
+        Emitter emitter = emitterManager.getEmitter();
+        assertNotNull(emitter);
+        assertEquals("fse", emitter.getExtensionConfig().id());
+    }
+
+    @Test
+    public void testGetSingleEmitterWithMultipleConfigured(@TempDir Path 
tmpDir) throws Exception {
+        String configJson = String.format(Locale.ROOT, """
+                {
+                  "emitters": {
+                    "file-system-emitter": {
+                      "fse1": {
+                        "basePath": "%s",
+                        "onExists": "REPLACE"
+                      },
+                      "fse2": {
+                        "basePath": "%s",
+                        "onExists": "REPLACE"
+                      }
+                    }
+                  },
+                  "plugin-roots": "target/plugins"
+                }
+                """, PluginsTestHelper.toJsonPath(tmpDir.resolve("output1")),
+                     PluginsTestHelper.toJsonPath(tmpDir.resolve("output2")));
+
+        Path configPath = tmpDir.resolve("config.json");
+        Files.writeString(configPath, configJson, StandardCharsets.UTF_8);
+
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(configPath);
+        TikaPluginManager pluginManager = 
TikaPluginManager.load(tikaJsonConfig);
+
+        EmitterManager emitterManager = EmitterManager.load(pluginManager, 
tikaJsonConfig);
+
+        // When multiple emitters exist, no-arg getEmitter() should fail
+        IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> {
+            emitterManager.getEmitter();
+        });
+
+        assertTrue(exception.getMessage().contains("exactly 1"));
+    }
+
+    @Test
+    public void testSaveEmitter(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = 
TikaPluginManager.load(tikaJsonConfig);
+
+        // Load with runtime modifications enabled
+        EmitterManager emitterManager = EmitterManager.load(pluginManager, 
tikaJsonConfig, true);
+
+        // Initially only fse exists
+        assertEquals(1, emitterManager.getSupported().size());
+
+        // Dynamically add a new emitter configuration
+        String newConfigJson = String.format(Locale.ROOT, """
+                {
+                  "basePath": "%s",
+                  "onExists": "REPLACE"
+                }
+                """, PluginsTestHelper.toJsonPath(tmpDir.resolve("output2")));
+        ExtensionConfig newConfig = new ExtensionConfig("fse2", 
"file-system-emitter", newConfigJson);
+
+        emitterManager.saveEmitter(newConfig);
+
+        // Now both should be available
+        assertEquals(2, emitterManager.getSupported().size());
+        assertTrue(emitterManager.getSupported().contains("fse"));
+        assertTrue(emitterManager.getSupported().contains("fse2"));
+
+        // Emitter should be lazily instantiated when requested
+        Emitter emitter2 = emitterManager.getEmitter("fse2");
+        assertNotNull(emitter2);
+        assertEquals("fse2", emitter2.getExtensionConfig().id());
+    }
+
+    @Test
+    public void testSaveEmitterDuplicate(@TempDir Path tmpDir) throws 
Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = 
TikaPluginManager.load(tikaJsonConfig);
+
+        EmitterManager emitterManager = EmitterManager.load(pluginManager, 
tikaJsonConfig, true);
+
+        // Try to add an emitter with the same ID as existing one
+        String newConfigJson = String.format(Locale.ROOT, """
+                {
+                  "basePath": "%s",
+                  "onExists": "REPLACE"
+                }
+                """, PluginsTestHelper.toJsonPath(tmpDir.resolve("output2")));
+        ExtensionConfig duplicateConfig = new ExtensionConfig("fse", 
"file-system-emitter", newConfigJson);
+
+        TikaConfigException exception = 
assertThrows(TikaConfigException.class, () -> {
+            emitterManager.saveEmitter(duplicateConfig);
+        });
+
+        assertTrue(exception.getMessage().contains("already exists"));
+        assertTrue(exception.getMessage().contains("fse"));
+    }
+
+    @Test
+    public void testSaveEmitterUnknownType(@TempDir Path tmpDir) throws 
Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = 
TikaPluginManager.load(tikaJsonConfig);
+
+        EmitterManager emitterManager = EmitterManager.load(pluginManager, 
tikaJsonConfig, true);
+
+        // Try to add an emitter with unknown type
+        ExtensionConfig unknownTypeConfig = new ExtensionConfig("emitter2", 
"unknown-emitter-type", "{}");
+
+        TikaConfigException exception = 
assertThrows(TikaConfigException.class, () -> {
+            emitterManager.saveEmitter(unknownTypeConfig);
+        });
+
+        assertTrue(exception.getMessage().contains("Unknown emitter type"));
+        assertTrue(exception.getMessage().contains("unknown-emitter-type"));
+    }
+
+    @Test
+    public void testSaveEmitterNull(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = 
TikaPluginManager.load(tikaJsonConfig);
+
+        EmitterManager emitterManager = EmitterManager.load(pluginManager, 
tikaJsonConfig, true);
+
+        IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> {
+            emitterManager.saveEmitter(null);
+        });
+
+        assertTrue(exception.getMessage().contains("cannot be null"));
+    }
+
+    @Test
+    public void testSaveEmitterLazyInstantiation(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        EmitterManager emitterManager = EmitterManager.load(pluginManager, tikaJsonConfig, true);
+
+        // Register fse2..fse5 at runtime; together with the statically configured
+        // "fse" emitter this yields five emitter configs
+        for (int i = 2; i <= 5; i++) {
+            String configJson = String.format(Locale.ROOT, """
+                    {
+                      "basePath": "%s",
+                      "onExists": "REPLACE"
+                    }
+                    """, PluginsTestHelper.toJsonPath(tmpDir.resolve("output" + i)));
+            ExtensionConfig config2 = new ExtensionConfig("fse" + i, "file-system-emitter", configJson);
+            emitterManager.saveEmitter(config2);
+        }
+
+        // Saved configs are listed as supported immediately, before any getEmitter call,
+        // and the statically configured emitter is still present
+        assertEquals(5, emitterManager.getSupported().size());
+        assertTrue(emitterManager.getSupported().contains("fse"));
+
+        // A runtime-saved config can be materialized on demand.
+        // NOTE: this test only uses the public API; it does not observe which emitters
+        // have or have not been instantiated internally.
+        Emitter emitter3 = emitterManager.getEmitter("fse3");
+        assertNotNull(emitter3);
+        assertEquals("fse3", emitter3.getExtensionConfig().id());
+
+        // Requesting fse3 must not disturb the other saved configs
+        assertTrue(emitterManager.getSupported().contains("fse2"));
+        assertTrue(emitterManager.getSupported().contains("fse4"));
+        assertTrue(emitterManager.getSupported().contains("fse5"));
+    }
+
+    @Test
+    public void testSaveEmitterNotAllowed(@TempDir Path tmpDir) throws Exception {
+        TikaJsonConfig jsonConfig =
+                TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+        TikaPluginManager plugins = TikaPluginManager.load(jsonConfig);
+
+        // The two-argument load() keeps runtime modifications disabled
+        EmitterManager manager = EmitterManager.load(plugins, jsonConfig);
+
+        String outputPath = PluginsTestHelper.toJsonPath(tmpDir.resolve("output2"));
+        String json = String.format(Locale.ROOT, """
+                {
+                  "basePath": "%s",
+                  "onExists": "REPLACE"
+                }
+                """, outputPath);
+        ExtensionConfig candidate = new ExtensionConfig("fse2", "file-system-emitter", json);
+
+        // saveEmitter must refuse, and the message should tell the user how to enable it
+        String message = assertThrows(TikaConfigException.class,
+                () -> manager.saveEmitter(candidate)).getMessage();
+        assertTrue(message.contains("Runtime modifications are not allowed"));
+        assertTrue(message.contains("allowRuntimeModifications=true"));
+    }
+
+    @Test
+    public void testSaveEmitterNotAllowedExplicit(@TempDir Path tmpDir) throws Exception {
+        TikaJsonConfig jsonConfig =
+                TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+        TikaPluginManager plugins = TikaPluginManager.load(jsonConfig);
+
+        // Runtime modifications explicitly switched off
+        EmitterManager manager = EmitterManager.load(plugins, jsonConfig, false);
+
+        String outputPath = PluginsTestHelper.toJsonPath(tmpDir.resolve("output2"));
+        String json = String.format(Locale.ROOT, """
+                {
+                  "basePath": "%s",
+                  "onExists": "REPLACE"
+                }
+                """, outputPath);
+        ExtensionConfig candidate = new ExtensionConfig("fse2", "file-system-emitter", json);
+
+        // Any attempt to save must be rejected
+        TikaConfigException thrown = assertThrows(TikaConfigException.class,
+                () -> manager.saveEmitter(candidate));
+        assertTrue(thrown.getMessage().contains("Runtime modifications are not allowed"));
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/fetcher/FetcherManagerTest.java
 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/fetcher/FetcherManagerTest.java
new file mode 100644
index 000000000..7e206257c
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/fetcher/FetcherManagerTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.fetcher;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.tika.config.loader.TikaJsonConfig;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.api.fetcher.Fetcher;
+import org.apache.tika.pipes.api.fetcher.FetcherNotFoundException;
+import org.apache.tika.pipes.core.PluginsTestHelper;
+import org.apache.tika.plugins.ExtensionConfig;
+import org.apache.tika.plugins.TikaPluginManager;
+
+/**
+ * Tests for {@link FetcherManager}: loading fetcher configs from JSON, rejecting unknown
+ * fetcher types and duplicate ids, on-demand instantiation with instance caching, and
+ * runtime registration via {@code saveFetcher}.
+ */
+public class FetcherManagerTest {
+
+    @Test
+    public void testBasicLoad(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        FetcherManager fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig);
+
+        assertNotNull(fetcherManager);
+        assertEquals(1, fetcherManager.getSupported().size());
+        assertTrue(fetcherManager.getSupported().contains("fsf"));
+    }
+
+    @Test
+    public void testEmptyConfig(@TempDir Path tmpDir) throws Exception {
+        // Create config with no fetchers
+        String configJson = """
+                {
+                  "plugin-roots": "target/plugins"
+                }
+                """;
+
+        Path configPath = tmpDir.resolve("config.json");
+        Files.writeString(configPath, configJson, StandardCharsets.UTF_8);
+
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(configPath);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        FetcherManager fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig);
+
+        assertNotNull(fetcherManager);
+        assertEquals(0, fetcherManager.getSupported().size());
+
+        // Try to get a fetcher when none are configured
+        FetcherNotFoundException exception = assertThrows(FetcherNotFoundException.class, () -> {
+            fetcherManager.getFetcher("any-id");
+        });
+
+        assertTrue(exception.getMessage().contains("any-id"));
+        assertTrue(exception.getMessage().contains("Available: []"));
+    }
+
+    @Test
+    public void testLazyInstantiation(@TempDir Path tmpDir) throws Exception {
+        // Create config with multiple fetchers
+        String configJson = String.format(Locale.ROOT, """
+                {
+                  "fetchers": {
+                    "file-system-fetcher": {
+                      "fsf1": {
+                        "basePath": "%s"
+                      },
+                      "fsf2": {
+                        "basePath": "%s"
+                      }
+                    }
+                  },
+                  "plugin-roots": "target/plugins"
+                }
+                """, PluginsTestHelper.toJsonPath(tmpDir.resolve("path1")),
+                     PluginsTestHelper.toJsonPath(tmpDir.resolve("path2")));
+
+        Path configPath = tmpDir.resolve("config.json");
+        Files.writeString(configPath, configJson, StandardCharsets.UTF_8);
+
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(configPath);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        FetcherManager fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig);
+
+        // After load, both fetcher configs are listed as supported.
+        // NOTE: this test only uses the public API; it does not observe whether
+        // a fetcher has been instantiated internally.
+        assertEquals(2, fetcherManager.getSupported().size());
+
+        // Request fsf1 on demand
+        Fetcher fetcher1 = fetcherManager.getFetcher("fsf1");
+        assertNotNull(fetcher1);
+        assertEquals("fsf1", fetcher1.getExtensionConfig().id());
+
+        // fsf2 has not been requested yet - verify it is still listed
+        assertTrue(fetcherManager.getSupported().contains("fsf2"));
+
+        // Now request fsf2
+        Fetcher fetcher2 = fetcherManager.getFetcher("fsf2");
+        assertNotNull(fetcher2);
+        assertEquals("fsf2", fetcher2.getExtensionConfig().id());
+    }
+
+    @Test
+    public void testCaching(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        FetcherManager fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig);
+
+        // Get the same fetcher multiple times
+        Fetcher fetcher1 = fetcherManager.getFetcher("fsf");
+        Fetcher fetcher2 = fetcherManager.getFetcher("fsf");
+        Fetcher fetcher3 = fetcherManager.getFetcher("fsf");
+
+        // Should be the exact same instance (reference equality)
+        assertSame(fetcher1, fetcher2);
+        assertSame(fetcher2, fetcher3);
+    }
+
+    @Test
+    public void testThreadSafety(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        FetcherManager fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig);
+
+        int threadCount = 10;
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        try {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            List<Future<Fetcher>> futures = new ArrayList<>();
+
+            // Each task blocks on the latch so that all of them race for the same,
+            // not-yet-requested fetcher at (nearly) the same moment
+            for (int i = 0; i < threadCount; i++) {
+                futures.add(executor.submit(() -> {
+                    startLatch.await();
+                    return fetcherManager.getFetcher("fsf");
+                }));
+            }
+
+            // Release all threads at once
+            startLatch.countDown();
+
+            // Collect results; the timeout keeps a deadlock from hanging the build,
+            // and get() rethrows any exception raised inside a task
+            List<Fetcher> fetchers = new ArrayList<>();
+            for (Future<Fetcher> future : futures) {
+                fetchers.add(future.get(10, TimeUnit.SECONDS));
+            }
+
+            // All threads should have gotten the same instance
+            Fetcher first = fetchers.get(0);
+            for (Fetcher fetcher : fetchers) {
+                assertSame(first, fetcher, "All threads should get the same fetcher instance");
+            }
+        } finally {
+            // Release pool threads even when a timeout or assertion fails above
+            executor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testUnknownFetcherId(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        FetcherManager fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig);
+
+        FetcherNotFoundException exception = assertThrows(FetcherNotFoundException.class, () -> {
+            fetcherManager.getFetcher("non-existent-fetcher");
+        });
+
+        assertTrue(exception.getMessage().contains("non-existent-fetcher"));
+        assertTrue(exception.getMessage().contains("Available:"));
+    }
+
+    @Test
+    public void testUnknownFetcherType(@TempDir Path tmpDir) throws Exception {
+        String configJson = """
+                {
+                  "fetchers": {
+                    "non-existent-fetcher-type": {
+                      "fetcher1": {
+                        "someProp": "value"
+                      }
+                    }
+                  },
+                  "plugin-roots": "target/plugins"
+                }
+                """;
+
+        Path configPath = tmpDir.resolve("config.json");
+        Files.writeString(configPath, configJson, StandardCharsets.UTF_8);
+
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(configPath);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        // Should fail during load (early validation)
+        TikaConfigException exception = assertThrows(TikaConfigException.class, () -> {
+            FetcherManager.load(pluginManager, tikaJsonConfig);
+        });
+
+        assertTrue(exception.getMessage().contains("Unknown fetcher type"));
+        assertTrue(exception.getMessage().contains("non-existent-fetcher-type"));
+    }
+
+    @Test
+    public void testDuplicateFetcherId(@TempDir Path tmpDir) throws Exception {
+        String configJson = String.format(Locale.ROOT, """
+                {
+                  "fetchers": {
+                    "file-system-fetcher": {
+                      "fsf1": {
+                        "basePath": "%s"
+                      },
+                      "fsf1": {
+                        "basePath": "%s"
+                      }
+                    }
+                  },
+                  "plugin-roots": "target/plugins"
+                }
+                """, PluginsTestHelper.toJsonPath(tmpDir.resolve("path1")),
+                     PluginsTestHelper.toJsonPath(tmpDir.resolve("path2")));
+
+        Path configPath = tmpDir.resolve("config.json");
+        Files.writeString(configPath, configJson, StandardCharsets.UTF_8);
+
+        // PolymorphicObjectMapperFactory has FAIL_ON_READING_DUP_TREE_KEY enabled
+        // so duplicate keys are caught during JSON parsing
+        TikaConfigException exception = assertThrows(TikaConfigException.class, () -> {
+            TikaJsonConfig.load(configPath);
+        });
+
+        // Separate assertions so a failure pinpoints which expectation was violated
+        assertTrue(exception.getMessage().contains("Failed to parse JSON"),
+                "unexpected message: " + exception.getMessage());
+        Throwable cause = exception.getCause();
+        assertNotNull(cause, "expected the JSON parse error to be preserved as the cause");
+        assertNotNull(cause.getMessage(), "expected the cause to carry a message");
+        assertTrue(cause.getMessage().contains("Duplicate field"),
+                "unexpected cause message: " + cause.getMessage());
+    }
+
+    @Test
+    public void testGetSingleFetcher(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        FetcherManager fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig);
+
+        // When only one fetcher exists, no-arg getFetcher() should work
+        Fetcher fetcher = fetcherManager.getFetcher();
+        assertNotNull(fetcher);
+        assertEquals("fsf", fetcher.getExtensionConfig().id());
+    }
+
+    @Test
+    public void testGetSingleFetcherWithMultipleConfigured(@TempDir Path tmpDir) throws Exception {
+        String configJson = String.format(Locale.ROOT, """
+                {
+                  "fetchers": {
+                    "file-system-fetcher": {
+                      "fsf1": {
+                        "basePath": "%s"
+                      },
+                      "fsf2": {
+                        "basePath": "%s"
+                      }
+                    }
+                  },
+                  "plugin-roots": "target/plugins"
+                }
+                """, PluginsTestHelper.toJsonPath(tmpDir.resolve("path1")),
+                     PluginsTestHelper.toJsonPath(tmpDir.resolve("path2")));
+
+        Path configPath = tmpDir.resolve("config.json");
+        Files.writeString(configPath, configJson, StandardCharsets.UTF_8);
+
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(configPath);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        FetcherManager fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig);
+
+        // When multiple fetchers exist, no-arg getFetcher() should fail
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> {
+            fetcherManager.getFetcher();
+        });
+
+        assertTrue(exception.getMessage().contains("requires exactly 1"));
+    }
+
+    @Test
+    public void testSaveFetcher(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        // Load with runtime modifications enabled
+        FetcherManager fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig, true);
+
+        // Initially only fsf exists
+        assertEquals(1, fetcherManager.getSupported().size());
+
+        // Dynamically add a new fetcher configuration
+        String newConfigJson = String.format(Locale.ROOT, """
+                {"basePath": "%s"}
+                """, PluginsTestHelper.toJsonPath(tmpDir.resolve("path2")));
+        ExtensionConfig newConfig = new ExtensionConfig("fsf2", "file-system-fetcher", newConfigJson);
+
+        fetcherManager.saveFetcher(newConfig);
+
+        // Now both should be available
+        assertEquals(2, fetcherManager.getSupported().size());
+        assertTrue(fetcherManager.getSupported().contains("fsf"));
+        assertTrue(fetcherManager.getSupported().contains("fsf2"));
+
+        // The saved fetcher can be obtained on demand
+        Fetcher fetcher2 = fetcherManager.getFetcher("fsf2");
+        assertNotNull(fetcher2);
+        assertEquals("fsf2", fetcher2.getExtensionConfig().id());
+    }
+
+    @Test
+    public void testSaveFetcherDuplicate(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        FetcherManager fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig, true);
+
+        // Try to add a fetcher with the same ID as existing one
+        String newConfigJson = String.format(Locale.ROOT, """
+                {"basePath": "%s"}
+                """, PluginsTestHelper.toJsonPath(tmpDir.resolve("path2")));
+        ExtensionConfig duplicateConfig = new ExtensionConfig("fsf", "file-system-fetcher", newConfigJson);
+
+        TikaConfigException exception = assertThrows(TikaConfigException.class, () -> {
+            fetcherManager.saveFetcher(duplicateConfig);
+        });
+
+        assertTrue(exception.getMessage().contains("already exists"));
+        assertTrue(exception.getMessage().contains("fsf"));
+    }
+
+    @Test
+    public void testSaveFetcherUnknownType(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        FetcherManager fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig, true);
+
+        // Try to add a fetcher with unknown type
+        ExtensionConfig unknownTypeConfig = new ExtensionConfig("fetcher2", "unknown-fetcher-type", "{}");
+
+        TikaConfigException exception = assertThrows(TikaConfigException.class, () -> {
+            fetcherManager.saveFetcher(unknownTypeConfig);
+        });
+
+        assertTrue(exception.getMessage().contains("Unknown fetcher type"));
+        assertTrue(exception.getMessage().contains("unknown-fetcher-type"));
+    }
+
+    @Test
+    public void testSaveFetcherNull(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        FetcherManager fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig, true);
+
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> {
+            fetcherManager.saveFetcher(null);
+        });
+
+        assertTrue(exception.getMessage().contains("cannot be null"));
+    }
+
+    @Test
+    public void testSaveFetcherLazyInstantiation(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        FetcherManager fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig, true);
+
+        // Register fsf2..fsf5 at runtime; together with the statically configured
+        // "fsf" fetcher this yields five fetcher configs
+        for (int i = 2; i <= 5; i++) {
+            String configJson = String.format(Locale.ROOT, """
+                    {"basePath": "%s"}
+                    """, PluginsTestHelper.toJsonPath(tmpDir.resolve("path" + i)));
+            ExtensionConfig config2 = new ExtensionConfig("fsf" + i, "file-system-fetcher", configJson);
+            fetcherManager.saveFetcher(config2);
+        }
+
+        // Saved configs are listed as supported immediately, before any getFetcher call,
+        // and the statically configured fetcher is still present
+        assertEquals(5, fetcherManager.getSupported().size());
+        assertTrue(fetcherManager.getSupported().contains("fsf"));
+
+        // A runtime-saved config can be materialized on demand.
+        // NOTE: this test only uses the public API; it does not observe which fetchers
+        // have or have not been instantiated internally.
+        Fetcher fetcher3 = fetcherManager.getFetcher("fsf3");
+        assertNotNull(fetcher3);
+        assertEquals("fsf3", fetcher3.getExtensionConfig().id());
+
+        // Requesting fsf3 must not disturb the other saved configs
+        assertTrue(fetcherManager.getSupported().contains("fsf2"));
+        assertTrue(fetcherManager.getSupported().contains("fsf4"));
+        assertTrue(fetcherManager.getSupported().contains("fsf5"));
+    }
+
+    @Test
+    public void testSaveFetcherNotAllowed(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        // Load with default (runtime modifications disabled)
+        FetcherManager fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig);
+
+        // Try to add a fetcher - should fail
+        String newConfigJson = String.format(Locale.ROOT, """
+                {"basePath": "%s"}
+                """, PluginsTestHelper.toJsonPath(tmpDir.resolve("path2")));
+        ExtensionConfig newConfig = new ExtensionConfig("fsf2", "file-system-fetcher", newConfigJson);
+
+        TikaConfigException exception = assertThrows(TikaConfigException.class, () -> {
+            fetcherManager.saveFetcher(newConfig);
+        });
+
+        assertTrue(exception.getMessage().contains("Runtime modifications are not allowed"));
+        assertTrue(exception.getMessage().contains("allowRuntimeModifications=true"));
+    }
+
+    @Test
+    public void testSaveFetcherNotAllowedExplicit(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        // Load with explicit false
+        FetcherManager fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig, false);
+
+        // Try to add a fetcher - should fail
+        String newConfigJson = String.format(Locale.ROOT, """
+                {"basePath": "%s"}
+                """, PluginsTestHelper.toJsonPath(tmpDir.resolve("path2")));
+        ExtensionConfig newConfig = new ExtensionConfig("fsf2", "file-system-fetcher", newConfigJson);
+
+        TikaConfigException exception = assertThrows(TikaConfigException.class, () -> {
+            fetcherManager.saveFetcher(newConfig);
+        });
+
+        assertTrue(exception.getMessage().contains("Runtime modifications are not allowed"));
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
 
b/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
index 65f229891..3e785c640 100644
--- 
a/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
+++ 
b/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
@@ -73,9 +73,9 @@ public class FileSystemEmitter extends AbstractStreamEmitter {
         checkConfig(fileSystemEmitterConfig);
     }
 
+    /**
+     * Validates the emitter configuration.
+     *
+     * @param fileSystemEmitterConfig the parsed configuration to check
+     * @throws TikaConfigException if {@code onExists} is not configured
+     */
-    private void checkConfig(FileSystemEmitterConfig fileSystemEmitterConfig) {
+    private void checkConfig(FileSystemEmitterConfig fileSystemEmitterConfig) 
throws TikaConfigException {
         if (fileSystemEmitterConfig.onExists() == null) {
-            throw new IllegalArgumentException("Must configure 'onExists' as 
'skip', 'exception' or 'replace'");
+            throw new TikaConfigException("Must configure 'onExists' as 
'skip', 'exception' or 'replace'");
         }
     }
 
diff --git 
a/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/test/java/org/apache/tika/pipes/reporter/jdbc/TestJDBCPipesReporter.java
 
b/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/test/java/org/apache/tika/pipes/reporter/jdbc/TestJDBCPipesReporter.java
index 986f4119e..98d50cae9 100644
--- 
a/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/test/java/org/apache/tika/pipes/reporter/jdbc/TestJDBCPipesReporter.java
+++ 
b/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/test/java/org/apache/tika/pipes/reporter/jdbc/TestJDBCPipesReporter.java
@@ -151,7 +151,7 @@ public class TestJDBCPipesReporter {
         Map<PipesResult.RESULT_STATUS, Long> expected = runBatch(reporter, 
numThreads, numIterations);
         reporter.close();
         Map<PipesResult.RESULT_STATUS, Long> total = 
countReported(connectionString);
-        assertEquals(16, total.size());
+        assertEquals(PipesResult.RESULT_STATUS.values().length - 2, 
total.size());
         long sum = 0;
         for (Map.Entry<PipesResult.RESULT_STATUS, Long> e : 
expected.entrySet()) {
             if (e.getKey() != PARSE_SUCCESS && e.getKey() != 
PARSE_SUCCESS_WITH_EXCEPTION) {


Reply via email to