This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 7d7cdb5ecf TIKA-4651 -- refactor cli to use pipes for parser (#2586)
7d7cdb5ecf is described below
commit 7d7cdb5ecfa78a0131ce350323ba7ec0926830af
Author: Tim Allison <[email protected]>
AuthorDate: Thu Feb 5 11:01:27 2026 -0500
TIKA-4651 -- refactor cli to use pipes for parser (#2586)
---
.../pages/using-tika/java-api/getting-started.adoc | 8 +-
tika-app/pom.xml | 6 +
.../src/main/java/org/apache/tika/cli/TikaCLI.java | 147 +++++++++-
.../tika/pipes/core/config/ConfigMerger.java | 255 ++++++++++++++++
.../tika/pipes/core/config/ConfigOverrides.java | 285 ++++++++++++++++++
.../tika/pipes/core/config/ConfigMergerTest.java | 324 +++++++++++++++++++++
.../apache/tika/pipes/fork/PipesForkParser.java | 187 +++++++-----
.../tika/pipes/fork/PipesForkParserConfig.java | 25 ++
.../tika/pipes/fork/PipesForkParserTest.java | 117 ++++++++
.../apache/tika/server/core/TikaServerProcess.java | 174 +++--------
10 files changed, 1310 insertions(+), 218 deletions(-)
diff --git a/docs/modules/ROOT/pages/using-tika/java-api/getting-started.adoc
b/docs/modules/ROOT/pages/using-tika/java-api/getting-started.adoc
index 41922ec7bd..01673e8fd4 100644
--- a/docs/modules/ROOT/pages/using-tika/java-api/getting-started.adoc
+++ b/docs/modules/ROOT/pages/using-tika/java-api/getting-started.adoc
@@ -68,14 +68,14 @@ application from parser crashes, memory leaks, and infinite
loops.
[source,java]
----
-import org.apache.tika.io.TikaInputStream;
+import java.nio.file.Path;
import org.apache.tika.pipes.fork.PipesForkParser;
import org.apache.tika.pipes.fork.PipesForkResult;
-try (PipesForkParser parser = new PipesForkParser();
- TikaInputStream tis = TikaInputStream.get(filePath)) {
+Path file = Path.of("/path/to/document.pdf");
- PipesForkResult result = parser.parse(tis);
+try (PipesForkParser parser = new PipesForkParser()) {
+ PipesForkResult result = parser.parse(file);
if (result.isSuccess()) {
String content = result.getContent();
diff --git a/tika-app/pom.xml b/tika-app/pom.xml
index 0ff1485ddc..4826b036e0 100644
--- a/tika-app/pom.xml
+++ b/tika-app/pom.xml
@@ -61,6 +61,12 @@
<artifactId>tika-async-cli</artifactId>
<version>${project.version}</version>
</dependency>
+ <!-- for fork mode -->
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-pipes-fork-parser</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- logging -->
<dependency>
diff --git a/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
b/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
index 97ca90a489..dfcc299520 100644
--- a/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
+++ b/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
@@ -76,6 +76,7 @@ import org.apache.tika.gui.TikaGUI;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.language.detect.LanguageHandler;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.mime.MediaType;
import org.apache.tika.mime.MediaTypeRegistry;
import org.apache.tika.mime.MimeType;
@@ -90,6 +91,10 @@ import org.apache.tika.parser.ParserDecorator;
import org.apache.tika.parser.PasswordProvider;
import org.apache.tika.parser.RecursiveParserWrapper;
import org.apache.tika.parser.digestutils.CommonsDigesterFactory;
+import org.apache.tika.pipes.api.ParseMode;
+import org.apache.tika.pipes.fork.PipesForkParser;
+import org.apache.tika.pipes.fork.PipesForkParserConfig;
+import org.apache.tika.pipes.fork.PipesForkResult;
import org.apache.tika.sax.BasicContentHandlerFactory;
import org.apache.tika.sax.BodyContentHandler;
import org.apache.tika.sax.ContentHandlerFactory;
@@ -204,6 +209,22 @@ public class TikaCLI {
private int maxEmbeddedCount = EmbeddedLimits.UNLIMITED;
private boolean pipeMode = true;
private boolean prettyPrint;
+    /** Set by -f / --fork: run each parse in a forked JVM process for isolation. */
+    private boolean forkMode;
+    /** Per-parse timeout used in fork mode, in milliseconds (--fork-timeout=). */
+    private long forkTimeout = 60_000L;
+    /**
+     * Extra JVM arguments for the forked process (--fork-jvm-args=, comma-separated);
+     * null when the option was not given.
+     */
+    private List<String> forkJvmArgs;
+    /** Plugins directory handed to the forked process (--fork-plugins-dir=); null if unset. */
+    private String forkPluginsDir;
private final OutputType XML = new OutputType() {
@Override
protected ContentHandler getContentHandler(OutputStream output,
Metadata metadata) throws Exception {
@@ -474,6 +495,14 @@ public class TikaCLI {
type = LANGUAGE;
} else if (arg.equals("-d") || arg.equals("--detect")) {
type = DETECT;
+ } else if (arg.equals("-f") || arg.equals("--fork")) {
+ forkMode = true;
+ } else if (arg.startsWith("--fork-timeout=")) {
+ forkTimeout =
Long.parseLong(arg.substring("--fork-timeout=".length()));
+ } else if (arg.startsWith("--fork-jvm-args=")) {
+ forkJvmArgs =
Arrays.asList(arg.substring("--fork-jvm-args=".length()).split(","));
+ } else if (arg.startsWith("--fork-plugins-dir=")) {
+ forkPluginsDir = arg.substring("--fork-plugins-dir=".length());
} else if (arg.startsWith("--maxEmbeddedDepth=")) {
maxEmbeddedDepth =
Integer.parseInt(arg.substring("--maxEmbeddedDepth=".length()));
} else if (arg.startsWith("--maxEmbeddedCount=")) {
@@ -492,7 +521,11 @@ public class TikaCLI {
if (arg.equals("-")) {
try (TikaInputStream tis =
TikaInputStream.get(CloseShieldInputStream.wrap(System.in))) {
- type.process(tis, System.out,
Metadata.newInstance(context));
+ if (forkMode) {
+ processWithFork(tis, Metadata.newInstance(context),
System.out);
+ } else {
+ type.process(tis, System.out,
Metadata.newInstance(context));
+ }
}
} else {
URL url;
@@ -504,7 +537,12 @@ public class TikaCLI {
} else {
url = new URL(arg);
}
- if (recursiveJSON) {
+ if (forkMode) {
+ Metadata metadata = Metadata.newInstance(context);
+ try (TikaInputStream tis = TikaInputStream.get(url,
metadata)) {
+ processWithFork(tis, metadata, System.out);
+ }
+ } else if (recursiveJSON) {
handleRecursiveJson(url, System.out);
} else {
Metadata metadata = Metadata.newInstance(context);
@@ -568,6 +606,103 @@ public class TikaCLI {
}
}
+    /**
+     * Parses one input in a forked JVM (via {@link PipesForkParser}) so that a crashing,
+     * hanging or memory-hungry parser cannot take down the CLI itself, then renders the
+     * result to {@code output} in the selected output format.
+     * <p>
+     * The writer wrapped around {@code output} is flushed but intentionally never closed:
+     * the callers pass {@code System.out}, and closing it after the first input would
+     * silently discard the output of every later input named on the command line.
+     *
+     * @param tis      the document to parse
+     * @param metadata initial metadata for the document
+     * @param output   where the rendered result is written; left open
+     * @throws Exception if the forked parser cannot be run or writing the output fails
+     */
+    private void processWithFork(TikaInputStream tis, Metadata metadata,
+                                 OutputStream output) throws Exception {
+        try (PipesForkParser parser = new PipesForkParser(buildForkParserConfig())) {
+            PipesForkResult result = parser.parse(tis, metadata);
+
+            if (result.isProcessCrash()) {
+                LOG.error("Fork process crashed: {}", result.getStatus());
+                System.err.println("Fork process crashed: " + result.getStatus());
+                return;
+            }
+            if (!result.isSuccess()) {
+                // Not fatal: whatever the forked parse did return is still written out,
+                // but the user should not get partial output without any hint.
+                LOG.warn("Fork parse did not succeed: {}", result.getStatus());
+            }
+            writeForkResult(result.getMetadataList(), output);
+        }
+    }
+
+    /**
+     * Translates the fork-related command-line options into a {@link PipesForkParserConfig}.
+     *
+     * @return a config reflecting the current output type, timeout, JVM args,
+     *         plugins directory and embedded-document limits
+     */
+    private PipesForkParserConfig buildForkParserConfig() {
+        PipesForkParserConfig config = new PipesForkParserConfig();
+        config.setContentHandlerFactory(getContentHandlerFactory(type));
+        // Recursive JSON output renders the full metadata list; every other output
+        // type only renders the first entry, so a concatenated parse is enough.
+        config.setParseMode(recursiveJSON ? ParseMode.RMETA : ParseMode.CONCATENATE);
+        config.setTimeoutMillis(forkTimeout);
+        if (forkJvmArgs != null && !forkJvmArgs.isEmpty()) {
+            config.setJvmArgs(forkJvmArgs);
+        }
+        if (forkPluginsDir != null) {
+            config.setPluginsDir(Paths.get(forkPluginsDir));
+        }
+        boolean depthLimited = maxEmbeddedDepth != EmbeddedLimits.UNLIMITED;
+        boolean countLimited = maxEmbeddedCount != EmbeddedLimits.UNLIMITED;
+        if (depthLimited || countLimited) {
+            EmbeddedLimits limits = new EmbeddedLimits();
+            if (depthLimited) {
+                limits.setMaxDepth(maxEmbeddedDepth);
+            }
+            if (countLimited) {
+                limits.setMaxCount(maxEmbeddedCount);
+            }
+            config.setEmbeddedLimits(limits);
+        }
+        return config;
+    }
+
+    /**
+     * Writes the metadata list returned by the forked parse in the selected output format.
+     * Outside recursive-JSON mode only the first entry is rendered. {@code output} is
+     * flushed, never closed.
+     *
+     * @param metadataList the per-document metadata returned by the forked parse
+     * @param output       destination stream; left open
+     * @throws Exception if serialization or the output encoding fails
+     */
+    private void writeForkResult(List<Metadata> metadataList, OutputStream output)
+            throws Exception {
+        // Only flushed, never closed: closing this writer would close output as well.
+        PrintWriter writer = new PrintWriter(getOutputWriter(output, encoding));
+        if (recursiveJSON) {
+            JsonMetadataList.setPrettyPrinting(prettyPrint);
+            JsonMetadataList.toJson(metadataList, writer);
+        } else if (!metadataList.isEmpty()) {
+            Metadata first = metadataList.get(0);
+            if (type == JSON) {
+                JsonMetadata.setPrettyPrinting(prettyPrint);
+                JsonMetadata.toJson(first, writer);
+            } else if (type == METADATA) {
+                String[] names = first.names();
+                Arrays.sort(names);
+                for (String name : names) {
+                    for (String value : first.getValues(name)) {
+                        writer.println(name + ": " + value);
+                    }
+                }
+            } else {
+                // text / xml / html: the rendered content is carried in TIKA_CONTENT
+                String content = first.get(TikaCoreProperties.TIKA_CONTENT);
+                if (content != null) {
+                    writer.write(content);
+                }
+            }
+        }
+        writer.flush();
+    }
+
private ContentHandlerFactory getContentHandlerFactory(OutputType type) {
BasicContentHandlerFactory.HANDLER_TYPE handlerType =
BasicContentHandlerFactory.HANDLER_TYPE.IGNORE;
if (type.equals(HTML)) {
@@ -631,6 +766,14 @@ public class TikaCLI {
out.println(" -r or --pretty-print For JSON, XML and XHTML
outputs, adds newlines and");
out.println(" whitespace, for better
readability");
out.println();
+ out.println("Fork Mode (process isolation):");
+ out.println(" -f or --fork Run parsing in a forked JVM
process for isolation");
+ out.println(" Protects against parser
crashes, OOM, and timeouts");
+ out.println(" --fork-timeout=<ms> Parse timeout in milliseconds
(default: 60000)");
+ out.println(" --fork-jvm-args=<args> JVM args for forked process
(comma-separated)");
+ out.println(" e.g.,
--fork-jvm-args=-Xmx512m,-Dsome.prop=value");
+ out.println(" --fork-plugins-dir=<dir> Directory containing plugin
zips");
+ out.println();
out.println(" --list-parsers");
out.println(" List the available document parsers");
out.println(" --list-parser-details");
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigMerger.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigMerger.java
new file mode 100644
index 0000000000..998074bab4
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigMerger.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.config;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility for merging configuration overrides with an existing Tika JSON configuration.
+ * <p>
+ * This class centralizes the config generation logic that was previously duplicated in:
+ * <ul>
+ * <li>PipesForkParser.generateJsonConfig()</li>
+ * <li>TikaServerProcess.createDefaultConfig() and ensureServerComponents()</li>
+ * <li>TikaAsyncCLI.PluginsWriter and ensurePluginRoots()</li>
+ * </ul>
+ * <p>
+ * Key design decisions:
+ * <ul>
+ * <li>Fetchers/emitters added without an explicit ID each get a fresh UUID-based name
+ * ({@code tika-internal-fetcher-<uuid>} / {@code tika-internal-emitter-<uuid>}), so they
+ * clash neither with user-configured components nor with one another</li>
+ * <li>Returns a {@link MergeResult} containing the config path and generated names so
+ * callers can use them</li>
+ * <li>Preserves existing config sections when merging; the existing file itself is
+ * never modified</li>
+ * <li>Writes the merged config to a temp file that is marked for deletion on JVM exit</li>
+ * </ul>
+ * <p>
+ * Example usage:
+ * <pre>
+ * ConfigOverrides overrides = ConfigOverrides.builder()
+ *     .addFetcher("my-fetcher", "file-system-fetcher",
+ *         Map.of("basePath", "/tmp/input"))
+ *     .setPipesConfig(4, 60000, null)
+ *     .setEmitStrategy(EmitStrategy.PASSBACK_ALL)
+ *     .build();
+ *
+ * MergeResult result = ConfigMerger.mergeOrCreate(existingConfigPath, overrides);
+ * // Use result.configPath() for PipesParser.load()
+ * </pre>
+ */
+public class ConfigMerger {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigMerger.class);
+
+    /**
+     * Shared mapper (thread-safe once configured); INDENT_OUTPUT pretty-prints the merged
+     * config so it stays readable when debugging.
+     */
+    private static final ObjectMapper MAPPER =
+            new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
+
+    private ConfigMerger() {
+        // Utility class
+    }
+
+    /**
+     * Merges overrides with an existing config, or creates a new config if none exists.
+     * <p>
+     * Fetchers and emitters without an explicit ID in the overrides each get a freshly
+     * generated {@code tika-internal-fetcher-<uuid>} / {@code tika-internal-emitter-<uuid>}
+     * name, so they cannot collide with user-configured components or with each other.
+     * Overridden values replace existing values under the same key; all other existing
+     * sections are kept.
+     *
+     * @param existingConfig path to an existing config (may be null, or point to a missing
+     *                       file, in which case a new config is created)
+     * @param overrides      the overrides to apply; must not be null
+     * @return MergeResult with the path of the merged config and the IDs of the first
+     *         fetcher and first emitter that were added (null where none were added)
+     * @throws IOException              if reading the existing config or writing the
+     *                                  merged config fails
+     * @throws IllegalArgumentException if a fetcher or emitter override has no type
+     */
+    public static MergeResult mergeOrCreate(Path existingConfig, ConfigOverrides overrides)
+            throws IOException {
+        if (overrides == null) {
+            throw new NullPointerException("overrides must not be null");
+        }
+        ObjectNode root = loadRoot(existingConfig);
+
+        String fetcherId = null;
+        List<ConfigOverrides.FetcherOverride> fetchers = overrides.getFetchers();
+        if (fetchers != null && !fetchers.isEmpty()) {
+            ObjectNode fetchersNode = getOrCreateObject(root, "fetchers");
+            for (ConfigOverrides.FetcherOverride fetcher : fetchers) {
+                String id = applyComponent(fetchersNode, "fetcher", fetcher.getId(),
+                        fetcher.getType(), fetcher.getConfig());
+                if (fetcherId == null) {
+                    fetcherId = id;
+                }
+            }
+        }
+
+        String emitterId = null;
+        List<ConfigOverrides.EmitterOverride> emitters = overrides.getEmitters();
+        if (emitters != null && !emitters.isEmpty()) {
+            ObjectNode emittersNode = getOrCreateObject(root, "emitters");
+            for (ConfigOverrides.EmitterOverride emitter : emitters) {
+                String id = applyComponent(emittersNode, "emitter", emitter.getId(),
+                        emitter.getType(), emitter.getConfig());
+                if (emitterId == null) {
+                    emitterId = id;
+                }
+            }
+        }
+
+        if (overrides.getPipesConfig() != null) {
+            applyPipesConfig(getOrCreateObject(root, "pipes"), overrides.getPipesConfig());
+        }
+
+        if (overrides.getEmitStrategy() != null) {
+            ObjectNode emitStrategyNode =
+                    getOrCreateObject(getOrCreateObject(root, "pipes"), "emitStrategy");
+            emitStrategyNode.put("type", overrides.getEmitStrategy().name());
+            LOG.debug("Applied emit strategy: {}", overrides.getEmitStrategy());
+        }
+
+        // An explicit plugin-roots entry in the existing config always wins.
+        if (overrides.getPluginRoots() != null && !root.has("plugin-roots")) {
+            root.put("plugin-roots", overrides.getPluginRoots());
+            LOG.debug("Set plugin-roots: {}", overrides.getPluginRoots());
+        }
+
+        Path tempConfig = writeTempConfig(root);
+        return new MergeResult(tempConfig, fetcherId, emitterId);
+    }
+
+    /**
+     * Reads the existing config as the merge base, or returns an empty object when there
+     * is no usable existing config.
+     */
+    private static ObjectNode loadRoot(Path existingConfig) throws IOException {
+        if (existingConfig == null) {
+            LOG.debug("Creating new config (no existing config provided)");
+            return MAPPER.createObjectNode();
+        }
+        if (!Files.exists(existingConfig)) {
+            LOG.warn("Config file {} does not exist; creating a new config", existingConfig);
+            return MAPPER.createObjectNode();
+        }
+        JsonNode parsed = MAPPER.readTree(existingConfig.toFile());
+        if (parsed instanceof ObjectNode objectNode) {
+            LOG.debug("Merging with existing config: {}", existingConfig);
+            return objectNode;
+        }
+        // e.g. an empty file or a top-level JSON array: nothing we can merge into
+        LOG.warn("Existing config {} is not a JSON object; ignoring its contents",
+                existingConfig);
+        return MAPPER.createObjectNode();
+    }
+
+    /**
+     * Adds or updates one fetcher/emitter entry of the form {@code {id: {type: {...}}}}.
+     *
+     * @param section the "fetchers" or "emitters" object
+     * @param kind    "fetcher" or "emitter"; used in generated IDs and messages
+     * @param id      the requested ID; null or empty means "generate one"
+     * @param type    the component type; required
+     * @param config  the component properties (may be null)
+     * @return the ID under which the component was written
+     */
+    private static String applyComponent(ObjectNode section, String kind, String id,
+                                         String type, Map<String, Object> config) {
+        if (type == null || type.isEmpty()) {
+            throw new IllegalArgumentException(kind + " type must not be null or empty");
+        }
+        // A fresh UUID per component, so several unnamed components never share an ID.
+        String resolvedId = (id == null || id.isEmpty())
+                ? "tika-internal-" + kind + "-" + UUID.randomUUID()
+                : id;
+        ObjectNode typeNode = getOrCreateObject(getOrCreateObject(section, resolvedId), type);
+        applyConfigMap(typeNode, config);
+        LOG.debug("Added/updated {}: {} (type: {})", kind, resolvedId, type);
+        return resolvedId;
+    }
+
+    /**
+     * Copies the pipes overrides into the "pipes" object. Non-positive numeric values mean
+     * "not set" and leave any existing value untouched.
+     */
+    private static void applyPipesConfig(ObjectNode pipesNode,
+                                         ConfigOverrides.PipesConfigOverride pc) {
+        if (pc.getNumClients() > 0) {
+            pipesNode.put("numClients", pc.getNumClients());
+        }
+        if (pc.getTimeoutMillis() > 0) {
+            pipesNode.put("timeoutMillis", pc.getTimeoutMillis());
+        }
+        if (pc.getStartupTimeoutMillis() > 0) {
+            pipesNode.put("startupTimeoutMillis", pc.getStartupTimeoutMillis());
+        }
+        if (pc.getMaxFilesProcessedPerProcess() > 0) {
+            pipesNode.put("maxFilesProcessedPerProcess", pc.getMaxFilesProcessedPerProcess());
+        }
+        List<String> jvmArgs = pc.getForkedJvmArgs();
+        if (jvmArgs != null && !jvmArgs.isEmpty()) {
+            // putArray replaces any existing forkedJvmArgs rather than appending to them
+            ArrayNode argsArray = pipesNode.putArray("forkedJvmArgs");
+            for (String arg : jvmArgs) {
+                argsArray.add(arg);
+            }
+        }
+        LOG.debug("Applied pipes config: numClients={}, timeoutMillis={}",
+                pc.getNumClients(), pc.getTimeoutMillis());
+    }
+
+    /**
+     * Writes the merged config to a new temp file that is deleted when the JVM exits.
+     */
+    private static Path writeTempConfig(ObjectNode root) throws IOException {
+        Path tempConfig = Files.createTempFile("tika-config-merged-", ".json");
+        // Register for cleanup before writing, so a failed write cannot leave the file
+        // behind after the JVM exits.
+        tempConfig.toFile().deleteOnExit();
+        MAPPER.writeValue(tempConfig.toFile(), root);
+        LOG.debug("Created merged config: {}", tempConfig);
+        return tempConfig;
+    }
+
+    /**
+     * Returns the object child {@code key} of {@code parent}; a missing or non-object
+     * child is replaced by a new empty object.
+     */
+    private static ObjectNode getOrCreateObject(ObjectNode parent, String key) {
+        JsonNode existing = parent.get(key);
+        if (existing instanceof ObjectNode objectNode) {
+            return objectNode;
+        }
+        return parent.putObject(key);
+    }
+
+    /**
+     * Copies every non-null entry of {@code config} into {@code node}, replacing existing
+     * values under the same key.
+     */
+    private static void applyConfigMap(ObjectNode node, Map<String, Object> config) {
+        if (config == null) {
+            return;
+        }
+        for (Map.Entry<String, Object> entry : config.entrySet()) {
+            // null values are skipped rather than written as JSON null
+            if (entry.getValue() != null) {
+                node.set(entry.getKey(), toJsonNode(entry.getValue()));
+            }
+        }
+    }
+
+    /**
+     * Converts a config value to JSON. Strings, booleans and numbers keep their JSON type,
+     * maps become objects and collections/object arrays become arrays, recursively.
+     * Anything else (e.g. a Path or an enum) is written as its {@code toString()}.
+     */
+    private static JsonNode toJsonNode(Object value) {
+        var nodes = MAPPER.getNodeFactory();
+        if (value == null) {
+            return nodes.nullNode();
+        }
+        if (value instanceof String s) {
+            return nodes.textNode(s);
+        }
+        if (value instanceof Boolean b) {
+            return nodes.booleanNode(b);
+        }
+        if (value instanceof Number) {
+            // Integer, Long, Double, Float, BigDecimal, ... keep their numeric JSON type
+            return MAPPER.valueToTree(value);
+        }
+        if (value instanceof Map<?, ?> map) {
+            ObjectNode object = nodes.objectNode();
+            for (Map.Entry<?, ?> entry : map.entrySet()) {
+                object.set(String.valueOf(entry.getKey()), toJsonNode(entry.getValue()));
+            }
+            return object;
+        }
+        if (value instanceof Object[] items) {
+            return toJsonNode(java.util.Arrays.asList(items));
+        }
+        if (value instanceof java.util.Collection<?> items) {
+            ArrayNode array = nodes.arrayNode();
+            for (Object item : items) {
+                array.add(toJsonNode(item));
+            }
+            return array;
+        }
+        return nodes.textNode(value.toString());
+    }
+
+    /**
+     * Result of a config merge operation.
+     *
+     * @param configPath path to the merged configuration file
+     * @param fetcherId  the first fetcher ID written (null if no fetchers were added)
+     * @param emitterId  the first emitter ID written (null if no emitters were added)
+     */
+    public record MergeResult(Path configPath, String fetcherId, String emitterId) {
+    }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigOverrides.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigOverrides.java
new file mode 100644
index 0000000000..84536129f8
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigOverrides.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.config;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tika.pipes.core.EmitStrategy;
+
+/**
+ * Configuration overrides for merging with or creating Tika JSON configuration.
+ * <p>
+ * This class provides a fluent builder API to specify fetchers, emitters, pipes
+ * configuration, and other settings that should be merged into an existing config
+ * or used to create a new one.
+ * <p>
+ * Instances cannot be modified after {@link Builder#build()}: builder inputs are copied,
+ * and every collection returned by a getter (including the per-fetcher/emitter config
+ * maps and the forked JVM args) is an unmodifiable view.
+ * <p>
+ * Example usage:
+ * <pre>
+ * ConfigOverrides overrides = ConfigOverrides.builder()
+ *     .addFetcher("my-fetcher", "file-system-fetcher",
+ *         Map.of("basePath", "/tmp/input"))
+ *     .setPipesConfig(4, 60000, null)
+ *     .setEmitStrategy(EmitStrategy.PASSBACK_ALL)
+ *     .setPluginRoots("plugins")
+ *     .build();
+ * </pre>
+ */
+public class ConfigOverrides {
+
+    private final List<FetcherOverride> fetchers;
+    private final List<EmitterOverride> emitters;
+    private final PipesConfigOverride pipesConfig;
+    private final String pluginRoots;
+    private final EmitStrategy emitStrategy;
+
+    private ConfigOverrides(Builder builder) {
+        this.fetchers = Collections.unmodifiableList(new ArrayList<>(builder.fetchers));
+        this.emitters = Collections.unmodifiableList(new ArrayList<>(builder.emitters));
+        this.pipesConfig = builder.pipesConfig;
+        this.pluginRoots = builder.pluginRoots;
+        this.emitStrategy = builder.emitStrategy;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** @return the fetcher overrides in insertion order (unmodifiable, never null) */
+    public List<FetcherOverride> getFetchers() {
+        return fetchers;
+    }
+
+    /** @return the emitter overrides in insertion order (unmodifiable, never null) */
+    public List<EmitterOverride> getEmitters() {
+        return emitters;
+    }
+
+    /** @return the pipes overrides, or null if none were set */
+    public PipesConfigOverride getPipesConfig() {
+        return pipesConfig;
+    }
+
+    /** @return the plugin roots path, or null if none was set */
+    public String getPluginRoots() {
+        return pluginRoots;
+    }
+
+    /** @return the emit strategy, or null if none was set */
+    public EmitStrategy getEmitStrategy() {
+        return emitStrategy;
+    }
+
+    /** Rejects a missing component type early, before it can become a null JSON key. */
+    private static String requireType(String type, String kind) {
+        if (type == null || type.isEmpty()) {
+            throw new IllegalArgumentException(kind + " type must not be null or empty");
+        }
+        return type;
+    }
+
+    /** Defensive, unmodifiable copy of a component's config map (empty if null). */
+    private static Map<String, Object> copyConfig(Map<String, Object> config) {
+        if (config == null) {
+            return Collections.emptyMap();
+        }
+        return Collections.unmodifiableMap(new HashMap<>(config));
+    }
+
+    /**
+     * Represents a fetcher configuration override.
+     */
+    public static class FetcherOverride {
+        private final String id;
+        private final String type;
+        private final Map<String, Object> config;
+
+        /**
+         * @param id     the fetcher ID; null or empty lets ConfigMerger generate one
+         * @param type   the fetcher type (e.g. "file-system-fetcher"); required
+         * @param config the fetcher properties; copied, may be null
+         * @throws IllegalArgumentException if {@code type} is null or empty
+         */
+        public FetcherOverride(String id, String type, Map<String, Object> config) {
+            this.id = id;
+            this.type = requireType(type, "fetcher");
+            this.config = copyConfig(config);
+        }
+
+        public String getId() {
+            return id;
+        }
+
+        public String getType() {
+            return type;
+        }
+
+        /** @return the fetcher properties (unmodifiable, never null) */
+        public Map<String, Object> getConfig() {
+            return config;
+        }
+    }
+
+    /**
+     * Represents an emitter configuration override.
+     */
+    public static class EmitterOverride {
+        private final String id;
+        private final String type;
+        private final Map<String, Object> config;
+
+        /**
+         * @param id     the emitter ID; null or empty lets ConfigMerger generate one
+         * @param type   the emitter type (e.g. "file-system-emitter"); required
+         * @param config the emitter properties; copied, may be null
+         * @throws IllegalArgumentException if {@code type} is null or empty
+         */
+        public EmitterOverride(String id, String type, Map<String, Object> config) {
+            this.id = id;
+            this.type = requireType(type, "emitter");
+            this.config = copyConfig(config);
+        }
+
+        public String getId() {
+            return id;
+        }
+
+        public String getType() {
+            return type;
+        }
+
+        /** @return the emitter properties (unmodifiable, never null) */
+        public Map<String, Object> getConfig() {
+            return config;
+        }
+    }
+
+    /**
+     * Represents pipes configuration overrides.
+     */
+    public static class PipesConfigOverride {
+        private final int numClients;
+        private final long timeoutMillis;
+        private final long startupTimeoutMillis;
+        private final int maxFilesProcessedPerProcess;
+        private final List<String> forkedJvmArgs;
+
+        public PipesConfigOverride(int numClients, long timeoutMillis,
+                                   long startupTimeoutMillis, int maxFilesProcessedPerProcess,
+                                   List<String> forkedJvmArgs) {
+            this.numClients = numClients;
+            this.timeoutMillis = timeoutMillis;
+            this.startupTimeoutMillis = startupTimeoutMillis;
+            this.maxFilesProcessedPerProcess = maxFilesProcessedPerProcess;
+            if (forkedJvmArgs == null) {
+                this.forkedJvmArgs = Collections.emptyList();
+            } else {
+                this.forkedJvmArgs =
+                        Collections.unmodifiableList(new ArrayList<>(forkedJvmArgs));
+            }
+        }
+
+        public int getNumClients() {
+            return numClients;
+        }
+
+        public long getTimeoutMillis() {
+            return timeoutMillis;
+        }
+
+        public long getStartupTimeoutMillis() {
+            return startupTimeoutMillis;
+        }
+
+        public int getMaxFilesProcessedPerProcess() {
+            return maxFilesProcessedPerProcess;
+        }
+
+        /** @return the forked JVM arguments (unmodifiable, never null) */
+        public List<String> getForkedJvmArgs() {
+            return forkedJvmArgs;
+        }
+    }
+
+    /**
+     * Builder for ConfigOverrides.
+     */
+    public static class Builder {
+        private final List<FetcherOverride> fetchers = new ArrayList<>();
+        private final List<EmitterOverride> emitters = new ArrayList<>();
+        private PipesConfigOverride pipesConfig;
+        private String pluginRoots;
+        private EmitStrategy emitStrategy;
+
+        private Builder() {
+        }
+
+        /**
+         * Add a fetcher configuration.
+         *
+         * @param id     the fetcher ID (null or empty to have one generated)
+         * @param type   the fetcher type (e.g., "file-system-fetcher")
+         * @param config the fetcher configuration properties
+         * @return this builder
+         * @throws IllegalArgumentException if {@code type} is null or empty
+         */
+        public Builder addFetcher(String id, String type, Map<String, Object> config) {
+            fetchers.add(new FetcherOverride(id, type, config));
+            return this;
+        }
+
+        /**
+         * Add an emitter configuration.
+         *
+         * @param id     the emitter ID (null or empty to have one generated)
+         * @param type   the emitter type (e.g., "file-system-emitter")
+         * @param config the emitter configuration properties
+         * @return this builder
+         * @throws IllegalArgumentException if {@code type} is null or empty
+         */
+        public Builder addEmitter(String id, String type, Map<String, Object> config) {
+            emitters.add(new EmitterOverride(id, type, config));
+            return this;
+        }
+
+        /**
+         * Set pipes configuration with basic options; startup timeout and max files per
+         * process use the PipesConfig defaults.
+         *
+         * @param numClients    number of forked JVM clients
+         * @param timeoutMillis parse timeout in milliseconds
+         * @param forkedJvmArgs JVM arguments for forked processes (may be null)
+         * @return this builder
+         */
+        public Builder setPipesConfig(int numClients, long timeoutMillis,
+                                      List<String> forkedJvmArgs) {
+            return setPipesConfig(numClients, timeoutMillis,
+                    org.apache.tika.pipes.core.PipesConfig.DEFAULT_STARTUP_TIMEOUT_MILLIS,
+                    org.apache.tika.pipes.core.PipesConfig.DEFAULT_MAX_FILES_PROCESSED_PER_PROCESS,
+                    forkedJvmArgs);
+        }
+
+        /**
+         * Set pipes configuration with all options.
+         *
+         * @param numClients                  number of forked JVM clients
+         * @param timeoutMillis               parse timeout in milliseconds
+         * @param startupTimeoutMillis        startup timeout in milliseconds
+         * @param maxFilesProcessedPerProcess max files before process restart
+         * @param forkedJvmArgs               JVM arguments for forked processes (may be null)
+         * @return this builder
+         */
+        public Builder setPipesConfig(int numClients, long timeoutMillis,
+                                      long startupTimeoutMillis, int maxFilesProcessedPerProcess,
+                                      List<String> forkedJvmArgs) {
+            this.pipesConfig = new PipesConfigOverride(numClients, timeoutMillis,
+                    startupTimeoutMillis, maxFilesProcessedPerProcess, forkedJvmArgs);
+            return this;
+        }
+
+        /**
+         * Set the plugin roots path.
+         *
+         * @param pluginRoots path to the plugins directory
+         * @return this builder
+         */
+        public Builder setPluginRoots(String pluginRoots) {
+            this.pluginRoots = pluginRoots;
+            return this;
+        }
+
+        /**
+         * Set the emit strategy.
+         *
+         * @param emitStrategy the emit strategy
+         * @return this builder
+         */
+        public Builder setEmitStrategy(EmitStrategy emitStrategy) {
+            this.emitStrategy = emitStrategy;
+            return this;
+        }
+
+        /**
+         * Build the ConfigOverrides instance.
+         *
+         * @return the ConfigOverrides
+         */
+        public ConfigOverrides build() {
+            return new ConfigOverrides(this);
+        }
+    }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/config/ConfigMergerTest.java
b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/config/ConfigMergerTest.java
new file mode 100644
index 0000000000..39be285793
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/config/ConfigMergerTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.config;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.tika.pipes.core.EmitStrategy;
+
+public class ConfigMergerTest {
+
+ @TempDir
+ Path tempDir;
+
+ @Test
+ public void testCreateNewConfig() throws IOException {
+ ConfigOverrides overrides = ConfigOverrides.builder()
+ .addFetcher("my-fetcher", "file-system-fetcher",
+ Map.of("basePath", "/tmp/input", "allowAbsolutePaths",
true))
+ .setPipesConfig(4, 60000, null)
+ .setEmitStrategy(EmitStrategy.PASSBACK_ALL)
+ .setPluginRoots("plugins")
+ .build();
+
+ ConfigMerger.MergeResult result = ConfigMerger.mergeOrCreate(null,
overrides);
+
+ assertNotNull(result);
+ assertNotNull(result.configPath());
+ assertTrue(Files.exists(result.configPath()));
+
+ // Verify config contents
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode root = mapper.readTree(result.configPath().toFile());
+
+ // Check fetcher
+ assertTrue(root.has("fetchers"));
+ assertTrue(root.get("fetchers").has("my-fetcher"));
+ JsonNode fetcherConfig =
root.get("fetchers").get("my-fetcher").get("file-system-fetcher");
+ assertEquals("/tmp/input", fetcherConfig.get("basePath").asText());
+ assertTrue(fetcherConfig.get("allowAbsolutePaths").asBoolean());
+
+ // Check pipes config
+ assertTrue(root.has("pipes"));
+ assertEquals(4, root.get("pipes").get("numClients").asInt());
+ assertEquals(60000, root.get("pipes").get("timeoutMillis").asLong());
+
+ // Check emit strategy
+ assertEquals("PASSBACK_ALL",
root.get("pipes").get("emitStrategy").get("type").asText());
+
+ // Check plugin roots
+ assertEquals("plugins", root.get("plugin-roots").asText());
+
+ // Clean up
+ Files.deleteIfExists(result.configPath());
+ }
+
+ @Test
+ public void testMergeWithExistingConfig() throws IOException {
+ // Create existing config
+ String existingConfig = """
+ {
+ "fetchers": {
+ "existing-fetcher": {
+ "file-system-fetcher": {
+ "basePath": "/existing/path"
+ }
+ }
+ },
+ "pipes": {
+ "numClients": 2
+ },
+ "plugin-roots": "existing-plugins"
+ }
+ """;
+ Path existingPath = tempDir.resolve("existing-config.json");
+ Files.writeString(existingPath, existingConfig);
+
+ // Apply overrides
+ ConfigOverrides overrides = ConfigOverrides.builder()
+ .addFetcher("new-fetcher", "file-system-fetcher",
+ Map.of("basePath", "/new/path"))
+ .setPipesConfig(8, 120000, null)
+ .build();
+
+ ConfigMerger.MergeResult result =
ConfigMerger.mergeOrCreate(existingPath, overrides);
+
+ assertNotNull(result);
+ assertTrue(Files.exists(result.configPath()));
+
+ // Verify merged config
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode root = mapper.readTree(result.configPath().toFile());
+
+ // Existing fetcher should be preserved
+ assertTrue(root.get("fetchers").has("existing-fetcher"));
+ assertEquals("/existing/path",
+ root.get("fetchers").get("existing-fetcher")
+ .get("file-system-fetcher").get("basePath").asText());
+
+ // New fetcher should be added
+ assertTrue(root.get("fetchers").has("new-fetcher"));
+ assertEquals("/new/path",
+ root.get("fetchers").get("new-fetcher")
+ .get("file-system-fetcher").get("basePath").asText());
+
+ // Pipes config should be overridden
+ assertEquals(8, root.get("pipes").get("numClients").asInt());
+ assertEquals(120000, root.get("pipes").get("timeoutMillis").asLong());
+
+ // Existing plugin-roots should be preserved (not overridden)
+ assertEquals("existing-plugins", root.get("plugin-roots").asText());
+
+ // Clean up
+ Files.deleteIfExists(result.configPath());
+ }
+
+ @Test
+ public void testGeneratedUuidFetcherId() throws IOException {
+ ConfigOverrides overrides = ConfigOverrides.builder()
+ .addFetcher(null, "file-system-fetcher", // null ID triggers
UUID generation
+ Map.of("allowAbsolutePaths", true))
+ .build();
+
+ ConfigMerger.MergeResult result = ConfigMerger.mergeOrCreate(null,
overrides);
+
+ assertNotNull(result.fetcherId());
+ assertTrue(result.fetcherId().startsWith("tika-internal-fetcher-"));
+
+ // Verify fetcher exists with generated ID
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode root = mapper.readTree(result.configPath().toFile());
+ assertTrue(root.get("fetchers").has(result.fetcherId()));
+
+ Files.deleteIfExists(result.configPath());
+ }
+
+ @Test
+ public void testEmitterConfig() throws IOException {
+ ConfigOverrides overrides = ConfigOverrides.builder()
+ .addEmitter("my-emitter", "file-system-emitter",
+ Map.of("basePath", "/tmp/output", "onExists",
"REPLACE"))
+ .build();
+
+ ConfigMerger.MergeResult result = ConfigMerger.mergeOrCreate(null,
overrides);
+
+ assertNotNull(result);
+ assertNull(result.fetcherId()); // No fetcher was added
+ assertEquals("my-emitter", result.emitterId()); // Should be the
explicit ID
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode root = mapper.readTree(result.configPath().toFile());
+
+ assertTrue(root.has("emitters"));
+ assertTrue(root.get("emitters").has("my-emitter"));
+ JsonNode emitterConfig =
root.get("emitters").get("my-emitter").get("file-system-emitter");
+ assertEquals("/tmp/output", emitterConfig.get("basePath").asText());
+ assertEquals("REPLACE", emitterConfig.get("onExists").asText());
+
+ Files.deleteIfExists(result.configPath());
+ }
+
+ @Test
+ public void testJvmArgs() throws IOException {
+ ConfigOverrides overrides = ConfigOverrides.builder()
+ .setPipesConfig(4, 60000, List.of("-Xmx512m",
"-Dsome.prop=value"))
+ .build();
+
+ ConfigMerger.MergeResult result = ConfigMerger.mergeOrCreate(null,
overrides);
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode root = mapper.readTree(result.configPath().toFile());
+
+ assertTrue(root.get("pipes").has("forkedJvmArgs"));
+ JsonNode jvmArgs = root.get("pipes").get("forkedJvmArgs");
+ assertTrue(jvmArgs.isArray());
+ assertEquals(2, jvmArgs.size());
+ assertEquals("-Xmx512m", jvmArgs.get(0).asText());
+ assertEquals("-Dsome.prop=value", jvmArgs.get(1).asText());
+
+ Files.deleteIfExists(result.configPath());
+ }
+
+ @Test
+ public void testFullPipesConfig() throws IOException {
+ ConfigOverrides overrides = ConfigOverrides.builder()
+ .setPipesConfig(8, 120000, 300000, 5000, List.of("-Xmx1g"))
+ .build();
+
+ ConfigMerger.MergeResult result = ConfigMerger.mergeOrCreate(null,
overrides);
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode root = mapper.readTree(result.configPath().toFile());
+
+ JsonNode pipes = root.get("pipes");
+ assertEquals(8, pipes.get("numClients").asInt());
+ assertEquals(120000, pipes.get("timeoutMillis").asLong());
+ assertEquals(300000, pipes.get("startupTimeoutMillis").asLong());
+ assertEquals(5000, pipes.get("maxFilesProcessedPerProcess").asInt());
+
+ Files.deleteIfExists(result.configPath());
+ }
+
+ @Test
+ public void testPluginRootsNotOverriddenIfExists() throws IOException {
+ // Create config with existing plugin-roots
+ String existingConfig = """
+ {
+ "plugin-roots": "user-plugins"
+ }
+ """;
+ Path existingPath = tempDir.resolve("config-with-plugins.json");
+ Files.writeString(existingPath, existingConfig);
+
+ // Try to set different plugin-roots
+ ConfigOverrides overrides = ConfigOverrides.builder()
+ .setPluginRoots("default-plugins")
+ .build();
+
+ ConfigMerger.MergeResult result =
ConfigMerger.mergeOrCreate(existingPath, overrides);
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode root = mapper.readTree(result.configPath().toFile());
+
+ // Should keep the existing value, not override
+ assertEquals("user-plugins", root.get("plugin-roots").asText());
+
+ Files.deleteIfExists(result.configPath());
+ }
+
+ @Test
+ public void testMultipleFetchers() throws IOException {
+ ConfigOverrides overrides = ConfigOverrides.builder()
+ .addFetcher("fetcher1", "file-system-fetcher",
+ Map.of("basePath", "/path1"))
+ .addFetcher("fetcher2", "file-system-fetcher",
+ Map.of("basePath", "/path2"))
+ .build();
+
+ ConfigMerger.MergeResult result = ConfigMerger.mergeOrCreate(null,
overrides);
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode root = mapper.readTree(result.configPath().toFile());
+
+ assertTrue(root.get("fetchers").has("fetcher1"));
+ assertTrue(root.get("fetchers").has("fetcher2"));
+ assertEquals("/path1",
+ root.get("fetchers").get("fetcher1")
+ .get("file-system-fetcher").get("basePath").asText());
+ assertEquals("/path2",
+ root.get("fetchers").get("fetcher2")
+ .get("file-system-fetcher").get("basePath").asText());
+
+ // Result should have first fetcher ID
+ assertEquals("fetcher1", result.fetcherId());
+
+ Files.deleteIfExists(result.configPath());
+ }
+
+ @Test
+ public void testEmptyOverrides() throws IOException {
+ ConfigOverrides overrides = ConfigOverrides.builder().build();
+
+ ConfigMerger.MergeResult result = ConfigMerger.mergeOrCreate(null,
overrides);
+
+ assertNotNull(result);
+ assertTrue(Files.exists(result.configPath()));
+ assertNull(result.fetcherId());
+ assertNull(result.emitterId());
+
+ // Config should be basically empty
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode root = mapper.readTree(result.configPath().toFile());
+ assertFalse(root.has("fetchers"));
+ assertFalse(root.has("emitters"));
+ assertFalse(root.has("pipes"));
+
+ Files.deleteIfExists(result.configPath());
+ }
+
+ @Test
+ public void testNonExistentConfigPath() throws IOException {
+ Path nonExistent = tempDir.resolve("does-not-exist.json");
+
+ ConfigOverrides overrides = ConfigOverrides.builder()
+ .addFetcher("test", "file-system-fetcher", Map.of("basePath",
"/test"))
+ .build();
+
+ // Should create new config, not fail
+ ConfigMerger.MergeResult result =
ConfigMerger.mergeOrCreate(nonExistent, overrides);
+
+ assertNotNull(result);
+ assertTrue(Files.exists(result.configPath()));
+
+ Files.deleteIfExists(result.configPath());
+ }
+}
diff --git
a/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParser.java
b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParser.java
index c6c3489c91..3357db35cb 100644
---
a/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParser.java
+++
b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParser.java
@@ -18,14 +18,9 @@ package org.apache.tika.pipes.fork;
import java.io.Closeable;
import java.io.IOException;
-import java.io.StringWriter;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.ArrayList;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
+import java.util.Map;
import org.apache.tika.config.EmbeddedLimits;
import org.apache.tika.exception.TikaConfigException;
@@ -38,9 +33,12 @@ import org.apache.tika.pipes.api.ParseMode;
import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.api.emitter.EmitKey;
import org.apache.tika.pipes.api.fetcher.FetchKey;
+import org.apache.tika.pipes.core.EmitStrategy;
import org.apache.tika.pipes.core.PipesConfig;
import org.apache.tika.pipes.core.PipesException;
import org.apache.tika.pipes.core.PipesParser;
+import org.apache.tika.pipes.core.config.ConfigMerger;
+import org.apache.tika.pipes.core.config.ConfigOverrides;
import org.apache.tika.sax.ContentHandlerFactory;
/**
@@ -92,18 +90,17 @@ import org.apache.tika.sax.ContentHandlerFactory;
* config.setParseMode(ParseMode.RMETA);
*
* try (PipesForkParser parser = new PipesForkParser(config)) {
- * // Parse from a file
- * try (TikaInputStream tis =
TikaInputStream.get(Paths.get("/path/to/file.pdf"))) {
- * PipesForkResult result = parser.parse(tis);
- * for (Metadata m : result.getMetadataList()) {
- * String content = m.get(TikaCoreProperties.TIKA_CONTENT);
- * // process content and metadata
- * }
+ * // Parse a file by Path
+ * Path file = Paths.get("/path/to/file.pdf");
+ * PipesForkResult result = parser.parse(file);
+ * for (Metadata m : result.getMetadataList()) {
+ * String content = m.get(TikaCoreProperties.TIKA_CONTENT);
+ * // process content and metadata
* }
*
* // Or parse from an InputStream (will be spooled to temp file)
* try (TikaInputStream tis = TikaInputStream.get(inputStream)) {
- * PipesForkResult result = parser.parse(tis);
+ * result = parser.parse(tis);
* // ...
* }
* }
@@ -118,6 +115,7 @@ public class PipesForkParser implements Closeable {
private final PipesForkParserConfig config;
private final PipesParser pipesParser;
private final Path tikaConfigPath;
+ private final String internalFetcherId;
/**
* Creates a new PipesForkParser with default configuration.
@@ -138,10 +136,63 @@ public class PipesForkParser implements Closeable {
*/
public PipesForkParser(PipesForkParserConfig config) throws IOException,
TikaConfigException {
this.config = config;
- this.tikaConfigPath = createTikaConfigFile();
+ ConfigMerger.MergeResult mergeResult = createTikaConfigFile();
+ this.tikaConfigPath = mergeResult.configPath();
+ this.internalFetcherId = mergeResult.fetcherId();
this.pipesParser = PipesParser.load(tikaConfigPath);
}
+    /**
+     * Parses the file at {@code path} in a forked JVM process, starting from empty
+     * metadata and a fresh parse context.
+     *
+     * @param path the file to parse
+     * @return the parse result containing metadata and content
+     * @throws IOException if an I/O error occurs
+     * @throws InterruptedException if the parsing is interrupted
+     * @throws PipesException if a pipes infrastructure error occurs
+     * @throws PipesForkParserException if an application error occurs (initialization
+     *         failure or configuration error)
+     */
+    public PipesForkResult parse(Path path)
+            throws IOException, InterruptedException, PipesException, TikaException {
+        Metadata emptyMetadata = new Metadata();
+        ParseContext freshContext = new ParseContext();
+        return parse(path, emptyMetadata, freshContext);
+    }
+
+    /**
+     * Parses the file at {@code path} in a forked JVM process, seeding the parse with
+     * the caller's metadata and a fresh parse context.
+     *
+     * @param path the file to parse
+     * @param metadata initial metadata (e.g., a content type hint)
+     * @return the parse result containing metadata and content
+     * @throws IOException if an I/O error occurs
+     * @throws InterruptedException if the parsing is interrupted
+     * @throws PipesException if a pipes infrastructure error occurs
+     * @throws PipesForkParserException if an application error occurs (initialization
+     *         failure or configuration error)
+     */
+    public PipesForkResult parse(Path path, Metadata metadata)
+            throws IOException, InterruptedException, PipesException, TikaException {
+        ParseContext freshContext = new ParseContext();
+        return parse(path, metadata, freshContext);
+    }
+
+    /**
+     * Parses a file in a forked JVM process with the specified metadata and parse context.
+     * <p>
+     * The path is converted to an absolute path (relative paths resolve against the
+     * current working directory) and handed to the internal file-system fetcher of the
+     * forked process, so the file must be readable from there.
+     *
+     * @param path the path to the file to parse; must not be null
+     * @param metadata initial metadata (e.g., content type hint)
+     * @param parseContext the parse context
+     * @return the parse result containing metadata and content
+     * @throws NullPointerException if {@code path} is null
+     * @throws IOException if an I/O error occurs
+     * @throws InterruptedException if the parsing is interrupted
+     * @throws PipesException if a pipes infrastructure error occurs
+     * @throws TikaException if an application error occurs; in particular
+     *         {@link PipesForkParserException} for initialization failures or
+     *         configuration errors
+     */
+    public PipesForkResult parse(Path path, Metadata metadata, ParseContext parseContext)
+            throws IOException, InterruptedException, PipesException, TikaException {
+        // Fail fast with a clear message instead of an anonymous NPE from
+        // path.toAbsolutePath() inside parseInternal.
+        if (path == null) {
+            throw new NullPointerException("path must not be null");
+        }
+        return parseInternal(path, metadata, parseContext);
+    }
+
/**
* Parse a file in a forked JVM process.
*
@@ -196,15 +247,22 @@ public class PipesForkParser implements Closeable {
*/
public PipesForkResult parse(TikaInputStream tis, Metadata metadata,
ParseContext parseContext)
throws IOException, InterruptedException, PipesException,
TikaException {
-
// Get the path - this will spool to a temp file if the stream doesn't
have
// an underlying file. The temp file is managed by TikaInputStream and
will
// be cleaned up when the TikaInputStream is closed.
- Path path = tis.getPath();
+ return parseInternal(tis.getPath(), metadata, parseContext);
+ }
+
+ /**
+ * Internal parse implementation that takes a Path directly.
+ */
+ private PipesForkResult parseInternal(Path path, Metadata metadata,
ParseContext parseContext)
+ throws IOException, InterruptedException, PipesException,
TikaException {
String absolutePath = path.toAbsolutePath().toString();
String id = absolutePath;
- FetchKey fetchKey = new FetchKey(config.getFetcherName(),
absolutePath);
+ // Use the internal fetcher ID generated by ConfigMerger (UUID-based)
+ FetchKey fetchKey = new FetchKey(internalFetcherId, absolutePath);
EmitKey emitKey = new EmitKey("", id); // Empty emitter name since
we're using PASSBACK_ALL
// Add content handler factory and parse mode to parse context
@@ -307,72 +365,41 @@ public class PipesForkParser implements Closeable {
/**
* Creates a temporary tika-config.json file for the forked process.
- * This configures:
- * - FileSystemFetcher as the fetcher
- * - PASSBACK_ALL emit strategy (no emitter, return results to client)
+ * <p>
+ * Uses ConfigMerger to:
+ * - Add a FileSystemFetcher with UUID-based name for absolute path access
+ * - Set PASSBACK_ALL emit strategy (no emitter, return results to client)
+ * - Merge with user config if provided
+ *
+ * @return MergeResult containing the config path and generated fetcher ID
*/
- private Path createTikaConfigFile() throws IOException {
- Path configFile = Files.createTempFile("tika-fork-config-", ".json");
-
- String jsonConfig = generateJsonConfig();
- Files.writeString(configFile, jsonConfig);
-
- return configFile;
- }
-
- private String generateJsonConfig() throws IOException {
+ private ConfigMerger.MergeResult createTikaConfigFile() throws IOException
{
PipesConfig pc = config.getPipesConfig();
- ObjectMapper mapper = new ObjectMapper();
- mapper.enable(SerializationFeature.INDENT_OUTPUT);
-
- StringWriter writer = new StringWriter();
- try (JsonGenerator gen = mapper.getFactory().createGenerator(writer)) {
- gen.writeStartObject();
-
- // Fetchers section
- gen.writeObjectFieldStart("fetchers");
- gen.writeObjectFieldStart(config.getFetcherName());
- gen.writeObjectFieldStart("file-system-fetcher");
- // No basePath - fetchKey will be treated as absolute path
- // Set allowAbsolutePaths to suppress the security warning since
this is intentional
- gen.writeBooleanField("allowAbsolutePaths", true);
- gen.writeEndObject(); // file-system-fetcher
- gen.writeEndObject(); // fetcher name
- gen.writeEndObject(); // fetchers
-
- // Pipes configuration section
- gen.writeObjectFieldStart("pipes");
- gen.writeNumberField("numClients", pc.getNumClients());
- gen.writeNumberField("timeoutMillis", pc.getTimeoutMillis());
- gen.writeNumberField("startupTimeoutMillis",
pc.getStartupTimeoutMillis());
- gen.writeNumberField("maxFilesProcessedPerProcess",
pc.getMaxFilesProcessedPerProcess());
-
- // Emit strategy - PASSBACK_ALL means no emitter, return results
to client
- gen.writeObjectFieldStart("emitStrategy");
- gen.writeStringField("type", "PASSBACK_ALL");
- gen.writeEndObject(); // emitStrategy
-
- // JVM args if specified
- ArrayList<String> jvmArgs = pc.getForkedJvmArgs();
- if (jvmArgs != null && !jvmArgs.isEmpty()) {
- gen.writeArrayFieldStart("forkedJvmArgs");
- for (String arg : jvmArgs) {
- gen.writeString(arg);
- }
- gen.writeEndArray();
- }
-
- gen.writeEndObject(); // pipes
-
- // Plugin roots if specified
- if (config.getPluginsDir() != null) {
- gen.writeStringField("plugin-roots",
config.getPluginsDir().toAbsolutePath().toString());
- }
-
- gen.writeEndObject(); // root
+ // Build configuration overrides
+ ConfigOverrides.Builder builder = ConfigOverrides.builder()
+ // Add internal fetcher with UUID-based name to avoid conflicts
+ // Use null ID to trigger UUID generation
+ .addFetcher(null, "file-system-fetcher",
+ Map.of("allowAbsolutePaths", true))
+ // Set pipes configuration
+ .setPipesConfig(
+ pc.getNumClients(),
+ pc.getTimeoutMillis(),
+ pc.getStartupTimeoutMillis(),
+ pc.getMaxFilesProcessedPerProcess(),
+ pc.getForkedJvmArgs())
+ // Use PASSBACK_ALL strategy - results returned through socket
+ .setEmitStrategy(EmitStrategy.PASSBACK_ALL);
+
+ // Set plugin roots if specified
+ if (config.getPluginsDir() != null) {
+
builder.setPluginRoots(config.getPluginsDir().toAbsolutePath().toString());
}
- return writer.toString();
+ ConfigOverrides overrides = builder.build();
+
+ // Merge with user config if provided, otherwise create new
+ return ConfigMerger.mergeOrCreate(config.getUserConfigPath(),
overrides);
}
}
diff --git
a/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParserConfig.java
b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParserConfig.java
index 080bdd2a9e..237e7e2d2d 100644
---
a/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParserConfig.java
+++
b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParserConfig.java
@@ -40,6 +40,7 @@ public class PipesForkParserConfig {
private String fetcherName = PipesForkParser.DEFAULT_FETCHER_NAME;
private Path pluginsDir;
private EmbeddedLimits embeddedLimits;
+ private Path userConfigPath;
public PipesForkParserConfig() {
this.pipesConfig = new PipesConfig();
@@ -300,4 +301,28 @@ public class PipesForkParserConfig {
this.pluginsDir = pluginsDir;
return this;
}
+
+    /**
+     * Returns the path of the user-supplied configuration file that is merged into
+     * the configuration PipesForkParser generates.
+     *
+     * @return the user config path, or {@code null} when none has been set
+     */
+    public Path getUserConfigPath() {
+        return this.userConfigPath;
+    }
+
+    /**
+     * Set a user-provided configuration file path.
+     * <p>
+     * The user's configuration is merged with the configuration that PipesForkParser
+     * generates. Fetchers already defined in the user's file are kept, with these
+     * exceptions and precedence rules:
+     * <ul>
+     *   <li>an internal file-system fetcher with a generated id is always added;</li>
+     *   <li>the pipes settings from this config (number of clients, timeouts, max files
+     *       per process, forked JVM args) and the PASSBACK_ALL emit strategy are applied
+     *       on top of the user's {@code pipes} section, overriding values set there;</li>
+     *   <li>a {@code plugin-roots} value in the user's file takes precedence over
+     *       {@link #getPluginsDir()}.</li>
+     * </ul>
+     * If the path is {@code null} or points to a file that does not exist, a fresh
+     * configuration is generated instead.
+     *
+     * @param userConfigPath path to the user's configuration file, or {@code null} for none
+     * @return this config for chaining
+     */
+    public PipesForkParserConfig setUserConfigPath(Path userConfigPath) {
+        this.userConfigPath = userConfigPath;
+        return this;
+    }
}
diff --git
a/tika-pipes/tika-pipes-fork-parser/src/test/java/org/apache/tika/pipes/fork/PipesForkParserTest.java
b/tika-pipes/tika-pipes-fork-parser/src/test/java/org/apache/tika/pipes/fork/PipesForkParserTest.java
index c9e49f0a4c..9d46945c5c 100644
---
a/tika-pipes/tika-pipes-fork-parser/src/test/java/org/apache/tika/pipes/fork/PipesForkParserTest.java
+++
b/tika-pipes/tika-pipes-fork-parser/src/test/java/org/apache/tika/pipes/fork/PipesForkParserTest.java
@@ -522,4 +522,121 @@ public class PipesForkParserTest {
assertEquals(1, trueCount, "Exactly one category should be true");
}
}
+
+    @Test
+    public void testParseWithPath() throws Exception {
+        // A small plain-text file is enough to exercise parse(Path) end to end
+        String text = "Hello from path-based parsing!";
+        Path input = Files.writeString(tempDir.resolve("test_path.txt"), text);
+
+        PipesForkParserConfig forkConfig = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR)
+                .setHandlerType(BasicContentHandlerFactory.HANDLER_TYPE.TEXT)
+                .setParseMode(ParseMode.RMETA)
+                .setTimeoutMillis(60000);
+
+        try (PipesForkParser forkParser = new PipesForkParser(forkConfig)) {
+            // Hand the Path straight to the parser, no TikaInputStream wrapper
+            PipesForkResult parsed = forkParser.parse(input);
+
+            assertTrue(parsed.isSuccess(), "Parse should succeed. Status: " + parsed.getStatus()
+                    + ", message: " + parsed.getMessage());
+            assertFalse(parsed.isProcessCrash(), "Should not be a process crash");
+
+            List<Metadata> metadataList = parsed.getMetadataList();
+            assertNotNull(metadataList, "Metadata list should not be null");
+            assertFalse(metadataList.isEmpty(), "Metadata list should not be empty");
+
+            String extracted = parsed.getContent();
+            assertNotNull(extracted, "Content should not be null");
+            assertTrue(extracted.contains("path-based parsing"),
+                    "Content should contain 'path-based parsing'");
+        }
+    }
+
+    @Test
+    public void testParseWithPathAndMetadata() throws Exception {
+        Path input = Files.writeString(tempDir.resolve("test_path_metadata.txt"),
+                "Content for metadata test");
+
+        PipesForkParserConfig forkConfig = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR)
+                .setHandlerType(BasicContentHandlerFactory.HANDLER_TYPE.TEXT)
+                .setParseMode(ParseMode.RMETA)
+                .setTimeoutMillis(60000);
+
+        try (PipesForkParser forkParser = new PipesForkParser(forkConfig)) {
+            // Exercise the parse(Path, Metadata) overload with caller-supplied metadata
+            Metadata seed = new Metadata();
+            seed.set("custom-key", "custom-value");
+            PipesForkResult parsed = forkParser.parse(input, seed);
+
+            assertTrue(parsed.isSuccess(), "Parse should succeed");
+            assertNotNull(parsed.getMetadata(), "Metadata should not be null");
+            assertTrue(parsed.getContent().contains("metadata test"));
+        }
+    }
+
+ @Test
+ public void testParseMultipleFilesWithPath() throws Exception {
+ // Create multiple test files
+ Path testFile1 = tempDir.resolve("path1.txt");
+ Path testFile2 = tempDir.resolve("path2.txt");
+ Files.writeString(testFile1, "Content of first path file");
+ Files.writeString(testFile2, "Content of second path file");
+
+ PipesForkParserConfig config = new PipesForkParserConfig()
+ .setPluginsDir(PLUGINS_DIR)
+ .setHandlerType(BasicContentHandlerFactory.HANDLER_TYPE.TEXT)
+ .setParseMode(ParseMode.RMETA)
+ .setTimeoutMillis(60000);
+
+ try (PipesForkParser parser = new PipesForkParser(config)) {
+ // Parse both files using Path directly
+ PipesForkResult result1 = parser.parse(testFile1);
+ assertTrue(result1.isSuccess());
+ assertTrue(result1.getContent().contains("first path file"));
+
+ PipesForkResult result2 = parser.parse(testFile2);
+ assertTrue(result2.isSuccess());
+ assertTrue(result2.getContent().contains("second path file"));
+ }
+ }
+
+    @Test
+    public void testParsePathMatchesTikaInputStream() throws Exception {
+        // parse(Path) and parse(TikaInputStream) should agree on the extracted text
+        Path input = Files.writeString(tempDir.resolve("compare.txt"),
+                "Content for comparison test");
+
+        PipesForkParserConfig forkConfig = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR)
+                .setHandlerType(BasicContentHandlerFactory.HANDLER_TYPE.TEXT)
+                .setParseMode(ParseMode.RMETA)
+                .setTimeoutMillis(60000);
+
+        String viaPath;
+        try (PipesForkParser forkParser = new PipesForkParser(forkConfig)) {
+            PipesForkResult parsed = forkParser.parse(input);
+            assertTrue(parsed.isSuccess());
+            viaPath = parsed.getContent();
+        }
+
+        String viaStream;
+        try (PipesForkParser forkParser = new PipesForkParser(forkConfig);
+                TikaInputStream stream = TikaInputStream.get(input)) {
+            PipesForkResult parsed = forkParser.parse(stream);
+            assertTrue(parsed.isSuccess());
+            viaStream = parsed.getContent();
+        }
+
+        assertEquals(viaPath, viaStream,
+                "parse(Path) and parse(TikaInputStream) should produce same content");
+    }
}
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
index fdc8883f3b..86e7c00c3f 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
@@ -25,8 +25,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
@@ -65,6 +67,8 @@ import org.apache.tika.pipes.core.EmitStrategy;
import org.apache.tika.pipes.core.EmitStrategyConfig;
import org.apache.tika.pipes.core.PipesConfig;
import org.apache.tika.pipes.core.PipesParser;
+import org.apache.tika.pipes.core.config.ConfigMerger;
+import org.apache.tika.pipes.core.config.ConfigOverrides;
import org.apache.tika.server.core.resource.AsyncResource;
import org.apache.tika.server.core.resource.DetectorResource;
import org.apache.tika.server.core.resource.LanguageResource;
@@ -484,20 +488,12 @@ public class TikaServerProcess {
LOG.info("Created unpack temp directory: {}", unpackTempDirectory);
}
- // Load or create config, adding the fetcher (and emitter if unpack is
enabled)
- Path configPath;
- if (tikaServerConfig.hasConfigFile()) {
- configPath = tikaServerConfig.getConfigPath();
- } else {
- configPath = createDefaultConfig(inputTempDirectory,
unpackTempDirectory);
- }
-
- TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(configPath);
-
- // Ensure fetcher (and emitter if unpack is enabled) are configured
with correct basePaths
- configPath = ensureServerComponents(configPath, tikaJsonConfig,
+ // Create or merge config with server components using ConfigMerger
+ Path existingConfigPath = tikaServerConfig.hasConfigFile() ?
+ tikaServerConfig.getConfigPath() : null;
+ Path configPath = createServerConfig(existingConfigPath,
inputTempDirectory, unpackTempDirectory);
- tikaJsonConfig = TikaJsonConfig.load(configPath);
+ TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(configPath);
// Load or create PipesConfig with defaults
PipesConfig pipesConfig = tikaJsonConfig.deserialize("pipes",
PipesConfig.class);
@@ -559,66 +555,9 @@ public class TikaServerProcess {
private static final String DEFAULT_PLUGINS_DIR = "plugins";
/**
- * Creates a default configuration file with plugin-roots set to the
"plugins" directory
- * relative to the current working directory, the tika-server-fetcher
configured
- * with basePath pointing to the input temp directory, and optionally the
unpack-emitter
- * configured with basePath pointing to the unpack temp directory.
- *
- * @param inputTempDirectory the temp directory for input files
- * @param unpackTempDirectory the temp directory for unpack output files
(may be null)
- */
- private static Path createDefaultConfig(Path inputTempDirectory,
- Path unpackTempDirectory) throws
IOException {
- Path pluginsDir = Path.of(DEFAULT_PLUGINS_DIR).toAbsolutePath();
-
- com.fasterxml.jackson.databind.ObjectMapper mapper =
- new com.fasterxml.jackson.databind.ObjectMapper();
- com.fasterxml.jackson.databind.node.ObjectNode rootNode =
mapper.createObjectNode();
-
- // Create fetchers section
- com.fasterxml.jackson.databind.node.ObjectNode fetchersNode =
mapper.createObjectNode();
- com.fasterxml.jackson.databind.node.ObjectNode fetcherNode =
mapper.createObjectNode();
- com.fasterxml.jackson.databind.node.ObjectNode fetcherTypeConfig =
mapper.createObjectNode();
- fetcherTypeConfig.put("basePath",
inputTempDirectory.toAbsolutePath().toString());
- fetcherNode.set("file-system-fetcher", fetcherTypeConfig);
- fetchersNode.set(PipesParsingHelper.DEFAULT_FETCHER_ID, fetcherNode);
- rootNode.set("fetchers", fetchersNode);
-
- // Create emitters section if unpack is enabled
- if (unpackTempDirectory != null) {
- com.fasterxml.jackson.databind.node.ObjectNode emittersNode =
mapper.createObjectNode();
- com.fasterxml.jackson.databind.node.ObjectNode emitterNode =
mapper.createObjectNode();
- com.fasterxml.jackson.databind.node.ObjectNode emitterTypeConfig =
mapper.createObjectNode();
- emitterTypeConfig.put("basePath",
unpackTempDirectory.toAbsolutePath().toString());
- emitterTypeConfig.put("onExists", "REPLACE");
- emitterNode.set("file-system-emitter", emitterTypeConfig);
- emittersNode.set(PipesParsingHelper.UNPACK_EMITTER_ID,
emitterNode);
- rootNode.set("emitters", emittersNode);
- }
-
- // Create pipes section
- com.fasterxml.jackson.databind.node.ObjectNode pipesNode =
mapper.createObjectNode();
- pipesNode.put("numClients", 4);
- pipesNode.put("timeoutMillis", 60000);
- rootNode.set("pipes", pipesNode);
-
- // Set plugin-roots
- rootNode.put("plugin-roots", pluginsDir.toString());
-
- Path tempConfig = Files.createTempFile("tika-server-default-config-",
".json");
-
mapper.writerWithDefaultPrettyPrinter().writeValue(tempConfig.toFile(),
rootNode);
- tempConfig.toFile().deleteOnExit();
-
- LOG.info("Created default config with plugin-roots: {}", pluginsDir);
- return tempConfig;
- }
-
- /**
- * Ensures the tika-server-fetcher exists in the config with basePath
pointing to
- * the input temp directory. If unpackTempDirectory is provided, also
ensures the
- * unpack-emitter exists.
+ * Creates or merges server configuration using ConfigMerger.
* <p>
- * The fetcher is used by legacy endpoints (/tika, /rmeta, etc.) to read
uploaded files
+ * The fetcher is used by endpoints (/tika, /rmeta, etc.) to read uploaded
files
* that have been spooled to the input temp directory.
* <p>
* The emitter is used by /unpack endpoints to write unpacked files that
are then
@@ -627,81 +566,52 @@ public class TikaServerProcess {
* Both components are configured with basePath (not allowAbsolutePaths)
so child processes
* can only access files within their designated temp directories
(security boundary).
*
- * @param originalConfigPath the original config file path
- * @param tikaJsonConfig the parsed Tika JSON config
+ * @param existingConfigPath the existing config file path (may be null)
* @param inputTempDirectory the temp directory for input files
* @param unpackTempDirectory the temp directory for unpack output files
(may be null)
- * @return the config path to use (always a new merged config with fetcher
and optionally emitter)
+ * @return the config path to use
*/
- private static Path ensureServerComponents(Path originalConfigPath,
TikaJsonConfig tikaJsonConfig,
- Path inputTempDirectory,
- Path unpackTempDirectory)
throws IOException {
+ private static Path createServerConfig(Path existingConfigPath,
+ Path inputTempDirectory,
+ Path unpackTempDirectory) throws
IOException {
LOG.info("Configuring {} with basePath={}",
PipesParsingHelper.DEFAULT_FETCHER_ID, inputTempDirectory);
- // Read original config as a mutable tree
- com.fasterxml.jackson.databind.ObjectMapper mapper =
- new com.fasterxml.jackson.databind.ObjectMapper();
- com.fasterxml.jackson.databind.node.ObjectNode rootNode =
- (com.fasterxml.jackson.databind.node.ObjectNode)
mapper.readTree(originalConfigPath.toFile());
-
- // Get or create the fetchers section
- com.fasterxml.jackson.databind.node.ObjectNode fetchersNode;
- if (rootNode.has("fetchers") && rootNode.get("fetchers").isObject()) {
- fetchersNode = (com.fasterxml.jackson.databind.node.ObjectNode)
rootNode.get("fetchers");
- } else {
- fetchersNode = mapper.createObjectNode();
- rootNode.set("fetchers", fetchersNode);
+ // Build configuration overrides
+ Map<String, Object> fetcherConfig = new HashMap<>();
+ fetcherConfig.put("basePath",
inputTempDirectory.toAbsolutePath().toString());
+
+ ConfigOverrides.Builder builder = ConfigOverrides.builder()
+ // Add fetcher for reading uploaded files from temp directory
+ .addFetcher(PipesParsingHelper.DEFAULT_FETCHER_ID,
"file-system-fetcher", fetcherConfig)
+ // Use PASSBACK_ALL strategy - results returned through socket
+ .setEmitStrategy(EmitStrategy.PASSBACK_ALL)
+ // Set plugin roots
+
.setPluginRoots(Path.of(DEFAULT_PLUGINS_DIR).toAbsolutePath().toString());
+
+ // Only set default pipes config if there's no existing config
+ // This allows user-provided config to specify their own numClients,
timeoutMillis, etc.
+ if (existingConfigPath == null || !Files.exists(existingConfigPath)) {
+ builder.setPipesConfig(4, 60000, null);
}
- // Create the fetcher config with basePath
- // Structure: "tika-server-fetcher": { "file-system-fetcher": {
"basePath": "/tmp/..." } }
- com.fasterxml.jackson.databind.node.ObjectNode fetcherTypeConfig =
mapper.createObjectNode();
- fetcherTypeConfig.put("basePath",
inputTempDirectory.toAbsolutePath().toString());
-
- com.fasterxml.jackson.databind.node.ObjectNode fetcherNode =
mapper.createObjectNode();
- fetcherNode.set("file-system-fetcher", fetcherTypeConfig);
-
- fetchersNode.set(PipesParsingHelper.DEFAULT_FETCHER_ID, fetcherNode);
-
- // Only add unpack-emitter if unpack endpoint is enabled
+ // Add unpack emitter if /unpack endpoint is enabled
if (unpackTempDirectory != null) {
LOG.info("Configuring {} with basePath={}",
PipesParsingHelper.UNPACK_EMITTER_ID, unpackTempDirectory);
- // Get or create the emitters section
- com.fasterxml.jackson.databind.node.ObjectNode emittersNode;
- if (rootNode.has("emitters") &&
rootNode.get("emitters").isObject()) {
- emittersNode =
(com.fasterxml.jackson.databind.node.ObjectNode) rootNode.get("emitters");
- } else {
- emittersNode = mapper.createObjectNode();
- rootNode.set("emitters", emittersNode);
- }
+ Map<String, Object> emitterConfig = new HashMap<>();
+ emitterConfig.put("basePath",
unpackTempDirectory.toAbsolutePath().toString());
+ emitterConfig.put("onExists", "REPLACE");
- // Create the emitter config with basePath
- // Structure: "unpack-emitter": { "file-system-emitter": {
"basePath": "/tmp/...", "onExists": "REPLACE" } }
- com.fasterxml.jackson.databind.node.ObjectNode emitterTypeConfig =
mapper.createObjectNode();
- emitterTypeConfig.put("basePath",
unpackTempDirectory.toAbsolutePath().toString());
- emitterTypeConfig.put("onExists", "REPLACE");
-
- com.fasterxml.jackson.databind.node.ObjectNode emitterNode =
mapper.createObjectNode();
- emitterNode.set("file-system-emitter", emitterTypeConfig);
-
- emittersNode.set(PipesParsingHelper.UNPACK_EMITTER_ID,
emitterNode);
+ builder.addEmitter(PipesParsingHelper.UNPACK_EMITTER_ID,
"file-system-emitter", emitterConfig);
}
- // Ensure plugin-roots is set (required for child processes)
- if (!rootNode.has("plugin-roots")) {
- Path pluginsDir = Path.of(DEFAULT_PLUGINS_DIR).toAbsolutePath();
- rootNode.put("plugin-roots", pluginsDir.toString());
- LOG.info("Added default plugin-roots: {}", pluginsDir);
- }
+ ConfigOverrides overrides = builder.build();
- // Write merged config to temp file
- Path mergedConfig = Files.createTempFile("tika-server-merged-config-",
".json");
-
mapper.writerWithDefaultPrettyPrinter().writeValue(mergedConfig.toFile(),
rootNode);
- mergedConfig.toFile().deleteOnExit();
+ // Merge with existing config or create new
+ ConfigMerger.MergeResult result =
ConfigMerger.mergeOrCreate(existingConfigPath, overrides);
- LOG.debug("Created merged config: {}", mergedConfig);
- return mergedConfig;
+ LOG.debug("Created server config: {}", result.configPath());
+ return result.configPath();
}
private static class ServerDetails {