This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 612fb428b TIKA-4558 -- add lazy loading to FetcherManager (#2426)
612fb428b is described below
commit 612fb428b1c00428870a776377cf260cecb19fbe
Author: Tim Allison <[email protected]>
AuthorDate: Tue Dec 9 09:16:05 2025 -0500
TIKA-4558 -- add lazy loading to FetcherManager (#2426)
---
.../org/apache/tika/pipes/api/PipesResult.java | 7 +-
.../api/emitter/EmitterNotFoundException.java | 33 ++
.../api/fetcher/FetcherNotFoundException.java | 33 ++
.../tika/pipes/core/AbstractComponentManager.java | 294 ++++++++++++
.../apache/tika/pipes/core/async/AsyncEmitter.java | 9 +-
.../tika/pipes/core/emitter/EmitterManager.java | 137 ++++--
.../EmittingEmbeddedDocumentBytesHandler.java | 3 +-
.../tika/pipes/core/fetcher/FetcherManager.java | 139 ++++--
.../apache/tika/pipes/core/server/EmitHandler.java | 5 +-
.../apache/tika/pipes/core/server/PipesWorker.java | 12 +-
.../apache/tika/pipes/core/PipesClientTest.java | 5 +-
.../apache/tika/pipes/core/PluginsTestHelper.java | 11 +
.../pipes/core/emitter/EmitterManagerTest.java | 516 +++++++++++++++++++++
.../pipes/core/fetcher/FetcherManagerTest.java | 495 ++++++++++++++++++++
.../tika/pipes/emitter/fs/FileSystemEmitter.java | 4 +-
.../pipes/reporter/jdbc/TestJDBCPipesReporter.java | 2 +-
16 files changed, 1608 insertions(+), 97 deletions(-)
diff --git
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/PipesResult.java
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/PipesResult.java
index 86d84ad5b..9418bac0b 100644
---
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/PipesResult.java
+++
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/PipesResult.java
@@ -77,10 +77,13 @@ public record PipesResult(RESULT_STATUS status, EmitData
emitData, String messag
// Emit failure
EMIT_EXCEPTION(CATEGORY.APPLICATION_ERROR),
+ // Emitter failures
+ EMITTER_INITIALIZATION_EXCEPTION(CATEGORY.APPLICATION_ERROR),
+ EMITTER_NOT_FOUND(CATEGORY.APPLICATION_ERROR),
+
// Other errors
INTERRUPTED_EXCEPTION(CATEGORY.APPLICATION_ERROR),
- FETCHER_NOT_FOUND(CATEGORY.APPLICATION_ERROR),
- EMITTER_NOT_FOUND(CATEGORY.APPLICATION_ERROR);
+ FETCHER_NOT_FOUND(CATEGORY.APPLICATION_ERROR);
private final CATEGORY category;
diff --git
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitterNotFoundException.java
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitterNotFoundException.java
new file mode 100644
index 000000000..5f063d249
--- /dev/null
+++
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitterNotFoundException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.api.emitter;
+
+import org.apache.tika.exception.TikaException;
+
+/**
+ * Exception thrown when a requested emitter configuration does not exist.
+ * <p>
+ * As a {@link TikaException} subtype it can be caught on its own, ahead of a
+ * broader {@code TikaException} handler.
+ */
+public class EmitterNotFoundException extends TikaException {
+
+ /**
+ * Creates the exception with a detail message.
+ *
+ * @param msg the detail message, passed to the superclass constructor
+ */
+ public EmitterNotFoundException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Creates the exception with a detail message and an underlying cause.
+ *
+ * @param msg the detail message, passed to the superclass constructor
+ * @param cause the underlying cause, passed to the superclass constructor
+ */
+ public EmitterNotFoundException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
diff --git
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/FetcherNotFoundException.java
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/FetcherNotFoundException.java
new file mode 100644
index 000000000..d05d4335c
--- /dev/null
+++
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/FetcherNotFoundException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.api.fetcher;
+
+import org.apache.tika.exception.TikaException;
+
+/**
+ * Exception thrown when a requested fetcher configuration does not exist.
+ * <p>
+ * As a {@link TikaException} subtype it can be caught on its own, ahead of a
+ * broader {@code TikaException} handler.
+ */
+public class FetcherNotFoundException extends TikaException {
+
+ /**
+ * Creates the exception with a detail message.
+ *
+ * @param msg the detail message, passed to the superclass constructor
+ */
+ public FetcherNotFoundException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Creates the exception with a detail message and an underlying cause.
+ *
+ * @param msg the detail message, passed to the superclass constructor
+ * @param cause the underlying cause, passed to the superclass constructor
+ */
+ public FetcherNotFoundException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
new file mode 100644
index 000000000..7dab511e0
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.pf4j.PluginManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.loader.PolymorphicObjectMapperFactory;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.plugins.ExtensionConfig;
+import org.apache.tika.plugins.TikaExtension;
+import org.apache.tika.plugins.TikaExtensionFactory;
+
+/**
+ * Abstract base class for managing Tika components (Fetchers, Emitters, etc.).
+ * Provides lazy instantiation, early validation, and optional runtime
modifications.
+ *
+ * @param <T> the component type (e.g., Fetcher, Emitter)
+ * @param <F> the factory type for creating components
+ */
+public abstract class AbstractComponentManager<T extends TikaExtension,
+ F extends
TikaExtensionFactory<T>> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AbstractComponentManager.class);
+
+ protected final PluginManager pluginManager;
+ private final Map<String, ExtensionConfig> componentConfigs = new
ConcurrentHashMap<>();
+ private final Map<String, T> componentCache = new ConcurrentHashMap<>();
+ private final boolean allowRuntimeModifications;
+
+    /**
+     * Creates a manager that holds component configurations and builds components on demand.
+     *
+     * @param pluginManager the plugin manager used to discover component factories
+     * @param componentConfigs the initial configurations keyed by component ID; the entries
+     *                         are copied into this manager's own map
+     * @param allowRuntimeModifications whether {@link #saveComponent(ExtensionConfig)} may
+     *                                  register further configurations later
+     */
+    protected AbstractComponentManager(PluginManager pluginManager,
+                                       Map<String, ExtensionConfig> componentConfigs,
+                                       boolean allowRuntimeModifications) {
+        this.allowRuntimeModifications = allowRuntimeModifications;
+        this.pluginManager = pluginManager;
+        // Copy the entries instead of aliasing the caller's map.
+        this.componentConfigs.putAll(componentConfigs);
+    }
+
+ /**
+ * Returns the JSON configuration key for this component type (e.g.,
+ * "fetchers", "emitters").
+ * <p>
+ * Nothing in this base class calls this method.
+ *
+ * @return the configuration key
+ */
+ protected abstract String getConfigKey();
+
+ /**
+ * Returns the factory class for this component type.
+ * <p>
+ * {@link #getFactories(PluginManager)} passes it to the plugin manager to
+ * look up the available factories.
+ *
+ * @return the factory class
+ */
+ protected abstract Class<F> getFactoryClass();
+
+ /**
+ * Returns the component name for error messages (e.g., "fetcher",
+ * "emitter").
+ * <p>
+ * Some messages capitalize the first character of this name via
+ * {@code substring(0, 1)}, so an empty name would fail there.
+ *
+ * @return the component name
+ */
+ protected abstract String getComponentName();
+
+ /**
+ * Creates a not-found exception for this component type.
+ * <p>
+ * {@link #getComponent(String)} throws the result when no configuration
+ * is registered for the requested id.
+ *
+ * @param message the detail message for the exception
+ * @return the exception to throw
+ */
+ protected abstract TikaException createNotFoundException(String message);
+
+    /**
+     * Checks the supplied JSON against the available factories and gathers one
+     * {@link ExtensionConfig} per configured instance. No component is built here.
+     * <p>
+     * The node's fields are read as type names; the fields of each type's value are
+     * read as instance ids mapped to that instance's configuration.
+     *
+     * @param pluginManager the plugin manager used to discover factories
+     * @param configNode the JSON node to read; {@code null} or a JSON null yields an empty map
+     * @return the collected configurations keyed by instance id
+     * @throws TikaConfigException if a type name has no matching factory, if an instance id
+     *         occurs more than once, or if a configuration cannot be serialized
+     * @throws IOException declared in the signature; nothing in this method body throws it
+     *         directly
+     */
+    protected Map<String, ExtensionConfig> validateAndCollectConfigs(
+            PluginManager pluginManager, JsonNode configNode)
+            throws TikaConfigException, IOException {
+
+        // Factory discovery runs even when there is no configuration to validate.
+        Map<String, F> knownFactories = getFactories(pluginManager);
+        Map<String, ExtensionConfig> collected = new HashMap<>();
+        if (configNode == null || configNode.isNull()) {
+            return collected;
+        }
+
+        for (Iterator<Map.Entry<String, JsonNode>> types = configNode.fields();
+                types.hasNext(); ) {
+            Map.Entry<String, JsonNode> byType = types.next();
+            String typeName = byType.getKey();
+
+            // Reject type names that no factory can build.
+            if (knownFactories.get(typeName) == null) {
+                throw new TikaConfigException(
+                        "Unknown " + getComponentName() + " type: " + typeName +
+                                ". Available: " + knownFactories.keySet());
+            }
+
+            for (Iterator<Map.Entry<String, JsonNode>> instances = byType.getValue().fields();
+                    instances.hasNext(); ) {
+                Map.Entry<String, JsonNode> byId = instances.next();
+                String instanceId = byId.getKey();
+
+                // Instance ids must be unique across all types, not just within one type.
+                if (collected.containsKey(instanceId)) {
+                    throw new TikaConfigException("Duplicate " + getComponentName() +
+                            " id: " + instanceId);
+                }
+
+                String json = toJsonString(byId.getValue());
+                collected.put(instanceId, new ExtensionConfig(instanceId, typeName, json));
+            }
+        }
+        return collected;
+    }
+
+    /**
+     * Discovers the factories for this component type, keyed by factory name.
+     * <p>
+     * If the plugin manager reports no started plugins, plugins are loaded and started first.
+     * When two factories share a name, one whose class loader is an
+     * {@code org.pf4j.PluginClassLoader} replaces one whose class loader is not; in every
+     * other case the factory seen first is kept.
+     *
+     * @param pluginManager the plugin manager to query
+     * @return a new map from factory name to factory
+     * @throws TikaConfigException declared in the signature; nothing in this method body
+     *         throws it directly
+     */
+    protected Map<String, F> getFactories(PluginManager pluginManager)
+            throws TikaConfigException {
+        if (pluginManager.getStartedPlugins().isEmpty()) {
+            pluginManager.loadPlugins();
+            pluginManager.startPlugins();
+        }
+
+        Map<String, F> byName = new HashMap<>();
+        for (F candidate : pluginManager.getExtensions(getFactoryClass())) {
+            String name = candidate.getName();
+            F current = byName.get(name);
+            if (current == null) {
+                // First factory seen under this name.
+                byName.put(name, candidate);
+                continue;
+            }
+
+            boolean candidateFromPlugin =
+                    candidate.getClass().getClassLoader() instanceof org.pf4j.PluginClassLoader;
+            boolean currentFromPlugin =
+                    current.getClass().getClassLoader() instanceof org.pf4j.PluginClassLoader;
+            if (candidateFromPlugin && !currentFromPlugin) {
+                // A plugin-provided factory wins over a non-plugin one of the same name.
+                byName.put(name, candidate);
+            }
+        }
+        return byName;
+    }
+
+    /**
+     * Serializes a JSON node to its string form using the mapper from
+     * {@link PolymorphicObjectMapperFactory}.
+     *
+     * @param node the node to serialize
+     * @return the JSON text
+     * @throws TikaConfigException if the mapper fails to serialize the node
+     */
+    private static String toJsonString(final JsonNode node) throws TikaConfigException {
+        final String json;
+        try {
+            json = PolymorphicObjectMapperFactory.getMapper().writeValueAsString(node);
+        } catch (JsonProcessingException e) {
+            throw new TikaConfigException("Failed to serialize config to JSON string", e);
+        }
+        return json;
+    }
+
+    /**
+     * Looks up a component by ID, building it on first request and caching it afterwards.
+     *
+     * @param id the component ID
+     * @return the cached or newly built component
+     * @throws TikaException the exception from {@link #createNotFoundException(String)} if no
+     *         configuration is registered under {@code id}
+     * @throws IOException if building the component fails; a {@link TikaConfigException}
+     *         raised by the build is wrapped as the cause
+     */
+    public T getComponent(String id) throws IOException, TikaException {
+        // Unsynchronized lookup for components that were already built.
+        T cached = componentCache.get(id);
+        if (cached != null) {
+            return cached;
+        }
+
+        ExtensionConfig config = componentConfigs.get(id);
+        if (config == null) {
+            throw createNotFoundException(
+                    "Can't find " + getComponentName() + " for id=" + id +
+                            ". Available: " + componentConfigs.keySet());
+        }
+
+        // Builds are serialized on this manager so only one thread builds the component.
+        synchronized (this) {
+            T alreadyBuilt = componentCache.get(id);
+            if (alreadyBuilt != null) {
+                // Another thread finished the build while this one waited for the lock.
+                return alreadyBuilt;
+            }
+
+            final T built;
+            try {
+                built = buildComponent(config);
+            } catch (TikaConfigException e) {
+                throw new IOException("Failed to build " + getComponentName() + ": " + id, e);
+            }
+            componentCache.put(id, built);
+            LOG.debug("Lazily instantiated {}: {}", getComponentName(), id);
+            return built;
+        }
+    }
+
+    /**
+     * Creates the component described by {@code config}, using the factory registered under
+     * the configuration's type name.
+     *
+     * @param config the configuration of the component to build
+     * @return the component produced by the factory
+     * @throws TikaConfigException if no factory is registered under the configuration's
+     *         type name
+     * @throws IOException declared in the signature; nothing in this method body throws it
+     *         directly
+     */
+    private T buildComponent(ExtensionConfig config) throws TikaConfigException, IOException {
+        Map<String, F> available = getFactories(pluginManager);
+        F factory = available.get(config.name());
+        if (factory != null) {
+            return factory.buildExtension(config);
+        }
+
+        // Type names are checked when configs are collected or saved; this is a safety net.
+        throw new TikaConfigException(
+                "Unknown " + getComponentName() + " type: " + config.name() +
+                        ". Available: " + available.keySet());
+    }
+
+    /**
+     * Registers a new component configuration at runtime.
+     * <p>
+     * Only the configuration is stored; the component itself is built the first time it is
+     * requested through {@link #getComponent(String)}.
+     * <p>
+     * This method works only when the manager was created with
+     * allowRuntimeModifications=true.
+     * <p>
+     * Only authorized/authenticated users should be allowed to modify components. BE CAREFUL.
+     *
+     * @param config the extension configuration for the component
+     * @throws TikaConfigException if runtime modifications are not allowed, if a configuration
+     *         with the same ID already exists, or if the component type is unknown
+     * @throws IllegalArgumentException if {@code config} is {@code null} and runtime
+     *         modifications are allowed
+     * @throws IOException declared in the signature; nothing in this method body throws it
+     *         directly
+     */
+    public synchronized void saveComponent(ExtensionConfig config)
+            throws TikaConfigException, IOException {
+        if (!allowRuntimeModifications) {
+            String name = getComponentName();
+            // Spells out the subclass-facing method name, e.g. saveFetcher().
+            String saveMethod = "save" + name.substring(0, 1).toUpperCase(Locale.ROOT) +
+                    name.substring(1) + "()";
+            throw new TikaConfigException(
+                    "Runtime modifications are not allowed. " + getClass().getSimpleName() +
+                            " must be loaded with allowRuntimeModifications=true to use " +
+                            saveMethod);
+        }
+
+        if (config == null) {
+            throw new IllegalArgumentException("ExtensionConfig cannot be null");
+        }
+
+        final String componentId = config.id();
+        final String typeName = config.name();
+
+        // An existing ID is never overwritten.
+        if (componentConfigs.containsKey(componentId)) {
+            String name = getComponentName();
+            throw new TikaConfigException(
+                    name.substring(0, 1).toUpperCase(Locale.ROOT) + name.substring(1) +
+                            " with id '" + componentId + "' already exists");
+        }
+
+        // The type must map to a known factory before the config is accepted.
+        Map<String, F> available = getFactories(pluginManager);
+        if (!available.containsKey(typeName)) {
+            throw new TikaConfigException(
+                    "Unknown " + getComponentName() + " type: " + typeName +
+                            ". Available: " + available.keySet());
+        }
+
+        // Store the config only; the component is built on first use.
+        componentConfigs.put(componentId, config);
+        LOG.debug("Saved {} config: id={}, type={}", getComponentName(), componentId, typeName);
+    }
+
+    /**
+     * Returns the IDs of all configured components, whether or not they have been built yet.
+     * <p>
+     * The result is a read-only, live view: IDs registered later through
+     * {@link #saveComponent(ExtensionConfig)} appear in it, but callers cannot remove
+     * entries. Handing out the backing map's key set directly would let a caller delete a
+     * configuration and so bypass the {@code allowRuntimeModifications} check.
+     *
+     * @return an unmodifiable view of the configured component IDs
+     */
+    public Set<String> getSupported() {
+        return java.util.Collections.unmodifiableSet(componentConfigs.keySet());
+    }
+
+    /**
+     * Convenience lookup for managers that hold exactly one configured component.
+     *
+     * @return the single configured component, built lazily if necessary
+     * @throws IllegalArgumentException if zero or more than one component is configured
+     * @throws IOException if building the component fails
+     * @throws TikaException if the by-ID lookup fails
+     */
+    public T getComponent() throws IOException, TikaException {
+        if (componentConfigs.size() == 1) {
+            // Delegate to the by-ID lookup so the component is still built lazily.
+            String onlyId = componentConfigs.keySet().iterator().next();
+            return getComponent(onlyId);
+        }
+
+        String name = getComponentName();
+        String capitalized = name.substring(0, 1).toUpperCase(Locale.ROOT) + name.substring(1);
+        throw new IllegalArgumentException(
+                "No-arg get" + capitalized + "() requires exactly 1 configured " + name +
+                        ". Found: " + componentConfigs.size() +
+                        " (" + componentConfigs.keySet() + ")");
+    }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java
index ea084aa65..136bda109 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.tika.exception.TikaException;
import org.apache.tika.pipes.api.emitter.EmitData;
import org.apache.tika.pipes.api.emitter.Emitter;
import org.apache.tika.pipes.core.PipesConfig;
@@ -119,7 +120,13 @@ public class AsyncEmitter implements Callable<Integer> {
int emitted = 0;
LOG.debug("about to emit {} files, {} estimated bytes", size,
estimatedSize);
for (Map.Entry<String, List<EmitData>> e : map.entrySet()) {
- Emitter emitter = emitterManager.getEmitter(e.getKey());
+ Emitter emitter = null;
+ try {
+ emitter = emitterManager.getEmitter(e.getKey());
+ } catch (IOException | TikaException ex) {
+ LOG.warn("emitter id={} failed on instantiation",
e.getKey(), ex);
+ return;
+ }
tryToEmit(emitter, e.getValue());
emitted += e.getValue().size();
}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
index ee5ac55a4..101de53d6 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
@@ -18,79 +18,134 @@ package org.apache.tika.pipes.core.emitter;
import java.io.IOException;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import com.fasterxml.jackson.databind.JsonNode;
import org.pf4j.PluginManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.tika.config.loader.TikaJsonConfig;
import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaException;
import org.apache.tika.pipes.api.emitter.Emitter;
import org.apache.tika.pipes.api.emitter.EmitterFactory;
-import org.apache.tika.plugins.PluginComponentLoader;
+import org.apache.tika.pipes.api.emitter.EmitterNotFoundException;
+import org.apache.tika.pipes.core.AbstractComponentManager;
+import org.apache.tika.plugins.ExtensionConfig;
/**
* Utility class that will apply the appropriate emitter
* to the emitterString based on the prefix.
* <p>
* This does not allow multiple emitters supporting the same prefix.
+ * Emitters are instantiated lazily on first use.
*/
-public class EmitterManager {
- public static final String CONFIG_KEY = "emitters";
+public class EmitterManager extends AbstractComponentManager<Emitter,
EmitterFactory> {
- private static final Logger LOG =
LoggerFactory.getLogger(EmitterManager.class);
+ private static final String CONFIG_KEY = "emitters";
- private final Map<String, Emitter> emitterMap = new ConcurrentHashMap<>();
+ /**
+ * Loads an EmitterManager without allowing runtime modifications.
+ * Use {@link #load(PluginManager, TikaJsonConfig, boolean)} to enable
runtime emitter additions.
+ *
+ * @param pluginManager the plugin manager
+ * @param tikaJsonConfig the configuration
+ * @return an EmitterManager that does not allow runtime modifications
+ */
+ public static EmitterManager load(PluginManager pluginManager,
TikaJsonConfig tikaJsonConfig)
+ throws IOException, TikaConfigException {
+ return load(pluginManager, tikaJsonConfig, false);
+ }
- public static EmitterManager load(PluginManager pluginManager,
TikaJsonConfig tikaJsonConfig) throws IOException, TikaConfigException {
- JsonNode fetchersNode = tikaJsonConfig.getRootNode().get(CONFIG_KEY);
- Map<String, Emitter> fetchers =
- PluginComponentLoader.loadInstances(pluginManager,
EmitterFactory.class, fetchersNode);
- return new EmitterManager(fetchers);
+ /**
+ * Loads an EmitterManager with optional support for runtime modifications.
+ *
+ * @param pluginManager the plugin manager
+ * @param tikaJsonConfig the configuration
+ * @param allowRuntimeModifications if true, allows calling {@link
#saveEmitter(ExtensionConfig)}
+ * to add emitters at runtime
+ * @return an EmitterManager
+ */
+ public static EmitterManager load(PluginManager pluginManager,
TikaJsonConfig tikaJsonConfig,
+ boolean allowRuntimeModifications)
+ throws IOException, TikaConfigException {
+ EmitterManager manager = new EmitterManager(pluginManager,
allowRuntimeModifications);
+ JsonNode emittersNode = tikaJsonConfig.getRootNode().get(CONFIG_KEY);
+
+ // Validate configuration and collect emitter configs without
instantiating
+ Map<String, ExtensionConfig> configs =
manager.validateAndCollectConfigs(pluginManager, emittersNode);
+
+ return new EmitterManager(pluginManager, configs,
allowRuntimeModifications);
}
- private EmitterManager() {
+ private EmitterManager(PluginManager pluginManager, boolean
allowRuntimeModifications) {
+ super(pluginManager, Map.of(), allowRuntimeModifications);
+ }
+ private EmitterManager(PluginManager pluginManager, Map<String,
ExtensionConfig> emitterConfigs,
+ boolean allowRuntimeModifications) {
+ super(pluginManager, emitterConfigs, allowRuntimeModifications);
}
- private EmitterManager(Map<String, Emitter> emitters) {
- emitterMap.putAll(emitters);
+ @Override
+ protected String getConfigKey() {
+ return CONFIG_KEY;
}
- public Set<String> getSupported() {
- return emitterMap.keySet();
+ @Override
+ protected Class<EmitterFactory> getFactoryClass() {
+ return EmitterFactory.class;
}
+ @Override
+ protected String getComponentName() {
+ return "emitter";
+ }
- public Emitter getEmitter(String emitterName) {
- Emitter emitter = emitterMap.get(emitterName);
- if (emitter == null) {
- throw new IllegalArgumentException("Can't find emitter for prefix:
" + emitterName);
- }
- return emitter;
+ @Override
+ protected TikaException createNotFoundException(String message) {
+ return new EmitterNotFoundException(message);
+ }
+
+ /**
+ * Gets an emitter by ID, lazily instantiating it if needed.
+ *
+ * @param emitterName the emitter ID
+ * @return the emitter
+ * @throws EmitterNotFoundException if no emitter with the given ID exists
+ * @throws IOException if there's an error building the emitter
+ * @throws TikaException if there's a configuration error
+ */
+ public Emitter getEmitter(String emitterName) throws IOException,
TikaException {
+ return getComponent(emitterName);
}
/**
* Convenience method that returns an emitter if only one emitter
- * is specified in the tika-config file. If 0 or > 1 emitters
- * are specified, this throws an IllegalArgumentException.
- * @return
+ * is configured. If 0 or > 1 emitters are configured, this throws an
IllegalArgumentException.
+ *
+ * @return the single configured emitter
+ * @throws IOException if there's an error building the emitter
+ * @throws TikaException if there's a configuration error
+ */
+ public Emitter getEmitter() throws IOException, TikaException {
+ return getComponent();
+ }
+
+ /**
+ * Dynamically adds an emitter configuration at runtime.
+ * The emitter will not be instantiated until it is first requested via
{@link #getEmitter(String)}.
+ * This allows for dynamic configuration without the overhead of immediate
instantiation.
+ * <p>
+ * This method is only available if the EmitterManager was loaded with
+ * {@link #load(PluginManager, TikaJsonConfig, boolean)} with
allowRuntimeModifications=true.
+ * <p>
+ * Only authorized/authenticated users should be allowed to modify
emitters. BE CAREFUL.
+ *
+ * @param config the extension configuration for the emitter
+ * @throws TikaConfigException if the emitter type is unknown, if an
emitter with the same ID
+ * already exists, or if runtime modifications
are not allowed
+ * @throws IOException if there is an error accessing the plugin manager
*/
- public Emitter getEmitter() {
- if (emitterMap.isEmpty()) {
- throw new IllegalArgumentException("emitters size must == 1 for
the no arg call");
- }
- if (emitterMap.size() > 1) {
- throw new IllegalArgumentException("need to specify 'emitterId' if
> 1 emitters are" +
- " available");
- }
- for (Emitter emitter : emitterMap.values()) {
- return emitter;
- }
- //this should be unreachable?!
- throw new IllegalArgumentException("emitters size must == 0");
+ public void saveEmitter(ExtensionConfig config) throws
TikaConfigException, IOException {
+ saveComponent(config);
}
}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java
index f1ead0b51..5d74c49ef 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InputStream;
import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.api.FetchEmitTuple;
@@ -39,7 +40,7 @@ public class EmittingEmbeddedDocumentBytesHandler extends
AbstractEmbeddedDocume
private static final ParseContext PARSE_CONTEXT = new ParseContext();
public EmittingEmbeddedDocumentBytesHandler(FetchEmitTuple fetchEmitTuple,
- EmitterManager emitterManager)
throws TikaConfigException {
+ EmitterManager emitterManager)
throws TikaException, IOException {
this.containerEmitKey = fetchEmitTuple.getEmitKey();
this.embeddedDocumentBytesConfig =
fetchEmitTuple.getParseContext().get(EmbeddedDocumentBytesConfig.class);
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
index e30a833fa..af43914ac 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
@@ -18,78 +18,133 @@ package org.apache.tika.pipes.core.fetcher;
import java.io.IOException;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import com.fasterxml.jackson.databind.JsonNode;
import org.pf4j.PluginManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.tika.config.loader.TikaJsonConfig;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.pipes.api.fetcher.Fetcher;
import org.apache.tika.pipes.api.fetcher.FetcherFactory;
-import org.apache.tika.plugins.PluginComponentLoader;
+import org.apache.tika.pipes.api.fetcher.FetcherNotFoundException;
+import org.apache.tika.pipes.core.AbstractComponentManager;
+import org.apache.tika.plugins.ExtensionConfig;
/**
* Utility class to hold multiple fetchers.
* <p>
- * This forbids multiple fetchers with the same pluginId
+ * This forbids multiple fetchers with the same id.
+ * Fetchers are instantiated lazily on first use.
*/
-public class FetcherManager {
+public class FetcherManager extends AbstractComponentManager<Fetcher,
FetcherFactory> {
- public static final String CONFIG_KEY = "fetchers";
- private static final Logger LOG =
LoggerFactory.getLogger(FetcherManager.class);
+ private static final String CONFIG_KEY = "fetchers";
+ /**
+ * Loads a FetcherManager without allowing runtime modifications.
+ * Use {@link #load(PluginManager, TikaJsonConfig, boolean)} to enable
runtime fetcher additions.
+ *
+ * @param pluginManager the plugin manager
+ * @param tikaJsonConfig the configuration
+ * @return a FetcherManager that does not allow runtime modifications
+ */
+ public static FetcherManager load(PluginManager pluginManager,
TikaJsonConfig tikaJsonConfig)
+ throws TikaConfigException, IOException {
+ return load(pluginManager, tikaJsonConfig, false);
+ }
- public static FetcherManager load(PluginManager pluginManager,
TikaJsonConfig tikaJsonConfig) throws TikaConfigException, IOException {
+ /**
+ * Loads a FetcherManager with optional support for runtime modifications.
+ *
+ * @param pluginManager the plugin manager
+ * @param tikaJsonConfig the configuration
+ * @param allowRuntimeModifications if true, allows calling {@link
#saveFetcher(ExtensionConfig)}
+ * to add fetchers at runtime
+ * @return a FetcherManager
+ */
+ public static FetcherManager load(PluginManager pluginManager,
TikaJsonConfig tikaJsonConfig,
+ boolean allowRuntimeModifications)
+ throws TikaConfigException, IOException {
+ FetcherManager manager = new FetcherManager(pluginManager,
allowRuntimeModifications);
JsonNode fetchersNode = tikaJsonConfig.getRootNode().get(CONFIG_KEY);
- Map<String, Fetcher> fetchers =
- PluginComponentLoader.loadInstances(pluginManager,
FetcherFactory.class, fetchersNode);
- return new FetcherManager(fetchers);
+
+ // Validate configuration and collect fetcher configs without
instantiating
+ Map<String, ExtensionConfig> configs =
manager.validateAndCollectConfigs(pluginManager, fetchersNode);
+
+ return new FetcherManager(pluginManager, configs,
allowRuntimeModifications);
}
- private final Map<String, Fetcher> fetcherMap = new ConcurrentHashMap<>();
+ private FetcherManager(PluginManager pluginManager, boolean
allowRuntimeModifications) {
+ super(pluginManager, Map.of(), allowRuntimeModifications);
+ }
- private FetcherManager(Map<String, Fetcher> fetcherMap) throws
TikaConfigException {
- this.fetcherMap.putAll(fetcherMap);
+ private FetcherManager(PluginManager pluginManager, Map<String,
ExtensionConfig> fetcherConfigs,
+ boolean allowRuntimeModifications) {
+ super(pluginManager, fetcherConfigs, allowRuntimeModifications);
}
+ @Override
+ protected String getConfigKey() {
+ return CONFIG_KEY;
+ }
- public Fetcher getFetcher(String id) throws IOException, TikaException {
- Fetcher fetcher = fetcherMap.get(id);
- if (fetcher == null) {
- throw new IllegalArgumentException(
- "Can't find fetcher for id=" + id + ". I've loaded: " +
- fetcherMap.keySet());
- }
- return fetcher;
+ @Override
+ protected Class<FetcherFactory> getFactoryClass() {
+ return FetcherFactory.class;
}
- public Set<String> getSupported() {
- return fetcherMap.keySet();
+ @Override
+ protected String getComponentName() {
+ return "fetcher";
+ }
+
+ @Override
+ protected TikaException createNotFoundException(String message) {
+ return new FetcherNotFoundException(message);
+ }
+
+ /**
+ * Gets a fetcher by ID, lazily instantiating it if needed.
+ *
+ * @param id the fetcher ID
+ * @return the fetcher
+ * @throws FetcherNotFoundException if no fetcher with the given ID exists
+ * @throws IOException if there's an error building the fetcher
+ * @throws TikaException if there's a configuration error
+ */
+ public Fetcher getFetcher(String id) throws IOException, TikaException {
+ return getComponent(id);
}
/**
* Convenience method that returns a fetcher if only one fetcher
- * is specified in the tika-config file. If 0 or > 1 fetchers
- * are specified, this throws an IllegalArgumentException.
- * @return
+ * is configured. If 0 or > 1 fetchers are configured, this throws an
IllegalArgumentException.
+ *
+ * @return the single configured fetcher
+ * @throws IOException if there's an error building the fetcher
+ * @throws TikaException if there's a configuration error
+ */
+ public Fetcher getFetcher() throws IOException, TikaException {
+ return getComponent();
+ }
+
+ /**
+ * Dynamically adds a fetcher configuration at runtime.
+ * The fetcher will not be instantiated until it is first requested via
{@link #getFetcher(String)}.
+ * This allows for dynamic configuration without the overhead of immediate
instantiation.
+ * <p>
+ * This method is only available if the FetcherManager was loaded with
+ * {@link #load(PluginManager, TikaJsonConfig, boolean)} with
allowRuntimeModifications=true.
+ * <p>
+ * Only authorized/authenticated users should be allowed to modify
fetchers. BE CAREFUL.
+ *
+ * @param config the extension configuration for the fetcher
+ * @throws TikaConfigException if the fetcher type is unknown, if a
fetcher with the same ID
+ * already exists, or if runtime modifications
are not allowed
+ * @throws IOException if there is an error accessing the plugin manager
*/
- public Fetcher getFetcher() {
- if (fetcherMap.isEmpty()) {
- throw new IllegalArgumentException("fetchers size must == 1 for
the no arg call");
- }
- if (fetcherMap.size() > 1) {
- throw new IllegalArgumentException("need to specify 'fetcherId' if
> 1 fetchers are" +
- " available");
- }
- for (Fetcher fetcher : fetcherMap.values()) {
- return fetcher;
- }
- //this should be unreachable?!
- throw new IllegalArgumentException("fetchers size must == 0");
+ public void saveFetcher(ExtensionConfig config) throws
TikaConfigException, IOException {
+ saveComponent(config);
}
}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/EmitHandler.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/EmitHandler.java
index 37c504ff4..687269b2c 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/EmitHandler.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/EmitHandler.java
@@ -99,10 +99,13 @@ class EmitHandler {
try {
emitter = emitterManager.getEmitter(emitKey.getEmitterId());
- } catch (IllegalArgumentException e) {
+ } catch (org.apache.tika.pipes.api.emitter.EmitterNotFoundException e)
{
String noEmitterMsg = getNoEmitterMsg(taskId);
LOG.warn(noEmitterMsg);
return new
PipesResult(PipesResult.RESULT_STATUS.EMITTER_NOT_FOUND, noEmitterMsg);
+ } catch (IOException | TikaException e) {
+ LOG.warn("Couldn't initialize emitter for task id '" + taskId +
"'", e);
+ return new
PipesResult(PipesResult.RESULT_STATUS.EMITTER_INITIALIZATION_EXCEPTION,
ExceptionUtils.getStackTrace(e));
}
try {
if (isExtractEmbeddedBytes &&
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesWorker.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesWorker.java
index 61fa43c68..daf364990 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesWorker.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesWorker.java
@@ -131,7 +131,14 @@ class PipesWorker implements Callable<PipesResult> {
return new ParseDataOrPipesResult(null, tisOrResult.pipesResult());
}
- ParseContext parseContext = setupParseContext(fetchEmitTuple);
+ ParseContext parseContext = null;
+ try {
+ parseContext = setupParseContext(fetchEmitTuple);
+ } catch (IOException e) {
+ LOG.warn("fetcher initialization exception id={}",
fetchEmitTuple.getId(), e);
+ return new ParseDataOrPipesResult(null,
+ new
PipesResult(PipesResult.RESULT_STATUS.FETCHER_INITIALIZATION_EXCEPTION,
ExceptionUtils.getStackTrace(e)));
+ }
try (TikaInputStream tis = tisOrResult.tis()) {
return parseHandler.parseWithStream(fetchEmitTuple, tis, metadata,
parseContext);
} catch (SecurityException e) {
@@ -146,8 +153,7 @@ class PipesWorker implements Callable<PipesResult> {
- private ParseContext setupParseContext(FetchEmitTuple fetchEmitTuple)
- throws TikaConfigException {
+ private ParseContext setupParseContext(FetchEmitTuple fetchEmitTuple)
throws TikaException, IOException {
ParseContext parseContext = fetchEmitTuple.getParseContext();
if (parseContext.get(HandlerConfig.class) == null) {
parseContext.set(HandlerConfig.class, DEFAULT_HANDLER_CONFIG);
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
index 74e6e1a78..b482200e1 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
@@ -490,9 +490,8 @@ public class PipesClientTest {
PipesResult pipesResult = pipesClient.process(tuple);
- // Should be FETCHER_NOT_FOUND
- assertEquals(PipesResult.RESULT_STATUS.FETCHER_NOT_FOUND,
pipesResult.status(),
- "Should return FETCHER_NOT_FOUND when fetcher name is
invalid");
+
assertEquals(PipesResult.RESULT_STATUS.FETCHER_INITIALIZATION_EXCEPTION,
pipesResult.status(),
+ "Should return FETCHER_INITIALIZATION_EXCEPTION when
fetcher name is invalid");
// Verify it's categorized as APPLICATION_ERROR
assertTrue(pipesResult.isApplicationError(),
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PluginsTestHelper.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PluginsTestHelper.java
index 834a389be..5e8676094 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PluginsTestHelper.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PluginsTestHelper.java
@@ -84,4 +84,15 @@ public class PluginsTestHelper {
Files.copy(PipesServerTest.class.getResourceAsStream("/test-documents/" +
testDoc), inputDir.resolve(testDoc));
}
}
+
+ /**
+ * Converts a Path to a JSON-safe string with forward slashes.
+ * This ensures paths work correctly in JSON configs on both Windows and
Unix systems.
+ *
+ * @param path the path to convert
+ * @return a string representation with forward slashes
+ */
+ public static String toJsonPath(Path path) {
+ return path.toString().replace("\\", "/");
+ }
}
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/emitter/EmitterManagerTest.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/emitter/EmitterManagerTest.java
new file mode 100644
index 000000000..8d98865c9
--- /dev/null
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/emitter/EmitterManagerTest.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.emitter;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.tika.config.loader.TikaJsonConfig;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.api.emitter.Emitter;
+import org.apache.tika.pipes.api.emitter.EmitterNotFoundException;
+import org.apache.tika.pipes.core.PluginsTestHelper;
+import org.apache.tika.plugins.ExtensionConfig;
+import org.apache.tika.plugins.TikaPluginManager;
+
+public class EmitterManagerTest {
+
+    /**
+     * Loads the shared file-system test config and checks that exactly one
+     * emitter id, "fse", is reported as supported.
+     */
+    @Test
+    public void testBasicLoad(@TempDir Path tmpDir) throws Exception {
+        TikaJsonConfig jsonConfig =
+                TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+        EmitterManager manager =
+                EmitterManager.load(TikaPluginManager.load(jsonConfig), jsonConfig);
+
+        assertNotNull(manager);
+        assertEquals(1, manager.getSupported().size());
+        assertTrue(manager.getSupported().contains("fse"));
+    }
+
+    /**
+     * Loads a config without an "emitters" section and checks that nothing is
+     * reported as supported and that a lookup fails with an
+     * {@link EmitterNotFoundException} naming the id and an empty "Available" list.
+     */
+    @Test
+    public void testEmptyConfig(@TempDir Path tmpDir) throws Exception {
+        // Only the plugin roots are configured; no emitters are declared.
+        Path jsonFile = tmpDir.resolve("config.json");
+        Files.writeString(jsonFile, """
+                {
+                "plugin-roots": "target/plugins"
+                }
+                """, StandardCharsets.UTF_8);
+
+        TikaJsonConfig jsonConfig = TikaJsonConfig.load(jsonFile);
+        EmitterManager manager =
+                EmitterManager.load(TikaPluginManager.load(jsonConfig), jsonConfig);
+
+        assertNotNull(manager);
+        assertEquals(0, manager.getSupported().size());
+
+        // With nothing configured, a lookup by id must be rejected.
+        EmitterNotFoundException notFound = assertThrows(EmitterNotFoundException.class,
+                () -> manager.getEmitter("any-id"));
+        String message = notFound.getMessage();
+        assertTrue(message.contains("any-id"));
+        assertTrue(message.contains("Available: []"));
+    }
+
+    /**
+     * Configures two file-system emitters and fetches them one after the other.
+     * The test cannot observe instantiation directly; it checks that both ids are
+     * listed after load and that each lookup returns an emitter carrying its own id.
+     */
+    @Test
+    public void testLazyInstantiation(@TempDir Path tmpDir) throws Exception {
+        String firstBase = PluginsTestHelper.toJsonPath(tmpDir.resolve("output1"));
+        String secondBase = PluginsTestHelper.toJsonPath(tmpDir.resolve("output2"));
+        String json = String.format(Locale.ROOT, """
+                {
+                "emitters": {
+                "file-system-emitter": {
+                "fse1": {
+                "basePath": "%s",
+                "onExists": "REPLACE"
+                },
+                "fse2": {
+                "basePath": "%s",
+                "onExists": "REPLACE"
+                }
+                }
+                },
+                "plugin-roots": "target/plugins"
+                }
+                """, firstBase, secondBase);
+
+        Path jsonFile = tmpDir.resolve("config.json");
+        Files.writeString(jsonFile, json, StandardCharsets.UTF_8);
+
+        TikaJsonConfig jsonConfig = TikaJsonConfig.load(jsonFile);
+        EmitterManager manager =
+                EmitterManager.load(TikaPluginManager.load(jsonConfig), jsonConfig);
+
+        // Both ids are listed straight after load, before any lookup.
+        assertEquals(2, manager.getSupported().size());
+
+        // Look up fse1 only.
+        Emitter first = manager.getEmitter("fse1");
+        assertNotNull(first);
+        assertEquals("fse1", first.getExtensionConfig().id());
+
+        // fse2 has not been looked up yet but is still listed.
+        assertTrue(manager.getSupported().contains("fse2"));
+
+        // Now look up fse2 as well.
+        Emitter second = manager.getEmitter("fse2");
+        assertNotNull(second);
+        assertEquals("fse2", second.getExtensionConfig().id());
+    }
+
+    /**
+     * Looks up the same emitter id three times and checks that every lookup
+     * returns the identical object.
+     */
+    @Test
+    public void testCaching(@TempDir Path tmpDir) throws Exception {
+        TikaJsonConfig jsonConfig =
+                TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+        EmitterManager manager =
+                EmitterManager.load(TikaPluginManager.load(jsonConfig), jsonConfig);
+
+        Emitter first = manager.getEmitter("fse");
+        Emitter second = manager.getEmitter("fse");
+        Emitter third = manager.getEmitter("fse");
+
+        // assertSame compares references rather than equals()
+        assertSame(first, second);
+        assertSame(second, third);
+    }
+
+    /**
+     * Requests the same emitter id from several threads released at the same
+     * moment and checks that every thread receives the identical instance.
+     * The executor is always shut down, including when an assertion fails.
+     */
+    @Test
+    public void testThreadSafety(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        EmitterManager emitterManager = EmitterManager.load(pluginManager, tikaJsonConfig);
+
+        int threadCount = 10;
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        try {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch doneLatch = new CountDownLatch(threadCount);
+            List<Future<Emitter>> futures = new ArrayList<>();
+
+            // Queue one task per pool thread; each waits on startLatch so the
+            // getEmitter calls begin as close together as possible.
+            for (int i = 0; i < threadCount; i++) {
+                futures.add(executor.submit(() -> {
+                    try {
+                        startLatch.await();
+                        return emitterManager.getEmitter("fse");
+                    } finally {
+                        doneLatch.countDown();
+                    }
+                }));
+            }
+
+            // Release all waiting tasks at once
+            startLatch.countDown();
+
+            // Wait for all tasks to complete
+            assertTrue(doneLatch.await(10, TimeUnit.SECONDS));
+
+            // Every task has counted down, so get() should not block for long;
+            // it rethrows any failure from the task.
+            List<Emitter> emitters = new ArrayList<>();
+            for (Future<Emitter> future : futures) {
+                emitters.add(future.get());
+            }
+
+            // All threads should have gotten the same instance
+            Emitter first = emitters.get(0);
+            for (Emitter emitter : emitters) {
+                assertSame(first, emitter, "All threads should get the same emitter instance");
+            }
+        } finally {
+            // Runs even when an assertion or get() above throws, so a failing
+            // test does not leave the pool's worker threads alive.
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Asks for an id that is not in the config and checks that the resulting
+     * {@link EmitterNotFoundException} names the id and includes an "Available:" list.
+     */
+    @Test
+    public void testUnknownEmitterId(@TempDir Path tmpDir) throws Exception {
+        TikaJsonConfig jsonConfig =
+                TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+        EmitterManager manager =
+                EmitterManager.load(TikaPluginManager.load(jsonConfig), jsonConfig);
+
+        EmitterNotFoundException notFound = assertThrows(EmitterNotFoundException.class,
+                () -> manager.getEmitter("non-existent-emitter"));
+
+        String message = notFound.getMessage();
+        assertTrue(message.contains("non-existent-emitter"));
+        assertTrue(message.contains("Available:"));
+    }
+
+    /**
+     * Declares an emitter under a type name that the test expects to be unknown
+     * and checks that {@code EmitterManager.load} rejects it with a
+     * {@link TikaConfigException} naming the type.
+     */
+    @Test
+    public void testUnknownEmitterType(@TempDir Path tmpDir) throws Exception {
+        Path jsonFile = tmpDir.resolve("config.json");
+        Files.writeString(jsonFile, """
+                {
+                "emitters": {
+                "non-existent-emitter-type": {
+                "emitter1": {
+                "someProp": "value"
+                }
+                }
+                },
+                "plugin-roots": "target/plugins"
+                }
+                """, StandardCharsets.UTF_8);
+
+        TikaJsonConfig jsonConfig = TikaJsonConfig.load(jsonFile);
+        TikaPluginManager plugins = TikaPluginManager.load(jsonConfig);
+
+        // The failure is expected from load() itself, not from a later lookup.
+        TikaConfigException rejected = assertThrows(TikaConfigException.class,
+                () -> EmitterManager.load(plugins, jsonConfig));
+
+        String message = rejected.getMessage();
+        assertTrue(message.contains("Unknown emitter type"));
+        assertTrue(message.contains("non-existent-emitter-type"));
+    }
+
+    /**
+     * Writes a config in which the id "fse1" appears twice under the same type
+     * and checks that {@code TikaJsonConfig.load} fails with a
+     * {@link TikaConfigException} whose cause reports a duplicate field.
+     */
+    @Test
+    public void testDuplicateEmitterId(@TempDir Path tmpDir) throws Exception {
+        String firstBase = PluginsTestHelper.toJsonPath(tmpDir.resolve("output1"));
+        String secondBase = PluginsTestHelper.toJsonPath(tmpDir.resolve("output2"));
+        String json = String.format(Locale.ROOT, """
+                {
+                "emitters": {
+                "file-system-emitter": {
+                "fse1": {
+                "basePath": "%s",
+                "onExists": "REPLACE"
+                },
+                "fse1": {
+                "basePath": "%s",
+                "onExists": "REPLACE"
+                }
+                }
+                },
+                "plugin-roots": "target/plugins"
+                }
+                """, firstBase, secondBase);
+
+        Path jsonFile = tmpDir.resolve("config.json");
+        Files.writeString(jsonFile, json, StandardCharsets.UTF_8);
+
+        // PolymorphicObjectMapperFactory has FAIL_ON_READING_DUP_TREE_KEY enabled
+        // so duplicate keys are caught during JSON parsing
+        TikaConfigException rejected = assertThrows(TikaConfigException.class,
+                () -> TikaJsonConfig.load(jsonFile));
+
+        // Checked one at a time so a failure shows which condition did not hold.
+        assertTrue(rejected.getMessage().contains("Failed to parse JSON"));
+        Throwable cause = rejected.getCause();
+        assertNotNull(cause);
+        assertTrue(cause.getMessage().contains("Duplicate field"));
+    }
+
+    /**
+     * With the shared single-emitter config, checks that the no-argument
+     * {@code getEmitter()} returns the emitter whose id is "fse".
+     */
+    @Test
+    public void testGetSingleEmitter(@TempDir Path tmpDir) throws Exception {
+        TikaJsonConfig jsonConfig =
+                TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+        EmitterManager manager =
+                EmitterManager.load(TikaPluginManager.load(jsonConfig), jsonConfig);
+
+        // No id is passed: the only configured emitter is expected back.
+        Emitter only = manager.getEmitter();
+        assertNotNull(only);
+        assertEquals("fse", only.getExtensionConfig().id());
+    }
+
+    /**
+     * Configures two emitters and checks that the no-argument {@code getEmitter()}
+     * throws an {@link IllegalArgumentException} whose message contains "exactly 1".
+     */
+    @Test
+    public void testGetSingleEmitterWithMultipleConfigured(@TempDir Path tmpDir) throws Exception {
+        String firstBase = PluginsTestHelper.toJsonPath(tmpDir.resolve("output1"));
+        String secondBase = PluginsTestHelper.toJsonPath(tmpDir.resolve("output2"));
+        String json = String.format(Locale.ROOT, """
+                {
+                "emitters": {
+                "file-system-emitter": {
+                "fse1": {
+                "basePath": "%s",
+                "onExists": "REPLACE"
+                },
+                "fse2": {
+                "basePath": "%s",
+                "onExists": "REPLACE"
+                }
+                }
+                },
+                "plugin-roots": "target/plugins"
+                }
+                """, firstBase, secondBase);
+
+        Path jsonFile = tmpDir.resolve("config.json");
+        Files.writeString(jsonFile, json, StandardCharsets.UTF_8);
+
+        TikaJsonConfig jsonConfig = TikaJsonConfig.load(jsonFile);
+        EmitterManager manager =
+                EmitterManager.load(TikaPluginManager.load(jsonConfig), jsonConfig);
+
+        // Two emitters are configured, so the call without an id must be refused.
+        IllegalArgumentException refused = assertThrows(IllegalArgumentException.class,
+                () -> manager.getEmitter());
+
+        assertTrue(refused.getMessage().contains("exactly 1"));
+    }
+
+    /**
+     * Loads the manager with runtime modifications enabled, adds a second
+     * emitter config through {@code saveEmitter}, and checks that both ids are
+     * listed and that the new one can be fetched.
+     */
+    @Test
+    public void testSaveEmitter(@TempDir Path tmpDir) throws Exception {
+        TikaJsonConfig jsonConfig =
+                TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+        // third argument true: runtime modifications enabled
+        EmitterManager manager =
+                EmitterManager.load(TikaPluginManager.load(jsonConfig), jsonConfig, true);
+
+        // Before the save only "fse" is present.
+        assertEquals(1, manager.getSupported().size());
+
+        String addedJson = String.format(Locale.ROOT, """
+                {
+                "basePath": "%s",
+                "onExists": "REPLACE"
+                }
+                """, PluginsTestHelper.toJsonPath(tmpDir.resolve("output2")));
+        manager.saveEmitter(new ExtensionConfig("fse2", "file-system-emitter", addedJson));
+
+        // After the save both ids are listed.
+        assertEquals(2, manager.getSupported().size());
+        assertTrue(manager.getSupported().contains("fse"));
+        assertTrue(manager.getSupported().contains("fse2"));
+
+        // The saved emitter can be fetched by its id.
+        Emitter added = manager.getEmitter("fse2");
+        assertNotNull(added);
+        assertEquals("fse2", added.getExtensionConfig().id());
+    }
+
+    /**
+     * Tries to save a config whose id ("fse") is already configured and checks
+     * that a {@link TikaConfigException} mentioning "already exists" and the id
+     * is thrown.
+     */
+    @Test
+    public void testSaveEmitterDuplicate(@TempDir Path tmpDir) throws Exception {
+        TikaJsonConfig jsonConfig =
+                TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+        EmitterManager manager =
+                EmitterManager.load(TikaPluginManager.load(jsonConfig), jsonConfig, true);
+
+        String clashingJson = String.format(Locale.ROOT, """
+                {
+                "basePath": "%s",
+                "onExists": "REPLACE"
+                }
+                """, PluginsTestHelper.toJsonPath(tmpDir.resolve("output2")));
+        // Same id as the emitter that the shared config already declares.
+        ExtensionConfig clashing =
+                new ExtensionConfig("fse", "file-system-emitter", clashingJson);
+
+        TikaConfigException rejected = assertThrows(TikaConfigException.class,
+                () -> manager.saveEmitter(clashing));
+
+        String message = rejected.getMessage();
+        assertTrue(message.contains("already exists"));
+        assertTrue(message.contains("fse"));
+    }
+
+    /**
+     * Tries to save a config under the type name "unknown-emitter-type" and
+     * checks that a {@link TikaConfigException} naming that type is thrown.
+     */
+    @Test
+    public void testSaveEmitterUnknownType(@TempDir Path tmpDir) throws Exception {
+        TikaJsonConfig jsonConfig =
+                TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+        EmitterManager manager =
+                EmitterManager.load(TikaPluginManager.load(jsonConfig), jsonConfig, true);
+
+        ExtensionConfig unrecognised =
+                new ExtensionConfig("emitter2", "unknown-emitter-type", "{}");
+
+        TikaConfigException rejected = assertThrows(TikaConfigException.class,
+                () -> manager.saveEmitter(unrecognised));
+
+        String message = rejected.getMessage();
+        assertTrue(message.contains("Unknown emitter type"));
+        assertTrue(message.contains("unknown-emitter-type"));
+    }
+
+    /**
+     * Passes {@code null} to {@code saveEmitter} and checks that an
+     * {@link IllegalArgumentException} mentioning "cannot be null" is thrown.
+     */
+    @Test
+    public void testSaveEmitterNull(@TempDir Path tmpDir) throws Exception {
+        TikaJsonConfig jsonConfig =
+                TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+        EmitterManager manager =
+                EmitterManager.load(TikaPluginManager.load(jsonConfig), jsonConfig, true);
+
+        IllegalArgumentException rejected = assertThrows(IllegalArgumentException.class,
+                () -> manager.saveEmitter(null));
+
+        assertTrue(rejected.getMessage().contains("cannot be null"));
+    }
+
+    /**
+     * Saves four more emitter configs (fse2..fse5) at runtime, then fetches only
+     * fse3. The test cannot observe instantiation directly; it checks the
+     * supported list and the id of the fetched emitter.
+     */
+    @Test
+    public void testSaveEmitterLazyInstantiation(@TempDir Path tmpDir) throws Exception {
+        TikaJsonConfig jsonConfig =
+                TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+        EmitterManager manager =
+                EmitterManager.load(TikaPluginManager.load(jsonConfig), jsonConfig, true);
+
+        // Register fse2 through fse5, each with its own output directory.
+        for (int suffix = 2; suffix <= 5; suffix++) {
+            String basePath = PluginsTestHelper.toJsonPath(tmpDir.resolve("output" + suffix));
+            String emitterJson = String.format(Locale.ROOT, """
+                    {
+                    "basePath": "%s",
+                    "onExists": "REPLACE"
+                    }
+                    """, basePath);
+            manager.saveEmitter(
+                    new ExtensionConfig("fse" + suffix, "file-system-emitter", emitterJson));
+        }
+
+        // The preconfigured emitter plus the four saved ones.
+        assertEquals(5, manager.getSupported().size());
+
+        // Fetch a single one of the saved emitters.
+        Emitter third = manager.getEmitter("fse3");
+        assertNotNull(third);
+        assertEquals("fse3", third.getExtensionConfig().id());
+
+        // The emitters that were not fetched are still listed.
+        for (String untouched : List.of("fse2", "fse4", "fse5")) {
+            assertTrue(manager.getSupported().contains(untouched));
+        }
+    }
+
+    /**
+     * Loads the manager through the two-argument {@code load} and checks that
+     * {@code saveEmitter} is refused with a {@link TikaConfigException} that
+     * points at {@code allowRuntimeModifications=true}.
+     */
+    @Test
+    public void testSaveEmitterNotAllowed(@TempDir Path tmpDir) throws Exception {
+        TikaJsonConfig jsonConfig =
+                TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+        // No third argument is passed, so the default setting applies.
+        EmitterManager manager =
+                EmitterManager.load(TikaPluginManager.load(jsonConfig), jsonConfig);
+
+        String addedJson = String.format(Locale.ROOT, """
+                {
+                "basePath": "%s",
+                "onExists": "REPLACE"
+                }
+                """, PluginsTestHelper.toJsonPath(tmpDir.resolve("output2")));
+        ExtensionConfig added = new ExtensionConfig("fse2", "file-system-emitter", addedJson);
+
+        TikaConfigException refused = assertThrows(TikaConfigException.class,
+                () -> manager.saveEmitter(added));
+
+        String message = refused.getMessage();
+        assertTrue(message.contains("Runtime modifications are not allowed"));
+        assertTrue(message.contains("allowRuntimeModifications=true"));
+    }
+
+    /**
+     * Loads the manager with the third {@code load} argument set to
+     * {@code false} and checks that {@code saveEmitter} is refused with a
+     * {@link TikaConfigException}.
+     */
+    @Test
+    public void testSaveEmitterNotAllowedExplicit(@TempDir Path tmpDir) throws Exception {
+        TikaJsonConfig jsonConfig =
+                TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+        // third argument false: runtime modifications explicitly disabled
+        EmitterManager manager =
+                EmitterManager.load(TikaPluginManager.load(jsonConfig), jsonConfig, false);
+
+        String addedJson = String.format(Locale.ROOT, """
+                {
+                "basePath": "%s",
+                "onExists": "REPLACE"
+                }
+                """, PluginsTestHelper.toJsonPath(tmpDir.resolve("output2")));
+        ExtensionConfig added = new ExtensionConfig("fse2", "file-system-emitter", addedJson);
+
+        TikaConfigException refused = assertThrows(TikaConfigException.class,
+                () -> manager.saveEmitter(added));
+
+        assertTrue(refused.getMessage().contains("Runtime modifications are not allowed"));
+    }
+}
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/fetcher/FetcherManagerTest.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/fetcher/FetcherManagerTest.java
new file mode 100644
index 000000000..7e206257c
--- /dev/null
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/fetcher/FetcherManagerTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.fetcher;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.tika.config.loader.TikaJsonConfig;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.api.fetcher.Fetcher;
+import org.apache.tika.pipes.api.fetcher.FetcherNotFoundException;
+import org.apache.tika.pipes.core.PluginsTestHelper;
+import org.apache.tika.plugins.ExtensionConfig;
+import org.apache.tika.plugins.TikaPluginManager;
+
+public class FetcherManagerTest {
+
+    /**
+     * Loads the shared file-system test config and checks that exactly one
+     * fetcher id, "fsf", is reported as supported.
+     */
+    @Test
+    public void testBasicLoad(@TempDir Path tmpDir) throws Exception {
+        TikaJsonConfig jsonConfig =
+                TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+        FetcherManager manager =
+                FetcherManager.load(TikaPluginManager.load(jsonConfig), jsonConfig);
+
+        assertNotNull(manager);
+        assertEquals(1, manager.getSupported().size());
+        assertTrue(manager.getSupported().contains("fsf"));
+    }
+
+    /**
+     * Loads a config without a "fetchers" section and checks that nothing is
+     * reported as supported and that a lookup fails with a
+     * {@link FetcherNotFoundException} naming the id and an empty "Available" list.
+     */
+    @Test
+    public void testEmptyConfig(@TempDir Path tmpDir) throws Exception {
+        // Only the plugin roots are configured; no fetchers are declared.
+        Path jsonFile = tmpDir.resolve("config.json");
+        Files.writeString(jsonFile, """
+                {
+                "plugin-roots": "target/plugins"
+                }
+                """, StandardCharsets.UTF_8);
+
+        TikaJsonConfig jsonConfig = TikaJsonConfig.load(jsonFile);
+        FetcherManager manager =
+                FetcherManager.load(TikaPluginManager.load(jsonConfig), jsonConfig);
+
+        assertNotNull(manager);
+        assertEquals(0, manager.getSupported().size());
+
+        // With nothing configured, a lookup by id must be rejected.
+        FetcherNotFoundException notFound = assertThrows(FetcherNotFoundException.class,
+                () -> manager.getFetcher("any-id"));
+        String message = notFound.getMessage();
+        assertTrue(message.contains("any-id"));
+        assertTrue(message.contains("Available: []"));
+    }
+
+    /**
+     * Configures two file-system fetchers and fetches them one after the other.
+     * The test cannot observe instantiation directly; it checks that both ids are
+     * listed after load and that each lookup returns a fetcher carrying its own id.
+     */
+    @Test
+    public void testLazyInstantiation(@TempDir Path tmpDir) throws Exception {
+        String firstBase = PluginsTestHelper.toJsonPath(tmpDir.resolve("path1"));
+        String secondBase = PluginsTestHelper.toJsonPath(tmpDir.resolve("path2"));
+        String json = String.format(Locale.ROOT, """
+                {
+                "fetchers": {
+                "file-system-fetcher": {
+                "fsf1": {
+                "basePath": "%s"
+                },
+                "fsf2": {
+                "basePath": "%s"
+                }
+                }
+                },
+                "plugin-roots": "target/plugins"
+                }
+                """, firstBase, secondBase);
+
+        Path jsonFile = tmpDir.resolve("config.json");
+        Files.writeString(jsonFile, json, StandardCharsets.UTF_8);
+
+        TikaJsonConfig jsonConfig = TikaJsonConfig.load(jsonFile);
+        FetcherManager manager =
+                FetcherManager.load(TikaPluginManager.load(jsonConfig), jsonConfig);
+
+        // Both ids are listed straight after load, before any lookup.
+        assertEquals(2, manager.getSupported().size());
+
+        // Look up fsf1 only.
+        Fetcher first = manager.getFetcher("fsf1");
+        assertNotNull(first);
+        assertEquals("fsf1", first.getExtensionConfig().id());
+
+        // fsf2 has not been looked up yet but is still listed.
+        assertTrue(manager.getSupported().contains("fsf2"));
+
+        // Now look up fsf2 as well.
+        Fetcher second = manager.getFetcher("fsf2");
+        assertNotNull(second);
+        assertEquals("fsf2", second.getExtensionConfig().id());
+    }
+
+    /**
+     * Looks up the same fetcher id three times and checks that every lookup
+     * returns the identical object.
+     */
+    @Test
+    public void testCaching(@TempDir Path tmpDir) throws Exception {
+        TikaJsonConfig jsonConfig =
+                TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+        FetcherManager manager =
+                FetcherManager.load(TikaPluginManager.load(jsonConfig), jsonConfig);
+
+        Fetcher first = manager.getFetcher("fsf");
+        Fetcher second = manager.getFetcher("fsf");
+        Fetcher third = manager.getFetcher("fsf");
+
+        // assertSame compares references rather than equals()
+        assertSame(first, second);
+        assertSame(second, third);
+    }
+
+    /**
+     * Requests the same fetcher id from several threads released at the same
+     * moment and checks that every thread receives the identical instance.
+     * The executor is always shut down, including when an assertion fails.
+     */
+    @Test
+    public void testThreadSafety(@TempDir Path tmpDir) throws Exception {
+        Path config = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(config);
+        TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
+
+        FetcherManager fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig);
+
+        int threadCount = 10;
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        try {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch doneLatch = new CountDownLatch(threadCount);
+            List<Future<Fetcher>> futures = new ArrayList<>();
+
+            // Queue one task per pool thread; each waits on startLatch so the
+            // getFetcher calls begin as close together as possible.
+            for (int i = 0; i < threadCount; i++) {
+                futures.add(executor.submit(() -> {
+                    try {
+                        startLatch.await();
+                        return fetcherManager.getFetcher("fsf");
+                    } finally {
+                        doneLatch.countDown();
+                    }
+                }));
+            }
+
+            // Release all waiting tasks at once
+            startLatch.countDown();
+
+            // Wait for all tasks to complete
+            assertTrue(doneLatch.await(10, TimeUnit.SECONDS));
+
+            // Every task has counted down, so get() should not block for long;
+            // it rethrows any failure from the task.
+            List<Fetcher> fetchers = new ArrayList<>();
+            for (Future<Fetcher> future : futures) {
+                fetchers.add(future.get());
+            }
+
+            // All threads should have gotten the same instance
+            Fetcher first = fetchers.get(0);
+            for (Fetcher fetcher : fetchers) {
+                assertSame(first, fetcher, "All threads should get the same fetcher instance");
+            }
+        } finally {
+            // Runs even when an assertion or get() above throws, so a failing
+            // test does not leave the pool's worker threads alive.
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Asks for an id that is not in the config and checks that the resulting
+     * {@link FetcherNotFoundException} names the id and includes an "Available:" list.
+     */
+    @Test
+    public void testUnknownFetcherId(@TempDir Path tmpDir) throws Exception {
+        TikaJsonConfig jsonConfig =
+                TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+        FetcherManager manager =
+                FetcherManager.load(TikaPluginManager.load(jsonConfig), jsonConfig);
+
+        FetcherNotFoundException notFound = assertThrows(FetcherNotFoundException.class,
+                () -> manager.getFetcher("non-existent-fetcher"));
+
+        String message = notFound.getMessage();
+        assertTrue(message.contains("non-existent-fetcher"));
+        assertTrue(message.contains("Available:"));
+    }
+
+    /**
+     * Declares a fetcher under a type name that the test expects to be unknown
+     * and checks that {@code FetcherManager.load} rejects it with a
+     * {@link TikaConfigException} naming the type.
+     */
+    @Test
+    public void testUnknownFetcherType(@TempDir Path tmpDir) throws Exception {
+        Path jsonFile = tmpDir.resolve("config.json");
+        Files.writeString(jsonFile, """
+                {
+                "fetchers": {
+                "non-existent-fetcher-type": {
+                "fetcher1": {
+                "someProp": "value"
+                }
+                }
+                },
+                "plugin-roots": "target/plugins"
+                }
+                """, StandardCharsets.UTF_8);
+
+        TikaJsonConfig jsonConfig = TikaJsonConfig.load(jsonFile);
+        TikaPluginManager plugins = TikaPluginManager.load(jsonConfig);
+
+        // The failure is expected from load() itself, not from a later lookup.
+        TikaConfigException rejected = assertThrows(TikaConfigException.class,
+                () -> FetcherManager.load(plugins, jsonConfig));
+
+        String message = rejected.getMessage();
+        assertTrue(message.contains("Unknown fetcher type"));
+        assertTrue(message.contains("non-existent-fetcher-type"));
+    }
+
+ /**
+  * Two entries with the same id under one fetcher type must be rejected
+  * while the JSON config is parsed ({@code TikaJsonConfig.load}), so no
+  * FetcherManager is ever created for such a config.
+  */
+ @Test
+ public void testDuplicateFetcherId(@TempDir Path tmpDir) throws Exception {
+     String configJson = String.format(Locale.ROOT, """
+             {
+             "fetchers": {
+             "file-system-fetcher": {
+             "fsf1": {
+             "basePath": "%s"
+             },
+             "fsf1": {
+             "basePath": "%s"
+             }
+             }
+             },
+             "plugin-roots": "target/plugins"
+             }
+             """, PluginsTestHelper.toJsonPath(tmpDir.resolve("path1")),
+             PluginsTestHelper.toJsonPath(tmpDir.resolve("path2")));
+
+     Path configPath = tmpDir.resolve("config.json");
+     Files.writeString(configPath, configJson, StandardCharsets.UTF_8);
+
+     // PolymorphicObjectMapperFactory has FAIL_ON_READING_DUP_TREE_KEY enabled
+     // so duplicate keys are caught during JSON parsing
+     TikaConfigException exception = assertThrows(TikaConfigException.class,
+             () -> TikaJsonConfig.load(configPath));
+
+     // Check each condition on its own so that a failure reports which
+     // expectation was violated instead of a bare "expected true".
+     assertTrue(exception.getMessage().contains("Failed to parse JSON"),
+             "Unexpected message: " + exception.getMessage());
+     assertNotNull(exception.getCause(), "Parse failure should carry its cause");
+     assertTrue(exception.getCause().getMessage().contains("Duplicate field"),
+             "Unexpected cause message: " + exception.getCause().getMessage());
+ }
+
+ /**
+  * With the helper's file-system config loaded, the no-argument
+  * {@code getFetcher()} must return a fetcher, and its id must be "fsf".
+  */
+ @Test
+ public void testGetSingleFetcher(@TempDir Path tmpDir) throws Exception {
+     TikaJsonConfig jsonConfig =
+             TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+     TikaPluginManager plugins = TikaPluginManager.load(jsonConfig);
+     FetcherManager manager = FetcherManager.load(plugins, jsonConfig);
+
+     // Only one fetcher is configured, so no id is needed to select it
+     Fetcher only = manager.getFetcher();
+     assertNotNull(only);
+     assertEquals("fsf", only.getExtensionConfig().id());
+ }
+
+ /**
+  * When two fetchers ("fsf1" and "fsf2") are configured, the no-argument
+  * {@code getFetcher()} must throw an {@link IllegalArgumentException}
+  * whose message contains "requires exactly 1".
+  */
+ @Test
+ public void testGetSingleFetcherWithMultipleConfigured(@TempDir Path tmpDir) throws Exception {
+     String firstBase = PluginsTestHelper.toJsonPath(tmpDir.resolve("path1"));
+     String secondBase = PluginsTestHelper.toJsonPath(tmpDir.resolve("path2"));
+     String json = String.format(Locale.ROOT, """
+             {
+             "fetchers": {
+             "file-system-fetcher": {
+             "fsf1": {
+             "basePath": "%s"
+             },
+             "fsf2": {
+             "basePath": "%s"
+             }
+             }
+             },
+             "plugin-roots": "target/plugins"
+             }
+             """, firstBase, secondBase);
+
+     Path configPath = tmpDir.resolve("config.json");
+     Files.writeString(configPath, json, StandardCharsets.UTF_8);
+
+     TikaJsonConfig jsonConfig = TikaJsonConfig.load(configPath);
+     TikaPluginManager plugins = TikaPluginManager.load(jsonConfig);
+     FetcherManager manager = FetcherManager.load(plugins, jsonConfig);
+
+     // The choice is ambiguous with more than one fetcher, so this must fail
+     IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class,
+             () -> manager.getFetcher());
+
+     assertTrue(thrown.getMessage().contains("requires exactly 1"));
+ }
+
+ /**
+  * With runtime modifications enabled, {@code saveFetcher} registers a new
+  * configuration: the supported ids grow from one to two, and the new
+  * fetcher can then be obtained by its id.
+  */
+ @Test
+ public void testSaveFetcher(@TempDir Path tmpDir) throws Exception {
+     TikaJsonConfig jsonConfig =
+             TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+     TikaPluginManager plugins = TikaPluginManager.load(jsonConfig);
+
+     // Third argument 'true' turns runtime modifications on
+     FetcherManager manager = FetcherManager.load(plugins, jsonConfig, true);
+
+     // Before the save, only the fetcher from the config file is known
+     assertEquals(1, manager.getSupported().size());
+
+     // Register a second file-system fetcher on the fly
+     String addedJson = String.format(Locale.ROOT, """
+             {"basePath": "%s"}
+             """, PluginsTestHelper.toJsonPath(tmpDir.resolve("path2")));
+     manager.saveFetcher(new ExtensionConfig("fsf2", "file-system-fetcher", addedJson));
+
+     // Both ids are now reported
+     assertEquals(2, manager.getSupported().size());
+     assertTrue(manager.getSupported().contains("fsf"));
+     assertTrue(manager.getSupported().contains("fsf2"));
+
+     // The saved configuration can be turned into a fetcher on request
+     Fetcher added = manager.getFetcher("fsf2");
+     assertNotNull(added);
+     assertEquals("fsf2", added.getExtensionConfig().id());
+ }
+
+ /**
+  * Saving a configuration whose id ("fsf") is already registered must throw
+  * a {@link TikaConfigException} mentioning "already exists" and the id.
+  */
+ @Test
+ public void testSaveFetcherDuplicate(@TempDir Path tmpDir) throws Exception {
+     TikaJsonConfig jsonConfig =
+             TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+     TikaPluginManager plugins = TikaPluginManager.load(jsonConfig);
+     FetcherManager manager = FetcherManager.load(plugins, jsonConfig, true);
+
+     // Same id as the fetcher already loaded from the config file
+     String clashingJson = String.format(Locale.ROOT, """
+             {"basePath": "%s"}
+             """, PluginsTestHelper.toJsonPath(tmpDir.resolve("path2")));
+     ExtensionConfig clashing =
+             new ExtensionConfig("fsf", "file-system-fetcher", clashingJson);
+
+     TikaConfigException thrown = assertThrows(TikaConfigException.class,
+             () -> manager.saveFetcher(clashing));
+
+     String message = thrown.getMessage();
+     assertTrue(message.contains("already exists"));
+     assertTrue(message.contains("fsf"));
+ }
+
+ /**
+  * Saving a configuration that names an unknown fetcher type must throw a
+  * {@link TikaConfigException} whose message contains "Unknown fetcher type"
+  * and the offending type name.
+  */
+ @Test
+ public void testSaveFetcherUnknownType(@TempDir Path tmpDir) throws Exception {
+     TikaJsonConfig jsonConfig =
+             TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+     TikaPluginManager plugins = TikaPluginManager.load(jsonConfig);
+     FetcherManager manager = FetcherManager.load(plugins, jsonConfig, true);
+
+     // The id is new, but the type is not one the manager knows
+     ExtensionConfig bogusType =
+             new ExtensionConfig("fetcher2", "unknown-fetcher-type", "{}");
+
+     TikaConfigException thrown = assertThrows(TikaConfigException.class,
+             () -> manager.saveFetcher(bogusType));
+
+     String message = thrown.getMessage();
+     assertTrue(message.contains("Unknown fetcher type"));
+     assertTrue(message.contains("unknown-fetcher-type"));
+ }
+
+ /**
+  * Passing {@code null} to {@code saveFetcher} must throw an
+  * {@link IllegalArgumentException} whose message contains "cannot be null".
+  */
+ @Test
+ public void testSaveFetcherNull(@TempDir Path tmpDir) throws Exception {
+     TikaJsonConfig jsonConfig =
+             TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+     TikaPluginManager plugins = TikaPluginManager.load(jsonConfig);
+     FetcherManager manager = FetcherManager.load(plugins, jsonConfig, true);
+
+     IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class,
+             () -> manager.saveFetcher(null));
+
+     assertTrue(thrown.getMessage().contains("cannot be null"));
+ }
+
+ /**
+  * Saves four additional configurations ("fsf2" to "fsf5"), then requests
+  * only "fsf3". Verifies that all five ids are reported as supported and
+  * that the requested fetcher is returned with the right id.
+  *
+  * NOTE(review): whether the other fetchers remain un-instantiated is not
+  * asserted here; only their presence in {@code getSupported()} is checked.
+  */
+ @Test
+ public void testSaveFetcherLazyInstantiation(@TempDir Path tmpDir) throws Exception {
+     TikaJsonConfig jsonConfig =
+             TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+     TikaPluginManager plugins = TikaPluginManager.load(jsonConfig);
+     FetcherManager manager = FetcherManager.load(plugins, jsonConfig, true);
+
+     // Register fsf2 .. fsf5, each with its own base path
+     for (int n = 2; n < 6; n++) {
+         String addedJson = String.format(Locale.ROOT, """
+                 {"basePath": "%s"}
+                 """, PluginsTestHelper.toJsonPath(tmpDir.resolve("path" + n)));
+         manager.saveFetcher(new ExtensionConfig("fsf" + n, "file-system-fetcher", addedJson));
+     }
+
+     // The original fetcher plus the four added ones
+     assertEquals(5, manager.getSupported().size());
+
+     // Ask for just one of the saved configurations
+     Fetcher third = manager.getFetcher("fsf3");
+     assertNotNull(third);
+     assertEquals("fsf3", third.getExtensionConfig().id());
+
+     // The ones that were never requested are still listed
+     assertTrue(manager.getSupported().contains("fsf2"));
+     assertTrue(manager.getSupported().contains("fsf4"));
+     assertTrue(manager.getSupported().contains("fsf5"));
+ }
+
+ /**
+  * A manager created through the two-argument {@code load} must refuse
+  * {@code saveFetcher} with a {@link TikaConfigException} whose message
+  * says runtime modifications are not allowed and mentions
+  * "allowRuntimeModifications=true".
+  */
+ @Test
+ public void testSaveFetcherNotAllowed(@TempDir Path tmpDir) throws Exception {
+     TikaJsonConfig jsonConfig =
+             TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+     TikaPluginManager plugins = TikaPluginManager.load(jsonConfig);
+
+     // Default load: runtime modifications are disabled
+     FetcherManager manager = FetcherManager.load(plugins, jsonConfig);
+
+     // An otherwise valid new configuration
+     String addedJson = String.format(Locale.ROOT, """
+             {"basePath": "%s"}
+             """, PluginsTestHelper.toJsonPath(tmpDir.resolve("path2")));
+     ExtensionConfig added = new ExtensionConfig("fsf2", "file-system-fetcher", addedJson);
+
+     TikaConfigException thrown = assertThrows(TikaConfigException.class,
+             () -> manager.saveFetcher(added));
+
+     String message = thrown.getMessage();
+     assertTrue(message.contains("Runtime modifications are not allowed"));
+     assertTrue(message.contains("allowRuntimeModifications=true"));
+ }
+
+ /**
+  * A manager loaded with the runtime-modification flag explicitly set to
+  * {@code false} must refuse {@code saveFetcher} with a
+  * {@link TikaConfigException} saying runtime modifications are not allowed.
+  */
+ @Test
+ public void testSaveFetcherNotAllowedExplicit(@TempDir Path tmpDir) throws Exception {
+     TikaJsonConfig jsonConfig =
+             TikaJsonConfig.load(PluginsTestHelper.getFileSystemFetcherConfig(tmpDir));
+     TikaPluginManager plugins = TikaPluginManager.load(jsonConfig);
+
+     // Flag passed explicitly as false
+     FetcherManager manager = FetcherManager.load(plugins, jsonConfig, false);
+
+     // An otherwise valid new configuration
+     String addedJson = String.format(Locale.ROOT, """
+             {"basePath": "%s"}
+             """, PluginsTestHelper.toJsonPath(tmpDir.resolve("path2")));
+     ExtensionConfig added = new ExtensionConfig("fsf2", "file-system-fetcher", addedJson);
+
+     TikaConfigException thrown = assertThrows(TikaConfigException.class,
+             () -> manager.saveFetcher(added));
+
+     assertTrue(thrown.getMessage().contains("Runtime modifications are not allowed"));
+ }
+}
diff --git
a/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
b/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
index 65f229891..3e785c640 100644
---
a/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
+++
b/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
@@ -73,9 +73,9 @@ public class FileSystemEmitter extends AbstractStreamEmitter {
checkConfig(fileSystemEmitterConfig);
}
- private void checkConfig(FileSystemEmitterConfig fileSystemEmitterConfig) {
+ private void checkConfig(FileSystemEmitterConfig fileSystemEmitterConfig)
throws TikaConfigException {
if (fileSystemEmitterConfig.onExists() == null) {
- throw new IllegalArgumentException("Must configure 'onExists' as
'skip', 'exception' or 'replace'");
+ throw new TikaConfigException("Must configure 'onExists' as
'skip', 'exception' or 'replace'");
}
}
diff --git
a/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/test/java/org/apache/tika/pipes/reporter/jdbc/TestJDBCPipesReporter.java
b/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/test/java/org/apache/tika/pipes/reporter/jdbc/TestJDBCPipesReporter.java
index 986f4119e..98d50cae9 100644
---
a/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/test/java/org/apache/tika/pipes/reporter/jdbc/TestJDBCPipesReporter.java
+++
b/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/test/java/org/apache/tika/pipes/reporter/jdbc/TestJDBCPipesReporter.java
@@ -151,7 +151,7 @@ public class TestJDBCPipesReporter {
Map<PipesResult.RESULT_STATUS, Long> expected = runBatch(reporter,
numThreads, numIterations);
reporter.close();
Map<PipesResult.RESULT_STATUS, Long> total =
countReported(connectionString);
- assertEquals(16, total.size());
+ assertEquals(PipesResult.RESULT_STATUS.values().length - 2,
total.size());
long sum = 0;
for (Map.Entry<PipesResult.RESULT_STATUS, Long> e :
expected.entrySet()) {
if (e.getKey() != PARSE_SUCCESS && e.getKey() !=
PARSE_SUCCESS_WITH_EXCEPTION) {