This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 25e685670 TIKA-4571 -- add a replacement for ForkParser  (#2451)
25e685670 is described below

commit 25e685670b16d6826fc260e7f3ed724f25a5bfa3
Author: Tim Allison <[email protected]>
AuthorDate: Tue Dec 16 08:29:02 2025 -0500

    TIKA-4571 -- add a replacement for ForkParser  (#2451)
    
    (and fix a rat test in tika-serialization :/)
    
    Generated-by: Claude Opus 4.5 (model ID: claude-opus-4-5-20251101)
---
 tika-example/pom.xml                               |   5 +
 .../tika/example/PipesForkParserExample.java       | 491 +++++++++++++++++++++
 tika-pipes/pom.xml                                 |   1 +
 .../org/apache/tika/pipes/api/fetcher/Fetcher.java |  18 +-
 tika-pipes/tika-pipes-fork-parser/pom.xml          | 158 +++++++
 .../src/main/assembly/assembly.xml                 |  51 +++
 .../apache/tika/pipes/fork/PipesForkParser.java    | 366 +++++++++++++++
 .../tika/pipes/fork/PipesForkParserConfig.java     | 263 +++++++++++
 .../tika/pipes/fork/PipesForkParserException.java  | 102 +++++
 .../apache/tika/pipes/fork/PipesForkResult.java    | 195 ++++++++
 .../tika/pipes/fork/PipesForkParserTest.java       | 446 +++++++++++++++++++
 .../tika/pipes/fetcher/fs/FileSystemFetcher.java   |  98 ++--
 .../pipes/fetcher/fs/FileSystemFetcherConfig.java  |  17 +-
 .../fetcher/fs/FileSystemFetcherRuntimeConfig.java |  54 ---
 .../fs/FileSystemFetcherRuntimeConfigTest.java     | 184 --------
 .../pipes/fetcher/fs/FileSystemFetcherTest.java    | 115 ++++-
 tika-serialization/pom.xml                         |   9 +
 17 files changed, 2261 insertions(+), 312 deletions(-)
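
For readers skimming the diff: the heart of the new API is small. A minimal
usage sketch, distilled from the example code added in this commit (the file
path is a placeholder):

    try (PipesForkParser parser = new PipesForkParser();
         TikaInputStream tis = TikaInputStream.get(Paths.get("/path/to/file.pdf"))) {
        PipesForkResult result = parser.parse(tis);
        if (result.isSuccess()) {
            System.out.println(result.getContent());
        }
    }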

diff --git a/tika-example/pom.xml b/tika-example/pom.xml
index 977d56834..142a6a810 100644
--- a/tika-example/pom.xml
+++ b/tika-example/pom.xml
@@ -61,6 +61,11 @@
       <artifactId>tika-transcribe-aws</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.tika</groupId>
+      <artifactId>tika-pipes-fork-parser</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.tika</groupId>
       <artifactId>tika-core</artifactId>
diff --git a/tika-example/src/main/java/org/apache/tika/example/PipesForkParserExample.java b/tika-example/src/main/java/org/apache/tika/example/PipesForkParserExample.java
new file mode 100644
index 000000000..e4439b801
--- /dev/null
+++ b/tika-example/src/main/java/org/apache/tika/example/PipesForkParserExample.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.example;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.pipes.api.HandlerConfig;
+import org.apache.tika.pipes.core.PipesException;
+import org.apache.tika.pipes.fork.PipesForkParser;
+import org.apache.tika.pipes.fork.PipesForkParserConfig;
+import org.apache.tika.pipes.fork.PipesForkParserException;
+import org.apache.tika.pipes.fork.PipesForkResult;
+import org.apache.tika.sax.BasicContentHandlerFactory;
+
+/**
+ * Examples of how to use the {@link PipesForkParser} to parse documents
+ * in a forked JVM process.
+ * <p>
+ * The PipesForkParser provides isolation from crashes, memory leaks, and
+ * other issues that can occur during parsing of untrusted or malformed
+ * documents. If parsing fails catastrophically (OOM, infinite loop, etc.),
+ * only the forked process is affected - your main application continues
+ * running.
+ * <p>
+ * <b>Key features:</b>
+ * <ul>
+ *   <li>Process isolation - crashes don't affect your main JVM</li>
+ *   <li>Automatic process restart after crashes</li>
+ *   <li>Configurable timeouts to prevent infinite loops</li>
+ *   <li>Memory isolation - each forked process has its own heap</li>
+ *   <li>Thread-safe - can be shared across multiple threads</li>
+ * </ul>
+ * <p>
+ * <b>IMPORTANT - Resource Management:</b>
+ * <ul>
+ *   <li>Always close both the {@link PipesForkParser} and {@link TikaInputStream} using
+ *       try-with-resources or explicit close() calls</li>
+ *   <li>TikaInputStream may create temporary files when parsing from streams - these
+ *       are only cleaned up when the stream is closed</li>
+ *   <li>PipesForkParser manages forked JVM processes - closing it terminates these processes
+ *       and cleans up the temporary config file</li>
+ * </ul>
+ * <p>
+ * <b>Performance Tip:</b> Tika is significantly more efficient on some file types
+ * (especially those requiring random access like ZIP, OLE2/Office, PDF) when you have
+ * a file on disk and use {@code TikaInputStream.get(Path)} instead of
+ * {@code TikaInputStream.get(Files.newInputStream(path))}. The latter will cause
+ * TikaInputStream to spool the entire stream to a temporary file before parsing,
+ * which adds overhead. If you already have a file, always use the Path-based method.
+ */
+public class PipesForkParserExample {
+
+    /**
+     * Basic example of parsing a file using PipesForkParser with default settings.
+     * <p>
+     * This is the simplest way to use PipesForkParser. It uses the default
+     * configuration, which includes:
+     * <ul>
+     *   <li>Single forked process</li>
+     *   <li>TEXT output (plain text extraction)</li>
+     *   <li>RMETA mode (separate metadata for container and each embedded document)</li>
+     * </ul>
+     * <p>
+     * <b>Note:</b> This example uses {@code result.getContent()} which only returns
+     * the container document's content. For files with embedded documents (ZIP, email,
+     * Office docs with attachments), embedded content is NOT included. See
+     * {@link #parseEmbeddedDocumentsRmeta(Path)} for the proper way to access all content
+     * including embedded documents.
+     *
+     * @param filePath the path to the file to parse
+     * @return the container document's extracted text content (embedded content not included)
+     * @throws IOException if an I/O error occurs
+     * @throws InterruptedException if parsing is interrupted
+     * @throws TikaException if a Tika error occurs
+     * @throws PipesException if a pipes infrastructure error occurs
+     * @see #parseEmbeddedDocumentsRmeta(Path) for accessing all content including embedded documents
+     */
+    public String parseFileBasic(Path filePath)
+            throws IOException, InterruptedException, TikaException, PipesException {
+        try (PipesForkParser parser = new PipesForkParser();
+             TikaInputStream tis = TikaInputStream.get(filePath)) {
+            PipesForkResult result = parser.parse(tis);
+            if (result.isSuccess()) {
+                return result.getContent();
+            } else {
+                throw new TikaException("Parse failed: " + result.getStatus() +
+                        " - " + result.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Example of parsing a file and getting ALL content (container + embedded documents).
+     * <p>
+     * This is the recommended approach when using RMETA mode (the default) if you need
+     * all content from a document that may contain embedded files.
+     * <p>
+     * This method iterates over all metadata objects and concatenates their content,
+     * giving you content from the container AND all embedded documents.
+     *
+     * @param filePath the path to the file to parse
+     * @return all extracted text content (container + all embedded documents)
+     * @throws IOException if an I/O error occurs
+     * @throws InterruptedException if parsing is interrupted
+     * @throws TikaException if a Tika error occurs
+     * @throws PipesException if a pipes infrastructure error occurs
+     */
+    public String parseFileAllContent(Path filePath)
+            throws IOException, InterruptedException, TikaException, PipesException {
+        try (PipesForkParser parser = new PipesForkParser();
+             TikaInputStream tis = TikaInputStream.get(filePath)) {
+            PipesForkResult result = parser.parse(tis);
+            if (result.isSuccess()) {
+                // Iterate over ALL metadata objects to get container + embedded content
+                StringBuilder allContent = new StringBuilder();
+                for (Metadata m : result.getMetadataList()) {
+                    String content = m.get(TikaCoreProperties.TIKA_CONTENT);
+                    if (content != null) {
+                        if (allContent.length() > 0) {
+                            allContent.append("\n\n");
+                        }
+                        allContent.append(content);
+                    }
+                }
+                return allContent.toString();
+            } else {
+                throw new TikaException("Parse failed: " + result.getStatus() +
+                        " - " + result.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Example of parsing from an InputStream.
+     * <p>
+     * When parsing from an InputStream (as opposed to a file), TikaInputStream
+     * will automatically spool the stream to a temporary file. This is necessary
+     * because the forked process needs file system access.
+     * <p>
+     * <b>Performance Note:</b> If you already have a file on disk, use
+     * {@link #parseFileBasic(Path)} with {@code TikaInputStream.get(Path)} instead.
+     * This avoids the overhead of spooling the stream to a temporary file.
+     * For file types that require random access (ZIP, OLE2/Office documents, PDF),
+     * the performance difference can be significant.
+     * <p>
+     * The temporary file is automatically cleaned up when the TikaInputStream is closed.
+     * <b>Always close the TikaInputStream</b> to ensure temp files are deleted.
+     *
+     * @param inputStream the input stream to parse
+     * @return the extracted text content
+     * @throws IOException if an I/O error occurs
+     * @throws InterruptedException if parsing is interrupted
+     * @throws TikaException if a Tika error occurs
+     * @throws PipesException if a pipes infrastructure error occurs
+     */
+    public String parseInputStream(InputStream inputStream)
+            throws IOException, InterruptedException, TikaException, PipesException {
+        try (PipesForkParser parser = new PipesForkParser();
+             TikaInputStream tis = TikaInputStream.get(inputStream)) {
+            PipesForkResult result = parser.parse(tis);
+            return result.getContent();
+        }
+    }
+
+    /**
+     * Example of parsing with custom configuration.
+     * <p>
+     * This example shows how to configure:
+     * <ul>
+     *   <li>HTML output instead of plain text</li>
+     *   <li>Parse timeout of 60 seconds</li>
+     *   <li>JVM memory settings for the forked process</li>
+     *   <li>Maximum files before process restart (to prevent memory leaks)</li>
+     * </ul>
+     *
+     * @param filePath the path to the file to parse
+     * @return the extracted HTML content
+     * @throws IOException if an I/O error occurs
+     * @throws InterruptedException if parsing is interrupted
+     * @throws TikaException if a Tika error occurs
+     * @throws PipesException if a pipes infrastructure error occurs
+     */
+    public String parseWithCustomConfig(Path filePath)
+            throws IOException, InterruptedException, TikaException, PipesException {
+        PipesForkParserConfig config = new PipesForkParserConfig()
+                .setHandlerType(BasicContentHandlerFactory.HANDLER_TYPE.HTML)
+                .setTimeoutMillis(60000)
+                .addJvmArg("-Xmx512m")
+                .setMaxFilesPerProcess(100);
+
+        try (PipesForkParser parser = new PipesForkParser(config);
+             TikaInputStream tis = TikaInputStream.get(filePath)) {
+            PipesForkResult result = parser.parse(tis);
+            return result.getContent();
+        }
+    }
+
+    /**
+     * Example of parsing with metadata extraction.
+     * <p>
+     * This example demonstrates how to access both content and metadata
+     * from the parse result.
+     *
+     * @param filePath the path to the file to parse
+     * @throws IOException if an I/O error occurs
+     * @throws InterruptedException if parsing is interrupted
+     * @throws TikaException if a Tika error occurs
+     * @throws PipesException if a pipes infrastructure error occurs
+     */
+    public void parseWithMetadata(Path filePath)
+            throws IOException, InterruptedException, TikaException, PipesException {
+        try (PipesForkParser parser = new PipesForkParser();
+             TikaInputStream tis = TikaInputStream.get(filePath)) {
+            PipesForkResult result = parser.parse(tis);
+
+            if (result.isSuccess()) {
+                Metadata metadata = result.getMetadata();
+                System.out.println("Content-Type: " + 
metadata.get(Metadata.CONTENT_TYPE));
+                System.out.println("Title: " + 
metadata.get(TikaCoreProperties.TITLE));
+                System.out.println("Creator: " + 
metadata.get(TikaCoreProperties.CREATOR));
+                System.out.println("Content: " + result.getContent());
+            }
+        }
+    }
+
+    /**
+     * Example of parsing documents with embedded files using RMETA mode.
+     * <p>
+     * <b>Both RMETA and CONCATENATE modes parse embedded content.</b> The key differences are:
+     * <p>
+     * <b>RMETA mode (recommended for most use cases):</b>
+     * <ul>
+     *   <li>Returns separate metadata objects for the container and each embedded document</li>
+     *   <li>Preserves per-document metadata (author, title, dates, etc.) for each embedded file</li>
+     *   <li>Exceptions from embedded documents are captured in each document's metadata
+     *       (via {@link TikaCoreProperties#EMBEDDED_EXCEPTION}) - they are NOT silently swallowed</li>
+     *   <li>You can see which embedded document caused a problem</li>
+     * </ul>
+     * <p>
+     * <b>CONCATENATE mode (legacy behavior):</b>
+     * <ul>
+     *   <li>Returns a single metadata object with all content concatenated together</li>
+     *   <li>Embedded document metadata is lost (only container metadata is preserved)</li>
+     *   <li>Exceptions from embedded documents may be silently swallowed</li>
+     *   <li>Simpler output but less visibility into what happened</li>
+     * </ul>
+     *
+     * @param filePath the path to the file to parse
+     * @throws IOException if an I/O error occurs
+     * @throws InterruptedException if parsing is interrupted
+     * @throws TikaException if a Tika error occurs
+     * @throws PipesException if a pipes infrastructure error occurs
+     * @see #parseEmbeddedDocumentsConcatenate(Path) for the legacy CONCATENATE mode example
+     */
+    public void parseEmbeddedDocumentsRmeta(Path filePath)
+            throws IOException, InterruptedException, TikaException, PipesException {
+        PipesForkParserConfig config = new PipesForkParserConfig()
+                .setParseMode(HandlerConfig.PARSE_MODE.RMETA);
+
+        try (PipesForkParser parser = new PipesForkParser(config);
+             TikaInputStream tis = TikaInputStream.get(filePath)) {
+            PipesForkResult result = parser.parse(tis);
+
+            List<Metadata> metadataList = result.getMetadataList();
+            System.out.println("Found " + metadataList.size() + " documents");
+
+            for (int i = 0; i < metadataList.size(); i++) {
+                Metadata m = metadataList.get(i);
+                String resourceName = m.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+                String content = m.get(TikaCoreProperties.TIKA_CONTENT);
+
+                if (i == 0) {
+                    System.out.println("Container document:");
+                } else {
+                    System.out.println("Embedded document #" + i + ": " + resourceName);
+                }
+                System.out.println("  Content type: " + m.get(Metadata.CONTENT_TYPE));
+                System.out.println("  Content length: " +
+                        (content != null ? content.length() : 0) + " chars");
+
+                // Check for exceptions that occurred while parsing this specific document
+                String embeddedException = m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
+                if (embeddedException != null) {
+                    System.out.println("  WARNING - Exception occurred: " + embeddedException);
+                }
+            }
+        }
+    }
+
+    /**
+     * Example of parsing documents with embedded files using CONCATENATE mode (legacy).
+     * <p>
+     * <b>Both RMETA and CONCATENATE modes parse embedded content.</b> However, CONCATENATE
+     * mode provides less visibility into the parsing process:
+     * <ul>
+     *   <li>All content from container and embedded documents is concatenated into one string</li>
+     *   <li>Only a single metadata object is returned (container metadata only)</li>
+     *   <li>Per-embedded-document metadata is lost</li>
+     *   <li>Exceptions from embedded documents may be silently swallowed</li>
+     * </ul>
+     * <p>
+     * <b>Recommendation:</b> Use RMETA mode ({@link #parseEmbeddedDocumentsRmeta(Path)}) unless
+     * you specifically need the legacy concatenation behavior. RMETA gives you visibility into
+     * embedded document exceptions and preserves metadata for each document.
+     *
+     * @param filePath the path to the file to parse
+     * @throws IOException if an I/O error occurs
+     * @throws InterruptedException if parsing is interrupted
+     * @throws TikaException if a Tika error occurs
+     * @throws PipesException if a pipes infrastructure error occurs
+     */
+    public void parseEmbeddedDocumentsConcatenate(Path filePath)
+            throws IOException, InterruptedException, TikaException, PipesException {
+        PipesForkParserConfig config = new PipesForkParserConfig()
+                .setParseMode(HandlerConfig.PARSE_MODE.CONCATENATE);
+
+        try (PipesForkParser parser = new PipesForkParser(config);
+             TikaInputStream tis = TikaInputStream.get(filePath)) {
+            PipesForkResult result = parser.parse(tis);
+
+            // In CONCATENATE mode, there's only one metadata object
+            List<Metadata> metadataList = result.getMetadataList();
+            System.out.println("Metadata objects returned: " + 
metadataList.size()); // Always 1
+
+            Metadata m = result.getMetadata();
+            String content = result.getContent();
+
+            System.out.println("Container content type: " + 
m.get(Metadata.CONTENT_TYPE));
+            System.out.println("Total concatenated content length: " +
+                    (content != null ? content.length() : 0) + " chars");
+
+            // Note: In CONCATENATE mode, you cannot see:
+            // - Which embedded documents were processed
+            // - Metadata from individual embedded documents
+            // - Exceptions that occurred in specific embedded documents
+            // Use RMETA mode if you need this visibility
+        }
+    }
+
+    /**
+     * Example of proper error handling with PipesForkParser.
+     * <p>
+     * There are three categories of results to handle:
+     * <ol>
+     *   <li><b>Success</b> - Parsing completed successfully</li>
+     *   <li><b>Process crash</b> - The forked JVM crashed (OOM, timeout, etc.).
+     *       The parser will automatically restart for the next parse.</li>
+     *   <li><b>Application error</b> - Configuration or infrastructure error.
+     *       These throw {@link PipesForkParserException}.</li>
+     * </ol>
+     *
+     * @param filePath the path to the file to parse
+     * @return the extracted content, or error message if parsing failed
+     */
+    public String parseWithErrorHandling(Path filePath) {
+        PipesForkParserConfig config = new PipesForkParserConfig()
+                .setTimeoutMillis(30000);
+
+        try (PipesForkParser parser = new PipesForkParser(config);
+             TikaInputStream tis = TikaInputStream.get(filePath)) {
+
+            PipesForkResult result = parser.parse(tis);
+
+            if (result.isSuccess()) {
+                return result.getContent();
+            } else if (result.isProcessCrash()) {
+                // Process crashed - could be OOM, timeout, or other crash
+                // The next parse() call will automatically restart the process
+                return "Process crashed: " + result.getStatus() +
+                        ". Consider reducing memory usage or increasing 
timeout.";
+            } else {
+                // Other non-success status (e.g., fetch exception, parse 
exception)
+                return "Parse failed: " + result.getStatus() + " - " + 
result.getMessage();
+            }
+
+        } catch (PipesForkParserException e) {
+            // Application error - something is misconfigured
+            return "Application error (" + e.getStatus() + "): " + 
e.getMessage();
+        } catch (IOException | InterruptedException | TikaException | 
PipesException e) {
+            return "Error: " + e.getMessage();
+        }
+    }
+
+    /**
+     * Example of reusing PipesForkParser for multiple documents.
+     * <p>
+     * PipesForkParser is designed to be reused. Creating a new parser for each
+     * document is inefficient because it requires starting a new forked JVM process.
+     * <p>
+     * This example shows the recommended pattern: create the parser once and
+     * reuse it for multiple documents.
+     *
+     * @param filePaths the files to parse
+     * @throws IOException if an I/O error occurs
+     * @throws InterruptedException if parsing is interrupted
+     * @throws TikaException if a Tika error occurs
+     * @throws PipesException if a pipes infrastructure error occurs
+     */
+    public void parseManyFiles(List<Path> filePaths)
+            throws IOException, InterruptedException, TikaException, PipesException {
+        PipesForkParserConfig config = new PipesForkParserConfig()
+                .setTimeoutMillis(30000)
+                .setMaxFilesPerProcess(50);
+
+        try (PipesForkParser parser = new PipesForkParser(config)) {
+            for (Path filePath : filePaths) {
+                try (TikaInputStream tis = TikaInputStream.get(filePath)) {
+                    PipesForkResult result = parser.parse(tis);
+                    if (result.isSuccess()) {
+                        System.out.println("Parsed: " + filePath);
+                        System.out.println("Content type: " +
+                                result.getMetadata().get(Metadata.CONTENT_TYPE));
+                    } else if (result.isProcessCrash()) {
+                        System.err.println("Process crashed on: " + filePath +
+                                " - " + result.getStatus());
+                        // Parser will automatically restart for next document
+                    } else {
+                        System.err.println("Failed: " + filePath +
+                                " - " + result.getMessage());
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Example of providing initial metadata hints.
+     * <p>
+     * You can provide metadata hints to the parser, such as the content type
+     * if you already know it. This can improve parsing accuracy or performance.
+     *
+     * @param filePath the path to the file to parse
+     * @param contentType the known content type
+     * @return the extracted content
+     * @throws IOException if an I/O error occurs
+     * @throws InterruptedException if parsing is interrupted
+     * @throws TikaException if a Tika error occurs
+     * @throws PipesException if a pipes infrastructure error occurs
+     */
+    public String parseWithContentTypeHint(Path filePath, String contentType)
+            throws IOException, InterruptedException, TikaException, PipesException {
+        try (PipesForkParser parser = new PipesForkParser();
+             TikaInputStream tis = TikaInputStream.get(filePath)) {
+
+            Metadata metadata = new Metadata();
+            metadata.set(Metadata.CONTENT_TYPE, contentType);
+
+            PipesForkResult result = parser.parse(tis, metadata);
+            return result.getContent();
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        if (args.length < 1) {
+            System.err.println("Usage: PipesForkParserExample <file-path>");
+            System.exit(1);
+        }
+
+        Path filePath = Paths.get(args[0]);
+        PipesForkParserExample example = new PipesForkParserExample();
+
+        System.out.println("=== Basic Parse ===");
+        String content = example.parseFileBasic(filePath);
+        System.out.println(content);
+
+        System.out.println("\n=== Parse with Metadata ===");
+        example.parseWithMetadata(filePath);
+    }
+}
diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index 40ed5bbbf..e8366313d 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -36,6 +36,7 @@
     <module>tika-pipes-reporter-commons</module>
     <module>tika-pipes-iterator-commons</module>
     <module>tika-pipes-plugins</module>
+    <module>tika-pipes-fork-parser</module>
     <module>tika-async-cli</module>
     <module>tika-pipes-integration-tests</module>
   </modules>
diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/Fetcher.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/Fetcher.java
index d281130f1..1e49488d9 100644
--- a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/Fetcher.java
+++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/Fetcher.java
@@ -35,5 +35,21 @@ import org.apache.tika.plugins.TikaExtension;
  */
 public interface Fetcher extends TikaExtension, ExtensionPoint {
 
-    TikaInputStream fetch(String fetchKey, Metadata metadata, ParseContext parseContext) throws TikaException, IOException;
+    /**
+     * Fetches a resource and returns it as a TikaInputStream.
+     *
+     * @param fetchKey the key identifying the resource to fetch (interpretation
+     *                 depends on the implementation, e.g., file path, URL, S3 key)
+     * @param metadata metadata object to be updated with resource information
+     * @param parseContext the parse context
+     * @return a TikaInputStream for reading the resource content
+     * @throws TikaException if a Tika-specific error occurs during fetching
+     * @throws IOException if an I/O error occurs during fetching
+     * @throws SecurityException if the fetchKey attempts to access a resource
+     *         outside permitted boundaries (e.g., path traversal attack)
+     * @throws IllegalArgumentException if the fetchKey contains invalid characters
+     *         (e.g., null bytes)
+     */
+    TikaInputStream fetch(String fetchKey, Metadata metadata, ParseContext parseContext)
+            throws TikaException, IOException;
 }
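
To illustrate the contract documented above, here is a hypothetical Fetcher
(not part of this commit) that resolves fetch keys against a fixed base
directory and enforces the documented SecurityException and
IllegalArgumentException behavior; the TikaExtension/ExtensionPoint plumbing,
whose shape is not shown in this diff, is omitted:

    // Hypothetical example: confine fetch keys to a base directory.
    public class BaseDirFetcher implements Fetcher {

        private final Path baseDir;

        public BaseDirFetcher(Path baseDir) {
            this.baseDir = baseDir.toAbsolutePath().normalize();
        }

        @Override
        public TikaInputStream fetch(String fetchKey, Metadata metadata, ParseContext parseContext)
                throws TikaException, IOException {
            // Reject invalid characters before touching the file system
            if (fetchKey.indexOf('\0') > -1) {
                throw new IllegalArgumentException("fetchKey must not contain null bytes");
            }
            // Normalize and confine the resolved path to baseDir (blocks ../ traversal)
            Path resolved = baseDir.resolve(fetchKey).normalize();
            if (!resolved.startsWith(baseDir)) {
                throw new SecurityException("fetchKey escapes the base directory: " + fetchKey);
            }
            return TikaInputStream.get(resolved, metadata);
        }

        // TikaExtension/ExtensionPoint methods omitted for brevity
    }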
diff --git a/tika-pipes/tika-pipes-fork-parser/pom.xml b/tika-pipes/tika-pipes-fork-parser/pom.xml
new file mode 100644
index 000000000..712aba51b
--- /dev/null
+++ b/tika-pipes/tika-pipes-fork-parser/pom.xml
@@ -0,0 +1,158 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.tika</groupId>
+    <artifactId>tika-pipes</artifactId>
+    <version>4.0.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tika-pipes-fork-parser</artifactId>
+
+  <name>Apache Tika pipes fork parser</name>
+  <description>A ForkParser implementation backed by PipesClient for parsing in forked JVM processes</description>
+  <url>https://tika.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-pipes-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-pipes-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-pipes-file-system</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-parsers-standard-package</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-pipes-file-system</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>zip</type>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-engine</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifestEntries>
+              <Automatic-Module-Name>org.apache.tika.pipes.fork</Automatic-Module-Name>
+            </manifestEntries>
+          </archive>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-plugins</id>
+            <phase>process-test-resources</phase>
+            <goals>
+              <goal>copy</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/plugins</outputDirectory>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>org.apache.tika</groupId>
+                  <artifactId>tika-pipes-file-system</artifactId>
+                  <version>${project.version}</version>
+                  <type>zip</type>
+                  <overWrite>true</overWrite>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+              <includeScope>runtime</includeScope>
+              <stripVersion>false</stripVersion>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>false</overWriteSnapshots>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptors>
+            <descriptor>src/main/assembly/assembly.xml</descriptor>
+          </descriptors>
+          <appendAssemblyId>false</appendAssemblyId>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/tika-pipes/tika-pipes-fork-parser/src/main/assembly/assembly.xml b/tika-pipes/tika-pipes-fork-parser/src/main/assembly/assembly.xml
new file mode 100644
index 000000000..37c48d403
--- /dev/null
+++ b/tika-pipes/tika-pipes-fork-parser/src/main/assembly/assembly.xml
@@ -0,0 +1,51 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.1.1"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.1.1 http://maven.apache.org/xsd/assembly-2.1.1.xsd">
+  <id>bin</id>
+  <formats>
+    <format>zip</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+
+  <dependencySets>
+    <dependencySet>
+      <outputDirectory>lib</outputDirectory>
+      <useProjectArtifact>false</useProjectArtifact>
+      <unpack>false</unpack>
+      <scope>runtime</scope>
+    </dependencySet>
+  </dependencySets>
+  <fileSets>
+    <fileSet>
+      <directory>${project.build.directory}</directory>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <include>*.jar</include>
+      </includes>
+      <excludes>
+        <exclude>*-sources.jar</exclude>
+        <exclude>*-javadoc.jar</exclude>
+      </excludes>
+    </fileSet>
+    <fileSet>
+      <directory>${project.build.directory}/plugins</directory>
+      <outputDirectory>plugins</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParser.java b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParser.java
new file mode 100644
index 000000000..01296a92a
--- /dev/null
+++ b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParser.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fork;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.api.FetchEmitTuple;
+import org.apache.tika.pipes.api.HandlerConfig;
+import org.apache.tika.pipes.api.PipesResult;
+import org.apache.tika.pipes.api.emitter.EmitKey;
+import org.apache.tika.pipes.api.fetcher.FetchKey;
+import org.apache.tika.pipes.core.PipesConfig;
+import org.apache.tika.pipes.core.PipesException;
+import org.apache.tika.pipes.core.PipesParser;
+
+/**
+ * A ForkParser implementation backed by {@link PipesParser}.
+ * <p>
+ * <strong>This class is intended to replace the legacy
+ * {@code org.apache.tika.fork.ForkParser}.</strong> The legacy ForkParser streamed
+ * SAX events between processes, which was complex and error-prone. This implementation
+ * uses the modern pipes infrastructure and returns parsed content in the metadata
+ * (via {@link org.apache.tika.metadata.TikaCoreProperties#TIKA_CONTENT}).
+ * <p>
+ * This parser runs parsing in forked JVM processes, providing isolation from
+ * crashes, memory leaks, and other issues that can occur during parsing.
+ * Multiple forked processes can be used for concurrent parsing.
+ * <p>
+ * <strong>Getting Started:</strong> This class is designed as a simple entry point
+ * to help users get started with forked parsing using files on the local filesystem.
+ * Under the hood, it uses a {@code FileSystemFetcher} to read files. For more advanced
+ * use cases, the Tika Pipes infrastructure supports many other sources and destinations
+ * through plugins:
+ * <ul>
+ *   <li><strong>Fetchers</strong> (read from): S3, Azure Blob, Google Cloud Storage,
+ *       HTTP, Microsoft Graph, and more</li>
+ *   <li><strong>Emitters</strong> (write to): OpenSearch, Solr, S3, filesystem, and more</li>
+ *   <li><strong>Pipes Iterators</strong> (batch processing): JDBC, CSV, filesystem crawling,
+ *       and more</li>
+ * </ul>
+ * See the {@code tika-pipes} module and its submodules for available plugins. For
+ * production batch processing, consider using {@code AsyncProcessor} or the
+ * {@code tika-pipes-cli} directly with a JSON configuration file.
+ * <p>
+ * <strong>Thread Safety:</strong> This class is thread-safe. Multiple threads can
+ * call {@link #parse} concurrently, and requests will be distributed across the
+ * pool of forked processes.
+ * <p>
+ * <strong>Error Handling:</strong>
+ * <ul>
+ *   <li>Application errors (initialization failures, config errors) throw
+ *       {@link PipesForkParserException}</li>
+ *   <li>Process crashes (OOM, timeout) are returned in the result - the next
+ *       parse will automatically restart the forked process</li>
+ *   <li>Per-document errors (fetch/parse exceptions) are returned in the result</li>
+ * </ul>
+ * <p>
+ * Example usage:
+ * <pre>
+ * PipesForkParserConfig config = new PipesForkParserConfig();
+ * config.setHandlerConfig(new HandlerConfig(HANDLER_TYPE.TEXT, PARSE_MODE.RMETA, -1, -1, true));
+ *
+ * try (PipesForkParser parser = new PipesForkParser(config)) {
+ *     // Parse from a file
+ *     try (TikaInputStream tis = TikaInputStream.get(Paths.get("/path/to/file.pdf"))) {
+ *         PipesForkResult result = parser.parse(tis);
+ *         for (Metadata m : result.getMetadataList()) {
+ *             String content = m.get(TikaCoreProperties.TIKA_CONTENT);
+ *             // process content and metadata
+ *         }
+ *     }
+ *
+ *     // Or parse from an InputStream (will be spooled to temp file)
+ *     try (TikaInputStream tis = TikaInputStream.get(inputStream)) {
+ *         PipesForkResult result = parser.parse(tis);
+ *         // ...
+ *     }
+ * }
+ * </pre>
+ *
+ * @see org.apache.tika.pipes.core.async.AsyncProcessor for batch processing
+ */
+public class PipesForkParser implements Closeable {
+
+    public static final String DEFAULT_FETCHER_NAME = "fs";
+
+    private final PipesForkParserConfig config;
+    private final PipesParser pipesParser;
+    private final Path tikaConfigPath;
+
+    /**
+     * Creates a new PipesForkParser with default configuration.
+     *
+     * @throws IOException if the temporary config file cannot be created
+     */
+    public PipesForkParser() throws IOException {
+        this(new PipesForkParserConfig());
+    }
+
+    /**
+     * Creates a new PipesForkParser with the specified configuration.
+     *
+     * @param config the configuration for this parser
+     * @throws IOException if the temporary config file cannot be created
+     */
+    public PipesForkParser(PipesForkParserConfig config) throws IOException {
+        this.config = config;
+        this.tikaConfigPath = createTikaConfigFile();
+        this.pipesParser = new PipesParser(config.getPipesConfig(), tikaConfigPath);
+    }
+
+    /**
+     * Parse a file in a forked JVM process.
+     *
+     * @param tis the TikaInputStream to parse. If the stream doesn't have an underlying
+     *            file, it will be spooled to a temporary file. The caller must keep
+     *            the TikaInputStream open until this method returns.
+     * @return the parse result containing metadata and content
+     * @throws IOException if an I/O error occurs
+     * @throws InterruptedException if the parsing is interrupted
+     * @throws PipesException if a pipes infrastructure error occurs
+     * @throws PipesForkParserException if an application error occurs (initialization
+     *         failure or configuration error)
+     */
+    public PipesForkResult parse(TikaInputStream tis)
+            throws IOException, InterruptedException, PipesException, TikaException {
+        return parse(tis, new Metadata(), new ParseContext());
+    }
+
+    /**
+     * Parse a file in a forked JVM process with the specified metadata.
+     *
+     * @param tis the TikaInputStream to parse. If the stream doesn't have an underlying
+     *            file, it will be spooled to a temporary file. The caller must keep
+     *            the TikaInputStream open until this method returns.
+     * @param metadata initial metadata (e.g., content type hint)
+     * @return the parse result containing metadata and content
+     * @throws IOException if an I/O error occurs
+     * @throws InterruptedException if the parsing is interrupted
+     * @throws PipesException if a pipes infrastructure error occurs
+     * @throws PipesForkParserException if an application error occurs (initialization
+     *         failure or configuration error)
+     */
+    public PipesForkResult parse(TikaInputStream tis, Metadata metadata)
+            throws IOException, InterruptedException, PipesException, TikaException {
+        return parse(tis, metadata, new ParseContext());
+    }
+
+    /**
+     * Parse a file in a forked JVM process with the specified metadata and parse context.
+     *
+     * @param tis the TikaInputStream to parse. If the stream doesn't have an underlying
+     *            file, it will be spooled to a temporary file. The caller must keep
+     *            the TikaInputStream open until this method returns.
+     * @param metadata initial metadata (e.g., content type hint)
+     * @param parseContext the parse context
+     * @return the parse result containing metadata and content
+     * @throws IOException if an I/O error occurs
+     * @throws InterruptedException if the parsing is interrupted
+     * @throws PipesException if a pipes infrastructure error occurs
+     * @throws PipesForkParserException if an application error occurs (initialization
+     *         failure or configuration error)
+     */
+    public PipesForkResult parse(TikaInputStream tis, Metadata metadata, ParseContext parseContext)
+            throws IOException, InterruptedException, PipesException, TikaException {
+
+        // Get the path - this will spool to a temp file if the stream doesn't have
+        // an underlying file. The temp file is managed by TikaInputStream and will
+        // be cleaned up when the TikaInputStream is closed.
+        Path path = tis.getPath();
+        String absolutePath = path.toAbsolutePath().toString();
+        String id = absolutePath;
+
+        FetchKey fetchKey = new FetchKey(config.getFetcherName(), absolutePath);
+        EmitKey emitKey = new EmitKey("", id); // Empty emitter name since we're using PASSBACK_ALL
+
+        // Add handler config to parse context so server knows how to handle content
+        parseContext.set(HandlerConfig.class, config.getHandlerConfig());
+
+        FetchEmitTuple tuple = new FetchEmitTuple(id, fetchKey, emitKey, metadata, parseContext);
+
+        PipesResult result = pipesParser.parse(tuple);
+
+        // Check for application errors and throw if necessary
+        // Process crashes are NOT thrown - the next parse will restart the process
+        checkForApplicationError(result);
+
+        return new PipesForkResult(result);
+    }
+
+    /**
+     * Checks if the result represents an application error and throws an exception if so.
+     * <p>
+     * Application errors that cause exceptions:
+     * <ul>
+     *   <li>Initialization failures (parser, fetcher, or emitter)</li>
+     *   <li>Configuration errors (fetcher or emitter not found)</li>
+     *   <li>Client unavailable within timeout</li>
+     * </ul>
+     * <p>
+     * Process crashes (OOM, timeout, unspecified crash) are NOT thrown as exceptions.
+     * The forked process will be automatically restarted on the next parse call.
+     * Check {@link PipesForkResult#isProcessCrash()} to detect these cases.
+     * <p>
+     * Per-document errors (fetch exception, parse exception) are also NOT thrown.
+     * These are returned in the result so the caller can handle them appropriately
+     * (e.g., log and continue with the next file).
+     *
+     * @param result the pipes result to check
+     * @throws PipesForkParserException if the result represents an application error
+     */
+    private void checkForApplicationError(PipesResult result) throws PipesForkParserException {
+        PipesResult.RESULT_STATUS status = result.status();
+
+        // Only throw for application errors that indicate infrastructure/config problems
+        // Process crashes and per-document errors are returned to the caller
+        switch (status) {
+            case FAILED_TO_INITIALIZE:
+                throw new PipesForkParserException(status,
+                        "Failed to initialize parser" +
+                        (result.message() != null ? ": " + result.message() : ""));
+
+            case FETCHER_INITIALIZATION_EXCEPTION:
+                throw new PipesForkParserException(status,
+                        "Failed to initialize fetcher" +
+                        (result.message() != null ? ": " + result.message() : ""));
+
+            case EMITTER_INITIALIZATION_EXCEPTION:
+                throw new PipesForkParserException(status,
+                        "Failed to initialize emitter" +
+                        (result.message() != null ? ": " + result.message() : ""));
+
+            case FETCHER_NOT_FOUND:
+                throw new PipesForkParserException(status,
+                        "Fetcher not found" +
+                        (result.message() != null ? ": " + result.message() : ""));
+
+            case EMITTER_NOT_FOUND:
+                throw new PipesForkParserException(status,
+                        "Emitter not found" +
+                        (result.message() != null ? ": " + result.message() : ""));
+
+            case CLIENT_UNAVAILABLE_WITHIN_MS:
+                throw new PipesForkParserException(status,
+                        "No client available within timeout" +
+                        (result.message() != null ? ": " + result.message() : ""));
+
+            default:
+                // Process crashes (OOM, TIMEOUT, UNSPECIFIED_CRASH) - not thrown,
+                // next parse will restart the process automatically
+                //
+                // Per-document errors (FETCH_EXCEPTION, PARSE_EXCEPTION_NO_EMIT, etc.) -
+                // not thrown, caller can check result and decide how to handle
+                //
+                // Success states - obviously not thrown
+                break;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        pipesParser.close();
+        // Clean up temp config file
+        if (tikaConfigPath != null) {
+            Files.deleteIfExists(tikaConfigPath);
+        }
+    }
+
+    /**
+     * Creates a temporary tika-config.json file for the forked process.
+     * This configures:
+     * - FileSystemFetcher as the fetcher
+     * - PASSBACK_ALL emit strategy (no emitter, return results to client)
+     */
+    private Path createTikaConfigFile() throws IOException {
+        Path configFile = Files.createTempFile("tika-fork-config-", ".json");
+
+        String jsonConfig = generateJsonConfig();
+        Files.writeString(configFile, jsonConfig);
+
+        return configFile;
+    }
+
+    private String generateJsonConfig() throws IOException {
+        PipesConfig pc = config.getPipesConfig();
+
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.enable(SerializationFeature.INDENT_OUTPUT);
+
+        StringWriter writer = new StringWriter();
+        try (JsonGenerator gen = mapper.getFactory().createGenerator(writer)) {
+            gen.writeStartObject();
+
+            // Fetchers section
+            gen.writeObjectFieldStart("fetchers");
+            gen.writeObjectFieldStart(config.getFetcherName());
+            gen.writeObjectFieldStart("file-system-fetcher");
+            // No basePath - fetchKey will be treated as absolute path
+            // Set allowAbsolutePaths to suppress the security warning since this is intentional
+            gen.writeBooleanField("allowAbsolutePaths", true);
+            gen.writeEndObject(); // file-system-fetcher
+            gen.writeEndObject(); // fetcher name
+            gen.writeEndObject(); // fetchers
+
+            // Pipes configuration section
+            gen.writeObjectFieldStart("pipes");
+            gen.writeNumberField("numClients", pc.getNumClients());
+            gen.writeNumberField("timeoutMillis", pc.getTimeoutMillis());
+            gen.writeNumberField("startupTimeoutMillis", 
pc.getStartupTimeoutMillis());
+            gen.writeNumberField("maxFilesProcessedPerProcess", 
pc.getMaxFilesProcessedPerProcess());
+
+            // Emit strategy - PASSBACK_ALL means no emitter, return results to client
+            gen.writeObjectFieldStart("emitStrategy");
+            gen.writeStringField("type", "PASSBACK_ALL");
+            gen.writeEndObject(); // emitStrategy
+
+            // JVM args if specified
+            ArrayList<String> jvmArgs = pc.getForkedJvmArgs();
+            if (jvmArgs != null && !jvmArgs.isEmpty()) {
+                gen.writeArrayFieldStart("forkedJvmArgs");
+                for (String arg : jvmArgs) {
+                    gen.writeString(arg);
+                }
+                gen.writeEndArray();
+            }
+
+            gen.writeEndObject(); // pipes
+
+            // Plugin roots if specified
+            if (config.getPluginsDir() != null) {
+                gen.writeStringField("plugin-roots", 
config.getPluginsDir().toAbsolutePath().toString());
+            }
+
+            gen.writeEndObject(); // root
+        }
+
+        return writer.toString();
+    }
+}
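
For reference, with the default fetcher name "fs", generateJsonConfig() emits
JSON shaped like the following (the numeric values here are illustrative, not
the actual defaults, which come from PipesConfig):

    {
      "fetchers" : {
        "fs" : {
          "file-system-fetcher" : {
            "allowAbsolutePaths" : true
          }
        }
      },
      "pipes" : {
        "numClients" : 1,
        "timeoutMillis" : 30000,
        "startupTimeoutMillis" : 60000,
        "maxFilesProcessedPerProcess" : 100,
        "emitStrategy" : {
          "type" : "PASSBACK_ALL"
        }
      }
    }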
diff --git a/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParserConfig.java b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParserConfig.java
new file mode 100644
index 000000000..8ffa0b555
--- /dev/null
+++ b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParserConfig.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fork;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tika.pipes.api.HandlerConfig;
+import org.apache.tika.pipes.core.PipesConfig;
+import org.apache.tika.sax.BasicContentHandlerFactory;
+
+/**
+ * Configuration for {@link PipesForkParser}.
+ * <p>
+ * This provides a simplified configuration API that abstracts away the
+ * complexity of the pipes infrastructure.
+ */
+public class PipesForkParserConfig {
+
+    private final PipesConfig pipesConfig;
+    private HandlerConfig handlerConfig;
+    private String fetcherName = PipesForkParser.DEFAULT_FETCHER_NAME;
+    private Path pluginsDir;
+
+    public PipesForkParserConfig() {
+        this.pipesConfig = new PipesConfig();
+        this.handlerConfig = new HandlerConfig();
+        // Default to single client for simple fork parser use case
+        this.pipesConfig.setNumClients(1);
+    }
+
+    /**
+     * Get the underlying PipesConfig for advanced configuration.
+     *
+     * @return the pipes configuration
+     */
+    public PipesConfig getPipesConfig() {
+        return pipesConfig;
+    }
+
+    /**
+     * Get the handler configuration that specifies how content should be handled.
+     *
+     * @return the handler configuration
+     */
+    public HandlerConfig getHandlerConfig() {
+        return handlerConfig;
+    }
+
+    /**
+     * Set the handler configuration.
+     *
+     * @param handlerConfig the handler configuration
+     * @return this config for chaining
+     */
+    public PipesForkParserConfig setHandlerConfig(HandlerConfig handlerConfig) {
+        this.handlerConfig = handlerConfig;
+        return this;
+    }
+
+    /**
+     * Set the handler type (TEXT, HTML, XML, etc.).
+     *
+     * @param type the handler type
+     * @return this config for chaining
+     */
+    public PipesForkParserConfig setHandlerType(BasicContentHandlerFactory.HANDLER_TYPE type) {
+        this.handlerConfig.setType(type);
+        return this;
+    }
+
+    /**
+     * Set the parse mode (RMETA for recursive metadata, CONCATENATE for single document).
+     *
+     * @param parseMode the parse mode
+     * @return this config for chaining
+     */
+    public PipesForkParserConfig setParseMode(HandlerConfig.PARSE_MODE parseMode) {
+        this.handlerConfig.setParseMode(parseMode);
+        return this;
+    }
+
+    /**
+     * Set the write limit for content extraction.
+     *
+     * @param writeLimit the maximum characters to extract (-1 for unlimited)
+     * @return this config for chaining
+     */
+    public PipesForkParserConfig setWriteLimit(int writeLimit) {
+        this.handlerConfig.setWriteLimit(writeLimit);
+        return this;
+    }
+
+    /**
+     * Set the maximum number of embedded resources to process.
+     *
+     * @param maxEmbeddedResources the maximum embedded resources (-1 for unlimited)
+     * @return this config for chaining
+     */
+    public PipesForkParserConfig setMaxEmbeddedResources(int maxEmbeddedResources) {
+        this.handlerConfig.setMaxEmbeddedResources(maxEmbeddedResources);
+        return this;
+    }
+
+    /**
+     * Get the fetcher name used for file system fetching.
+     *
+     * @return the fetcher name
+     */
+    public String getFetcherName() {
+        return fetcherName;
+    }
+
+    /**
+     * Set the fetcher name.
+     *
+     * @param fetcherName the fetcher name
+     * @return this config for chaining
+     */
+    public PipesForkParserConfig setFetcherName(String fetcherName) {
+        this.fetcherName = fetcherName;
+        return this;
+    }
+
+    /**
+     * Set the timeout in milliseconds for parsing operations.
+     *
+     * @param timeoutMillis the timeout in milliseconds
+     * @return this config for chaining
+     */
+    public PipesForkParserConfig setTimeoutMillis(long timeoutMillis) {
+        pipesConfig.setTimeoutMillis(timeoutMillis);
+        return this;
+    }
+
+    /**
+     * Set the JVM arguments for the forked process.
+     *
+     * @param jvmArgs the JVM arguments (e.g., "-Xmx512m")
+     * @return this config for chaining
+     */
+    public PipesForkParserConfig setJvmArgs(List<String> jvmArgs) {
+        pipesConfig.setForkedJvmArgs(new ArrayList<>(jvmArgs));
+        return this;
+    }
+
+    /**
+     * Add a JVM argument for the forked process.
+     *
+     * @param arg the JVM argument to add
+     * @return this config for chaining
+     */
+    public PipesForkParserConfig addJvmArg(String arg) {
+        pipesConfig.getForkedJvmArgs().add(arg);
+        return this;
+    }
+
+    /**
+     * Set the Java executable path.
+     *
+     * @param javaPath path to the java executable
+     * @return this config for chaining
+     */
+    public PipesForkParserConfig setJavaPath(String javaPath) {
+        pipesConfig.setJavaPath(javaPath);
+        return this;
+    }
+
+    /**
+     * Set the maximum number of files to process before restarting the forked process.
+     * This helps prevent memory leaks from accumulating.
+     *
+     * @param maxFiles the maximum files per process (-1 for unlimited)
+     * @return this config for chaining
+     */
+    public PipesForkParserConfig setMaxFilesPerProcess(int maxFiles) {
+        pipesConfig.setMaxFilesProcessedPerProcess(maxFiles);
+        return this;
+    }
+
+    /**
+     * <b>EXPERT:</b> Set the number of forked JVM processes (clients) to use for parsing.
+     * <p>
+     * This enables concurrent parsing across multiple forked processes. Each client
+     * is an independent JVM that can parse documents in parallel. When multiple threads
+     * call {@link PipesForkParser#parse}, requests are distributed across the pool
+     * of forked processes.
+     * <p>
+     * <b>When to use:</b> Set this higher than 1 when you need to parse many documents
+     * concurrently and have sufficient CPU cores and memory. Each forked process
+     * consumes memory independently (based on your JVM args like -Xmx).
+     * <p>
+     * <b>Default:</b> 1 (single forked process, suitable for simple sequential use)
+     *
+     * @param numClients the number of forked JVM processes (must be &gt;= 1)
+     * @return this config for chaining
+     * @throws IllegalArgumentException if numClients is less than 1
+     */
+    public PipesForkParserConfig setNumClients(int numClients) {
+        if (numClients < 1) {
+            throw new IllegalArgumentException("numClients must be >= 1");
+        }
+        pipesConfig.setNumClients(numClients);
+        return this;
+    }
+
+    /**
+     * Get the number of forked JVM processes configured.
+     *
+     * @return the number of clients
+     */
+    public int getNumClients() {
+        return pipesConfig.getNumClients();
+    }
+
+    /**
+     * Set the startup timeout in milliseconds.
+     *
+     * @param startupTimeoutMillis the startup timeout
+     * @return this config for chaining
+     */
+    public PipesForkParserConfig setStartupTimeoutMillis(long startupTimeoutMillis) {
+        pipesConfig.setStartupTimeoutMillis(startupTimeoutMillis);
+        return this;
+    }
+
+    /**
+     * Get the plugins directory.
+     *
+     * @return the plugins directory, or null if not set
+     */
+    public Path getPluginsDir() {
+        return pluginsDir;
+    }
+
+    /**
+     * Set the plugins directory where plugin zips are located.
+     * This directory should contain the tika-pipes-file-system zip
+     * and any other required plugins.
+     *
+     * @param pluginsDir the plugins directory
+     * @return this config for chaining
+     */
+    public PipesForkParserConfig setPluginsDir(Path pluginsDir) {
+        this.pluginsDir = pluginsDir;
+        return this;
+    }
+}
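
    A sketch of the fluent configuration above in one chain (the plugins path
    is illustrative; everything else uses only setters defined in this class):

        PipesForkParserConfig config = new PipesForkParserConfig()
                .setPluginsDir(Paths.get("/opt/tika/plugins")) // illustrative path
                .setHandlerType(BasicContentHandlerFactory.HANDLER_TYPE.TEXT)
                .setParseMode(HandlerConfig.PARSE_MODE.RMETA)
                .setWriteLimit(1_000_000)       // cap extracted characters
                .setTimeoutMillis(120_000)      // per-parse timeout
                .setMaxFilesPerProcess(10_000)  // restart the forked JVM periodically
                .setNumClients(4)               // EXPERT: four forked JVMs in parallel
                .addJvmArg("-Xmx512m");
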
diff --git a/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParserException.java b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParserException.java
new file mode 100644
index 000000000..5f32c5421
--- /dev/null
+++ b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParserException.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fork;
+
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.pipes.api.PipesResult;
+
+/**
+ * Exception thrown when {@link PipesForkParser} encounters an application error.
+ * <p>
+ * This exception is thrown for application-level errors that indicate
+ * infrastructure or configuration problems:
+ * <ul>
+ *   <li>Initialization failures (parser, fetcher, or emitter initialization)</li>
+ *   <li>Configuration errors (fetcher or emitter not found)</li>
+ *   <li>Client unavailable (no forked process available within timeout)</li>
+ * </ul>
+ * <p>
+ * The following are NOT thrown as exceptions:
+ * <ul>
+ *   <li>Process crashes (OOM, timeout) - returned in result, next parse
+ *       will automatically restart the forked process</li>
+ *   <li>Per-document failures (fetch exception, parse exception) - returned
+ *       in result so caller can handle gracefully</li>
+ * </ul>
+ *
+ * @see PipesForkResult#isProcessCrash()
+ * @see PipesForkResult#isApplicationError()
+ */
+public class PipesForkParserException extends TikaException {
+
+    private final PipesResult.RESULT_STATUS status;
+
+    /**
+     * Creates a new exception with the given status and message.
+     *
+     * @param status the result status that caused this exception
+     * @param message the error message
+     */
+    public PipesForkParserException(
+            PipesResult.RESULT_STATUS status, String message) {
+        super(message);
+        this.status = status;
+    }
+
+    /**
+     * Creates a new exception with the given status, message, and cause.
+     *
+     * @param status the result status that caused this exception
+     * @param message the error message
+     * @param cause the underlying cause
+     */
+    public PipesForkParserException(
+            PipesResult.RESULT_STATUS status, String message, Throwable cause) {
+        super(message, cause);
+        this.status = status;
+    }
+
+    /**
+     * Get the result status that caused this exception.
+     *
+     * @return the result status
+     */
+    public PipesResult.RESULT_STATUS getStatus() {
+        return status;
+    }
+
+    /**
+     * Check if this exception was caused by an initialization failure.
+     *
+     * @return true if initialization failed
+     */
+    public boolean isInitializationFailure() {
+        return status == PipesResult.RESULT_STATUS.FAILED_TO_INITIALIZE
+                || status == PipesResult.RESULT_STATUS.FETCHER_INITIALIZATION_EXCEPTION
+                || status == PipesResult.RESULT_STATUS.EMITTER_INITIALIZATION_EXCEPTION;
+    }
+
+    /**
+     * Check if this exception was caused by a configuration error.
+     *
+     * @return true if there was a configuration error
+     */
+    public boolean isConfigurationError() {
+        return status == PipesResult.RESULT_STATUS.FETCHER_NOT_FOUND
+                || status == PipesResult.RESULT_STATUS.EMITTER_NOT_FOUND;
+    }
+}
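
    A sketch of the intended error-handling split, assuming parse() surfaces
    application errors as PipesForkParserException per the javadoc above, while
    process crashes and per-document failures come back in the PipesForkResult:

        try (PipesForkParser parser = new PipesForkParser(config);
             TikaInputStream tis = TikaInputStream.get(path)) {
            PipesForkResult result = parser.parse(tis);
            if (result.isProcessCrash()) {
                // OOM/timeout: no exception; the next parse() restarts the forked process
            }
        } catch (PipesForkParserException e) {
            if (e.isConfigurationError()) {
                // fetcher/emitter not found: fix the configuration before retrying
            } else if (e.isInitializationFailure()) {
                // parser/fetcher/emitter failed to initialize in the forked process
            }
        }
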
diff --git a/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkResult.java b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkResult.java
new file mode 100644
index 000000000..e72269f34
--- /dev/null
+++ b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkResult.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fork;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.pipes.api.PipesResult;
+
+/**
+ * Result from parsing a file with {@link PipesForkParser}.
+ * <p>
+ * This wraps the {@link PipesResult} and provides convenient access to
+ * the parsed content and metadata.
+ * <p>
+ * Content is available in the metadata via {@link TikaCoreProperties#TIKA_CONTENT}.
+ * <p>
+ * <b>Important - Accessing Results:</b>
+ * <ul>
+ *   <li><b>RMETA mode (default):</b> Use {@link #getMetadataList()} to access content and
+ *       metadata from the container document AND all embedded documents. The convenience
+ *       methods {@link #getContent()} and {@link #getMetadata()} only return the container
+ *       document's data - embedded document content will be missed!</li>
+ *   <li><b>CONCATENATE mode:</b> Includes only the container document's metadata, but
+ *       concatenated content from the container document and all attachments.</li>
+ * </ul>
+ */
+public class PipesForkResult {
+
+    private final PipesResult pipesResult;
+
+    public PipesForkResult(PipesResult pipesResult) {
+        this.pipesResult = pipesResult;
+    }
+
+    /**
+     * Get the result status.
+     *
+     * @return the result status
+     */
+    public PipesResult.RESULT_STATUS getStatus() {
+        return pipesResult.status();
+    }
+
+    /**
+     * Check if the parsing was successful.
+     *
+     * @return true if parsing succeeded
+     */
+    public boolean isSuccess() {
+        return pipesResult.isSuccess();
+    }
+
+    /**
+     * Check if there was a process crash (OOM, timeout, etc.).
+     *
+     * @return true if the forked process crashed
+     */
+    public boolean isProcessCrash() {
+        return pipesResult.isProcessCrash();
+    }
+
+    /**
+     * Check if there was an application error.
+     *
+     * @return true if there was an application-level error
+     */
+    public boolean isApplicationError() {
+        return pipesResult.isApplicationError();
+    }
+
+    /**
+     * Get the list of metadata objects from parsing.
+     * <p>
+     * <b>This is the recommended method for RMETA mode (the default).</b>
+     * <p>
+     * <b>RMETA mode:</b> Returns one metadata object per document - the first is
+     * the container document, followed by each embedded document. Each metadata
+     * object contains:
+     * <ul>
+     *   <li>Content via {@link TikaCoreProperties#TIKA_CONTENT}</li>
+     *   <li>Document metadata (title, author, dates, etc.)</li>
+     *   <li>Any parse exceptions via {@link TikaCoreProperties#EMBEDDED_EXCEPTION}</li>
+     * </ul>
+     * <p>
+     * <b>CONCATENATE mode:</b> Returns a single metadata object containing the
+     * container's metadata and concatenated content from all documents.
+     *
+     * @return the list of metadata objects, or empty list if none
+     */
+    public List<Metadata> getMetadataList() {
+        if (pipesResult.emitData() == null) {
+            return Collections.emptyList();
+        }
+        return pipesResult.emitData().getMetadataList();
+    }
+
+    /**
+     * Get the content from the container document only.
+     * <p>
+     * <b>WARNING - RMETA mode:</b> In RMETA mode, this returns ONLY the container
+     * document's content. Content from embedded documents is NOT included. To get
+     * all content including embedded documents, iterate over {@link #getMetadataList()}
+     * and retrieve {@link TikaCoreProperties#TIKA_CONTENT} from each metadata object.
+     * <p>
+     * <b>CONCATENATE mode:</b> In CONCATENATE mode, this returns all content
+     * (container + embedded) since everything is concatenated into a single
+     * metadata object. This method works as expected in CONCATENATE mode.
+     * <p>
+     * <b>Recommendation:</b> For RMETA mode (the default), use {@link #getMetadataList()}
+     * to access content from all documents. This method is most appropriate for
+     * CONCATENATE mode or when you only need the container document's content.
+     *
+     * @return the container document's content, or null if not available
+     * @see #getMetadataList()
+     */
+    public String getContent() {
+        List<Metadata> metadataList = getMetadataList();
+        if (metadataList.isEmpty()) {
+            return null;
+        }
+        return metadataList.get(0).get(TikaCoreProperties.TIKA_CONTENT);
+    }
+
+    /**
+     * Get the container document's metadata only.
+     * <p>
+     * <b>WARNING - RMETA mode:</b> In RMETA mode, this returns ONLY the container
+     * document's metadata. Metadata from embedded documents (including their content,
+     * titles, authors, and any parse exceptions) is NOT included. To access metadata
+     * from all documents, use {@link #getMetadataList()}.
+     * <p>
+     * <b>CONCATENATE mode:</b> In CONCATENATE mode, there is only one metadata
+     * object containing the container's metadata and concatenated content from
+     * all documents. By the nature of CONCATENATE mode, you lose metadata from
+     * embedded files, and Tika silently swallows exceptions thrown in embedded files.
+     * <p>
+     * <b>Recommendation:</b> For RMETA mode (the default), use {@link #getMetadataList()}
+     * to access metadata from all documents, including embedded document exceptions
+     * (stored in {@link TikaCoreProperties#EMBEDDED_EXCEPTION}).
+     *
+     * @return the container document's metadata, or null if not available
+     * @see #getMetadataList()
+     */
+    public Metadata getMetadata() {
+        List<Metadata> metadataList = getMetadataList();
+        if (metadataList.isEmpty()) {
+            return null;
+        }
+        return metadataList.get(0);
+    }
+
+    /**
+     * Get any error message associated with the result.
+     *
+     * @return the error message, or null if none
+     */
+    public String getMessage() {
+        return pipesResult.message();
+    }
+
+    /**
+     * Get the underlying PipesResult for advanced access.
+     *
+     * @return the pipes result
+     */
+    public PipesResult getPipesResult() {
+        return pipesResult;
+    }
+
+    @Override
+    public String toString() {
+        return "PipesForkResult{" +
+                "status=" + getStatus() +
+                ", metadataCount=" + getMetadataList().size() +
+                ", message=" + getMessage() +
+                '}';
+    }
+}
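
    Per the warnings above, a minimal sketch of the recommended access pattern
    for RMETA mode (iterate the full list rather than calling getContent() or
    getMetadata()):

        List<Metadata> metadataList = result.getMetadataList();
        for (int i = 0; i < metadataList.size(); i++) {
            Metadata m = metadataList.get(i);
            // i == 0 is the container document; the rest are embedded documents
            String content = m.get(TikaCoreProperties.TIKA_CONTENT);
            String embeddedException = m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
            // handle content / exception per document
        }
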
diff --git a/tika-pipes/tika-pipes-fork-parser/src/test/java/org/apache/tika/pipes/fork/PipesForkParserTest.java b/tika-pipes/tika-pipes-fork-parser/src/test/java/org/apache/tika/pipes/fork/PipesForkParserTest.java
new file mode 100644
index 000000000..33c808ad4
--- /dev/null
+++ b/tika-pipes/tika-pipes-fork-parser/src/test/java/org/apache/tika/pipes/fork/PipesForkParserTest.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fork;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.api.HandlerConfig;
+import org.apache.tika.pipes.api.PipesResult;
+import org.apache.tika.sax.BasicContentHandlerFactory;
+
+public class PipesForkParserTest {
+
+    private static final Path PLUGINS_DIR = Paths.get("target/plugins");
+
+    @TempDir
+    Path tempDir;
+
+    @BeforeAll
+    static void checkPluginsDir() {
+        if (!Files.isDirectory(PLUGINS_DIR)) {
+            System.err.println("WARNING: Plugins directory not found at " + PLUGINS_DIR.toAbsolutePath() +
+                    ". Tests may fail. Run 'mvn process-test-resources' first.");
+        }
+    }
+
+    private Path createZipWithEmbeddedFiles(String zipName, String... entries) throws IOException {
+        Path zipPath = tempDir.resolve(zipName);
+        try (OutputStream fos = Files.newOutputStream(zipPath);
+             ZipOutputStream zos = new ZipOutputStream(fos)) {
+            for (int i = 0; i < entries.length; i += 2) {
+                zos.putNextEntry(new ZipEntry(entries[i]));
+                zos.write(entries[i + 1].getBytes(StandardCharsets.UTF_8));
+                zos.closeEntry();
+            }
+        }
+        return zipPath;
+    }
+
+    @Test
+    public void testParseTextFile() throws Exception {
+        // Create a simple test file
+        Path testFile = tempDir.resolve("test.txt");
+        String content = "Hello, this is a test document.\nIt has multiple 
lines.";
+        Files.writeString(testFile, content);
+
+        PipesForkParserConfig config = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR)
+                .setHandlerType(BasicContentHandlerFactory.HANDLER_TYPE.TEXT)
+                .setParseMode(HandlerConfig.PARSE_MODE.RMETA)
+                .setTimeoutMillis(60000)
+                .addJvmArg("-Xmx256m");
+
+        try (PipesForkParser parser = new PipesForkParser(config);
+             TikaInputStream tis = TikaInputStream.get(testFile)) {
+            PipesForkResult result = parser.parse(tis);
+
+            assertTrue(result.isSuccess(), "Parse should succeed. Status: " + 
result.getStatus()
+                    + ", message: " + result.getMessage());
+            assertFalse(result.isProcessCrash(), "Should not be a process 
crash");
+
+            List<Metadata> metadataList = result.getMetadataList();
+            assertNotNull(metadataList, "Metadata list should not be null");
+            assertFalse(metadataList.isEmpty(), "Metadata list should not be empty");
+
+            String extractedContent = result.getContent();
+            assertNotNull(extractedContent, "Content should not be null");
+            assertTrue(extractedContent.contains("Hello"), "Content should 
contain 'Hello'");
+            assertTrue(extractedContent.contains("test document"), "Content 
should contain 'test document'");
+        }
+    }
+
+    @Test
+    public void testParseWithMetadata() throws Exception {
+        // Create a simple HTML file
+        Path testFile = tempDir.resolve("test.html");
+        String html = "<html><head><title>Test Title</title></head>" +
+                "<body><p>Test paragraph content.</p></body></html>";
+        Files.writeString(testFile, html);
+
+        PipesForkParserConfig config = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR)
+                .setHandlerType(BasicContentHandlerFactory.HANDLER_TYPE.TEXT)
+                .setParseMode(HandlerConfig.PARSE_MODE.RMETA)
+                .setTimeoutMillis(60000);
+
+        try (PipesForkParser parser = new PipesForkParser(config);
+             TikaInputStream tis = TikaInputStream.get(testFile)) {
+            Metadata initialMetadata = new Metadata();
+            PipesForkResult result = parser.parse(tis, initialMetadata);
+
+            assertTrue(result.isSuccess(), "Parse should succeed");
+
+            Metadata metadata = result.getMetadata();
+            assertNotNull(metadata, "Metadata should not be null");
+
+            String extractedContent = result.getContent();
+            assertNotNull(extractedContent, "Content should not be null");
+            assertTrue(extractedContent.contains("Test paragraph"), "Content 
should contain paragraph text");
+        }
+    }
+
+    @Test
+    public void testParseMultipleFiles() throws Exception {
+        // Create multiple test files
+        Path testFile1 = tempDir.resolve("test1.txt");
+        Path testFile2 = tempDir.resolve("test2.txt");
+        Files.writeString(testFile1, "Content of first file");
+        Files.writeString(testFile2, "Content of second file");
+
+        PipesForkParserConfig config = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR)
+                .setHandlerType(BasicContentHandlerFactory.HANDLER_TYPE.TEXT)
+                .setParseMode(HandlerConfig.PARSE_MODE.RMETA)
+                .setTimeoutMillis(60000);
+
+        try (PipesForkParser parser = new PipesForkParser(config)) {
+            try (TikaInputStream tis1 = TikaInputStream.get(testFile1)) {
+                PipesForkResult result1 = parser.parse(tis1);
+                assertTrue(result1.isSuccess());
+                assertTrue(result1.getContent().contains("first file"));
+            }
+
+            try (TikaInputStream tis2 = TikaInputStream.get(testFile2)) {
+                PipesForkResult result2 = parser.parse(tis2);
+                assertTrue(result2.isSuccess());
+                assertTrue(result2.getContent().contains("second file"));
+            }
+        }
+    }
+
+    @Test
+    public void testConcatenateMode() throws Exception {
+        Path testZip = createZipWithEmbeddedFiles("test_with_embedded.zip",
+                "embedded1.txt", "Content from first embedded file",
+                "embedded2.txt", "Content from second embedded file");
+
+        PipesForkParserConfig config = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR)
+                .setHandlerType(BasicContentHandlerFactory.HANDLER_TYPE.TEXT)
+                .setParseMode(HandlerConfig.PARSE_MODE.CONCATENATE)
+                .setTimeoutMillis(60000);
+
+        try (PipesForkParser parser = new PipesForkParser(config);
+             TikaInputStream tis = TikaInputStream.get(testZip)) {
+            PipesForkResult result = parser.parse(tis);
+
+            assertTrue(result.isSuccess(), "Parse should succeed");
+
+            // In CONCATENATE mode, there should be exactly one metadata object
+            // even though the zip contains multiple embedded files
+            List<Metadata> metadataList = result.getMetadataList();
+            assertEquals(1, metadataList.size(), "CONCATENATE mode should return single metadata");
+
+            // The content should contain text from both embedded files
+            String content = result.getContent();
+            assertNotNull(content);
+            assertTrue(content.contains("first embedded"),
+                    "Content should contain text from first embedded file");
+            assertTrue(content.contains("second embedded"),
+                    "Content should contain text from second embedded file");
+        }
+    }
+
+    @Test
+    public void testRmetaModeWithEmbedded() throws Exception {
+        Path testZip = createZipWithEmbeddedFiles("test_rmeta_embedded.zip",
+                "file1.txt", "First file content",
+                "file2.txt", "Second file content");
+
+        PipesForkParserConfig config = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR)
+                .setHandlerType(BasicContentHandlerFactory.HANDLER_TYPE.TEXT)
+                .setParseMode(HandlerConfig.PARSE_MODE.RMETA)
+                .setTimeoutMillis(60000);
+
+        try (PipesForkParser parser = new PipesForkParser(config);
+             TikaInputStream tis = TikaInputStream.get(testZip)) {
+            PipesForkResult result = parser.parse(tis);
+
+            assertTrue(result.isSuccess(), "Parse should succeed");
+
+            // In RMETA mode, there should be multiple metadata objects:
+            // one for the container (zip) and one for each embedded file
+            List<Metadata> metadataList = result.getMetadataList();
+            assertTrue(metadataList.size() >= 3,
+                    "RMETA mode should return metadata for container + 
embedded files, got: "
+                    + metadataList.size());
+        }
+    }
+
+    @Test
+    public void testDefaultConfigMatchesExplicitRmeta() throws Exception {
+        Path testZip = createZipWithEmbeddedFiles("test_default_config.zip",
+                "file1.txt", "First file content",
+                "file2.txt", "Second file content");
+
+        // Parse with explicit RMETA config
+        PipesForkParserConfig explicitConfig = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR)
+                .setHandlerType(BasicContentHandlerFactory.HANDLER_TYPE.TEXT)
+                .setParseMode(HandlerConfig.PARSE_MODE.RMETA)
+                .setTimeoutMillis(60000);
+
+        int explicitMetadataCount;
+        try (PipesForkParser parser = new PipesForkParser(explicitConfig);
+             TikaInputStream tis = TikaInputStream.get(testZip)) {
+            PipesForkResult result = parser.parse(tis);
+            assertTrue(result.isSuccess());
+            explicitMetadataCount = result.getMetadataList().size();
+        }
+
+        // Parse with default config (only pluginsDir set) - should produce same results
+        PipesForkParserConfig defaultConfig = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR);
+        try (PipesForkParser parser = new PipesForkParser(defaultConfig);
+             TikaInputStream tis = TikaInputStream.get(testZip)) {
+            PipesForkResult result = parser.parse(tis);
+
+            assertTrue(result.isSuccess(), "Parse with default config should 
succeed");
+            assertEquals(explicitMetadataCount, 
result.getMetadataList().size(),
+                    "Default config should produce same metadata count as 
explicit RMETA config");
+        }
+    }
+
+    @Test
+    public void testTextVsXhtmlHandlerType() throws Exception {
+        // Create an HTML file to parse
+        Path testFile = tempDir.resolve("test_handler.html");
+        String html = "<html><head><title>Test Title</title></head>" +
+                "<body><p>Paragraph one.</p><p>Paragraph 
two.</p></body></html>";
+        Files.writeString(testFile, html);
+
+        // Parse with TEXT handler - should get plain text without markup
+        PipesForkParserConfig textConfig = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR)
+                .setHandlerType(BasicContentHandlerFactory.HANDLER_TYPE.TEXT)
+                .setParseMode(HandlerConfig.PARSE_MODE.RMETA)
+                .setTimeoutMillis(60000);
+
+        String textContent;
+        try (PipesForkParser parser = new PipesForkParser(textConfig);
+             TikaInputStream tis = TikaInputStream.get(testFile)) {
+            PipesForkResult result = parser.parse(tis);
+            assertTrue(result.isSuccess(), "TEXT parse should succeed");
+            textContent = result.getContent();
+            assertNotNull(textContent, "TEXT content should not be null");
+            // TEXT mode should NOT contain HTML tags
+            assertFalse(textContent.contains("<p>"), "TEXT content should not 
contain <p> tags");
+            assertFalse(textContent.contains("<html>"), "TEXT content should 
not contain <html> tags");
+            assertTrue(textContent.contains("Paragraph one"), "TEXT content 
should contain text");
+        }
+
+        // Parse with XML handler - should get XHTML markup
+        PipesForkParserConfig xmlConfig = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR)
+                .setHandlerType(BasicContentHandlerFactory.HANDLER_TYPE.XML)
+                .setParseMode(HandlerConfig.PARSE_MODE.RMETA)
+                .setTimeoutMillis(60000);
+
+        String xmlContent;
+        try (PipesForkParser parser = new PipesForkParser(xmlConfig);
+             TikaInputStream tis = TikaInputStream.get(testFile)) {
+            PipesForkResult result = parser.parse(tis);
+            assertTrue(result.isSuccess(), "XML parse should succeed");
+            xmlContent = result.getContent();
+            assertNotNull(xmlContent, "XML content should not be null");
+            // XML mode SHOULD contain markup
+            assertTrue(xmlContent.contains("<p>") || xmlContent.contains("<p 
"),
+                    "XML content should contain <p> tags");
+            assertTrue(xmlContent.contains("Paragraph one"), "XML content 
should contain text");
+        }
+
+        // The XML content should be longer due to markup
+        assertTrue(xmlContent.length() > textContent.length(),
+                "XML content should be longer than TEXT content due to 
markup");
+    }
+
+    @Test
+    public void testWriteLimit() throws Exception {
+        // Create a file with more content than the write limit
+        Path testFile = tempDir.resolve("longfile.txt");
+        StringBuilder longContent = new StringBuilder();
+        for (int i = 0; i < 1000; i++) {
+            longContent.append("This is line ").append(i).append(" of the test 
document.\n");
+        }
+        Files.writeString(testFile, longContent.toString());
+
+        PipesForkParserConfig config = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR)
+                .setHandlerType(BasicContentHandlerFactory.HANDLER_TYPE.TEXT)
+                .setParseMode(HandlerConfig.PARSE_MODE.RMETA)
+                .setWriteLimit(100)  // Limit to 100 characters
+                .setTimeoutMillis(60000);
+
+        try (PipesForkParser parser = new PipesForkParser(config);
+             TikaInputStream tis = TikaInputStream.get(testFile)) {
+            PipesForkResult result = parser.parse(tis);
+
+            // Note: behavior depends on throwOnWriteLimitReached setting
+            // With default (true), this may result in an exception being recorded
+            assertNotNull(result);
+        }
+    }
+
+    @Test
+    public void testDefaultConfiguration() throws Exception {
+        Path testFile = tempDir.resolve("default.txt");
+        Files.writeString(testFile, "Testing default configuration");
+
+        // Use default configuration (only pluginsDir set)
+        PipesForkParserConfig config = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR);
+        try (PipesForkParser parser = new PipesForkParser(config);
+             TikaInputStream tis = TikaInputStream.get(testFile)) {
+            PipesForkResult result = parser.parse(tis);
+            assertTrue(result.isSuccess());
+            assertNotNull(result.getContent());
+        }
+    }
+
+    @Test
+    public void testFileNotFoundThrowsException() throws Exception {
+        // Try to parse a file that doesn't exist
+        Path nonExistentFile = tempDir.resolve("does_not_exist.txt");
+
+        // TikaInputStream.get(Path) throws NoSuchFileException for non-existent files
+        // because it needs to read file attributes (size)
+        assertThrows(java.nio.file.NoSuchFileException.class, () -> {
+            TikaInputStream.get(nonExistentFile);
+        });
+    }
+
+    @Test
+    public void testExceptionOnOneFileDoesNotPreventNextParse() throws Exception {
+        // Test that an exception when opening one file doesn't prevent parsing another file
+        Path nonExistentFile = tempDir.resolve("does_not_exist.txt");
+        Path realFile = tempDir.resolve("real_file.txt");
+        Files.writeString(realFile, "This file exists");
+
+        PipesForkParserConfig config = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR)
+                .setTimeoutMillis(60000);
+
+        try (PipesForkParser parser = new PipesForkParser(config)) {
+            // First attempt - TikaInputStream.get() will throw for non-existent file
+            assertThrows(java.nio.file.NoSuchFileException.class, () -> {
+                TikaInputStream.get(nonExistentFile);
+            });
+
+            // Second parse - should succeed despite the previous exception
+            try (TikaInputStream tis2 = TikaInputStream.get(realFile)) {
+                PipesForkResult result2 = parser.parse(tis2);
+                assertTrue(result2.isSuccess(), "Should succeed for existing 
file");
+                assertTrue(result2.getContent().contains("This file exists"));
+            }
+        }
+    }
+
+    @Test
+    public void testParseSuccessWithExceptionStatus() throws Exception {
+        // Create a file that will parse but may have warnings
+        // For example, a file with content that might trigger a write limit
+        Path testFile = tempDir.resolve("parse_with_warning.txt");
+        Files.writeString(testFile, "Simple content");
+
+        PipesForkParserConfig config = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR)
+                .setTimeoutMillis(60000);
+
+        try (PipesForkParser parser = new PipesForkParser(config);
+             TikaInputStream tis = TikaInputStream.get(testFile)) {
+            PipesForkResult result = parser.parse(tis);
+
+            // Verify we can check for different success states
+            if (result.isSuccess()) {
+                // Could be PARSE_SUCCESS, PARSE_SUCCESS_WITH_EXCEPTION, or EMIT_SUCCESS_PASSBACK
+                assertTrue(
+                        result.getStatus() == PipesResult.RESULT_STATUS.PARSE_SUCCESS ||
+                        result.getStatus() == PipesResult.RESULT_STATUS.PARSE_SUCCESS_WITH_EXCEPTION ||
+                        result.getStatus() == PipesResult.RESULT_STATUS.EMIT_SUCCESS_PASSBACK,
+                        "Success status should be one of the success types");
+            }
+        }
+    }
+
+    @Test
+    public void testResultCategorization() throws Exception {
+        // Test that we can properly categorize results
+        Path testFile = tempDir.resolve("categorize.txt");
+        Files.writeString(testFile, "Test categorization");
+
+        PipesForkParserConfig config = new PipesForkParserConfig()
+                .setPluginsDir(PLUGINS_DIR)
+                .setTimeoutMillis(60000);
+
+        try (PipesForkParser parser = new PipesForkParser(config);
+             TikaInputStream tis = TikaInputStream.get(testFile)) {
+            PipesForkResult result = parser.parse(tis);
+
+            // At least one of these should be true
+            boolean hasCategory = result.isSuccess() || result.isProcessCrash() || result.isApplicationError();
+            assertTrue(hasCategory, "Result should have a valid category");
+
+            // These should be mutually exclusive
+            int trueCount = 0;
+            if (result.isSuccess()) trueCount++;
+            if (result.isProcessCrash()) trueCount++;
+            if (result.isApplicationError()) trueCount++;
+            assertEquals(1, trueCount, "Exactly one category should be true");
+        }
+    }
+}
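
    The tests above run sequentially against a single forked JVM; the
    setNumClients(n) path is not exercised. A hedged sketch of how it might be
    driven, assuming parse() is safe to call from multiple threads as the
    setNumClients javadoc states ('files' here is an assumed List<Path>):

        ExecutorService pool = Executors.newFixedThreadPool(4);
        PipesForkParserConfig config = new PipesForkParserConfig()
                .setPluginsDir(PLUGINS_DIR)
                .setNumClients(4);
        try (PipesForkParser parser = new PipesForkParser(config)) {
            List<Future<PipesForkResult>> futures = new ArrayList<>();
            for (Path file : files) {
                futures.add(pool.submit(() -> {
                    try (TikaInputStream tis = TikaInputStream.get(file)) {
                        return parser.parse(tis);
                    }
                }));
            }
            for (Future<PipesForkResult> future : futures) {
                PipesForkResult result = future.get();
                // inspect result.isSuccess() etc. per file
            }
        } finally {
            pool.shutdown();
        }
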
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java b/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
index d1f6a8e16..0d63a8ff3 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
@@ -24,13 +24,10 @@ import java.nio.file.Paths;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.nio.file.attribute.FileTime;
 import java.util.Date;
-import java.util.Optional;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.tika.config.ConfigContainer;
-import org.apache.tika.config.JsonConfig;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.io.TikaInputStream;
@@ -69,47 +66,33 @@ public class FileSystemFetcher extends AbstractTikaExtension implements Fetcher
     }
 
     @Override
-    public TikaInputStream fetch(String fetchKey, Metadata metadata, ParseContext parseContext) throws IOException, TikaException {
+    public TikaInputStream fetch(String fetchKey, Metadata metadata, ParseContext parseContext)
+            throws IOException, TikaException {
         if (fetchKey.contains("\u0000")) {
-            throw new IllegalArgumentException("Path must not contain 'u0000'. 
" +
-                    "Please review the life decisions that led you to 
requesting " +
-                    "a file name with this character in it.");
+            throw new IllegalArgumentException("Path must not contain 'u0000'. 
"
+                    + "Please review the life decisions that led you to 
requesting "
+                    + "a file name with this character in it.");
         }
         FileSystemFetcherConfig config = defaultFileSystemFetcherConfig;
-        ConfigContainer configContainer = parseContext.get(ConfigContainer.class);
-        if (configContainer != null) {
-            Optional<JsonConfig> configJson = configContainer.get(getExtensionConfig().id());
-            if (configJson.isPresent()) {
-                try {
-                    // Check if basePath is present in runtime config - this is not allowed for security
-                    if (configJson.get().json().contains("\"basePath\"")) {
-                        throw new TikaConfigException(
-                                "Cannot change 'basePath' at runtime for security reasons. " +
-                                        "basePath can only be set during initialization.");
-                    }
-
-                    // Load runtime config (excludes basePath for security)
-                    FileSystemFetcherRuntimeConfig runtimeConfig =
-                            FileSystemFetcherRuntimeConfig.load(configJson.get().json());
-
-                    // Merge runtime config into default config while preserving basePath
-                    config = new FileSystemFetcherConfig()
-                            .setBasePath(defaultFileSystemFetcherConfig.getBasePath())
-                            .setExtractFileSystemMetadata(runtimeConfig.isExtractFileSystemMetadata());
-                } catch (TikaConfigException e) {
-                    throw new IOException("Failed to load runtime config", e);
-                }
-            }
-        }
-        Path p = null;
-        if (! StringUtils.isBlank(config.getBasePath())) {
+        Path p;
+        if (StringUtils.isBlank(config.getBasePath())) {
+            // No basePath - treat fetchKey as absolute path
+            p = Paths.get(fetchKey);
+        } else {
             Path basePath = Paths.get(config.getBasePath());
             if (!Files.isDirectory(basePath)) {
                 throw new IOException("BasePath is not a directory: " + 
basePath);
             }
             p = basePath.resolve(fetchKey);
-            if (!p.toRealPath().startsWith(basePath.toRealPath())) {
-                throw new IllegalArgumentException(
+            // First check using normalize() - catches obvious path traversal attempts
+            // This doesn't require the file to exist, so it works on all platforms
+            if (!p.normalize().startsWith(basePath.normalize())) {
+                throw new SecurityException(
+                        "fetchKey must resolve to be a descendant of the 'basePath'");
+            }
+            // Additional check using toRealPath() for symlink attacks (only if file exists)
+            if (Files.exists(p) && !p.toRealPath().startsWith(basePath.toRealPath())) {
+                throw new SecurityException(
                         "fetchKey must resolve to be a descendant of the 'basePath'");
             }
         }
@@ -143,38 +126,39 @@ public class FileSystemFetcher extends AbstractTikaExtension implements Fetcher
         metadata.set(property, new Date(fileTime.toMillis()));
     }
 
-    private void checkConfig(FileSystemFetcherConfig fetcherConfig) throws TikaConfigException {
+    private void checkConfig(FileSystemFetcherConfig fetcherConfig)
+            throws TikaConfigException {
+    private void checkConfig(FileSystemFetcherConfig fetcherConfig)
+            throws TikaConfigException {
         String basePath = fetcherConfig.getBasePath();
         if (basePath == null || basePath.isBlank()) {
-            LOG.warn("'basePath' has not been set. " +
-                    "This means that client code or clients can read from any file that this " +
-                    "process has permissions to read. If you are running tika-server, make " +
-                    "absolutely certain that you've locked down " +
-                    "access to tika-server and file-permissions for the tika-server process.");
+            if (!fetcherConfig.isAllowAbsolutePaths()) {
+                throw new TikaConfigException(
+                        "'basePath' must be set, or 'allowAbsolutePaths' must be true. "
+                                + "Without basePath, clients can read any file this process "
+                                + "has access to. Set 'allowAbsolutePaths: true' to explicitly "
+                                + "allow this behavior and accept the security risks.");
+            }
             return;
         }
-        if (basePath.toString().startsWith("http://";)) {
-            throw new TikaConfigException("FileSystemFetcher only works with 
local file systems. " +
-                    " Please use the tika-fetcher-http module for http calls");
-        } else if (basePath.toString().startsWith("ftp://";)) {
-            throw new TikaConfigException("FileSystemFetcher only works with 
local file systems. " +
-                    " Please consider contributing an ftp fetcher module");
-        } else if (basePath.toString().startsWith("s3://")) {
-            throw new TikaConfigException("FileSystemFetcher only works with 
local file systems. " +
-                    " Please use the tika-fetcher-s3 module");
+        if (basePath.startsWith("http://";)) {
+            throw new TikaConfigException(
+                    "FileSystemFetcher only works with local file systems. "
+                            + "Please use the tika-fetcher-http module for 
http calls");
+        } else if (basePath.startsWith("ftp://";)) {
+            throw new TikaConfigException(
+                    "FileSystemFetcher only works with local file systems. "
+                            + "Please consider contributing an ftp fetcher 
module");
+        } else if (basePath.startsWith("s3://")) {
+            throw new TikaConfigException(
+                    "FileSystemFetcher only works with local file systems. "
+                            + "Please use the tika-fetcher-s3 module");
         }
 
         if (basePath.contains("\u0000")) {
             throw new TikaConfigException(
-                    "base path must not contain \u0000. " + "Seriously, what 
were you thinking?");
+                    "base path must not contain \u0000. Seriously, what were 
you thinking?");
         }
     }
 
-    static boolean isDescendant(Path root, Path descendant) {
-        return descendant.toAbsolutePath().normalize()
-                         .startsWith(root.toAbsolutePath().normalize());
-    }
-
     @Override
     public String toString() {
         return "FileSystemFetcher{" + "defaultFileSystemFetcherConfig=" + 
defaultFileSystemFetcherConfig + ", pluginConfig=" + pluginConfig + '}';
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherConfig.java b/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherConfig.java
index fcf2e5d5e..7ee64e38d 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherConfig.java
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherConfig.java
@@ -37,7 +37,8 @@ public class FileSystemFetcherConfig {
     }
 
     private String basePath;
-    private boolean extractFileSystemMetadata;
+    private boolean extractFileSystemMetadata = false;
+    private boolean allowAbsolutePaths = false;
 
     public boolean isExtractFileSystemMetadata() {
         return extractFileSystemMetadata;
@@ -56,4 +57,18 @@ public class FileSystemFetcherConfig {
         this.basePath = basePath;
         return this;
     }
+
+    /**
+     * If true, allows fetchKey to be an absolute path when basePath is not set.
+     * This suppresses the security warning about unrestricted file access.
+     * Use this when you intentionally want to allow fetching from any path.
+     */
+    public boolean isAllowAbsolutePaths() {
+        return allowAbsolutePaths;
+    }
+
+    public FileSystemFetcherConfig setAllowAbsolutePaths(boolean allowAbsolutePaths) {
+        this.allowAbsolutePaths = allowAbsolutePaths;
+        return this;
+    }
 }
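
    With this change there are two valid ways to configure the fetcher; a short
    sketch (the basePath value is illustrative):

        // Sandboxed (recommended): fetchKeys are resolved relative to basePath
        FileSystemFetcherConfig sandboxed = new FileSystemFetcherConfig()
                .setBasePath("/data/docs");

        // Unrestricted: must opt in explicitly, accepting the security risk
        FileSystemFetcherConfig open = new FileSystemFetcherConfig()
                .setAllowAbsolutePaths(true)
                .setExtractFileSystemMetadata(true);
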
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherRuntimeConfig.java b/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherRuntimeConfig.java
deleted file mode 100644
index ffadf9822..000000000
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherRuntimeConfig.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.fetcher.fs;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.apache.tika.exception.TikaConfigException;
-
-/**
- * Runtime configuration for FileSystemFetcher.
- * Only includes fields that are safe to update at runtime.
- * basePath is intentionally excluded for security reasons.
- */
-public class FileSystemFetcherRuntimeConfig {
-
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
-    public static FileSystemFetcherRuntimeConfig load(final String json)
-            throws TikaConfigException {
-        try {
-            return OBJECT_MAPPER.readValue(json,
-                    FileSystemFetcherRuntimeConfig.class);
-        } catch (JsonProcessingException e) {
-            throw new TikaConfigException(
-                    "Failed to parse FileSystemFetcherRuntimeConfig from 
JSON", e);
-        }
-    }
-
-    private boolean extractFileSystemMetadata;
-
-    public boolean isExtractFileSystemMetadata() {
-        return extractFileSystemMetadata;
-    }
-
-    public FileSystemFetcherRuntimeConfig setExtractFileSystemMetadata(boolean extractFileSystemMetadata) {
-        this.extractFileSystemMetadata = extractFileSystemMetadata;
-        return this;
-    }
-}
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherRuntimeConfigTest.java b/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherRuntimeConfigTest.java
deleted file mode 100644
index c1be6c535..000000000
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherRuntimeConfigTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.fetcher.fs;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Locale;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import org.apache.tika.config.ConfigContainer;
-import org.apache.tika.metadata.FileSystem;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.parser.ParseContext;
-import org.apache.tika.plugins.ExtensionConfig;
-
-/**
- * Tests runtime configuration of FileSystemFetcher via ConfigContainer and ParseContext.
- */
-public class FileSystemFetcherRuntimeConfigTest {
-
-    @Test
-    public void testRuntimeConfigViaParseContext(@TempDir Path tempDir) throws Exception {
-        // Create a test file
-        Path testFile = tempDir.resolve("test.txt");
-        Files.writeString(testFile, "test content");
-
-        // Create fetcher with default config (no extractFileSystemMetadata)
-        String defaultConfig = String.format(Locale.ROOT, "{\"basePath\":\"%s\"}",
-                tempDir.toString().replace("\\", "\\\\"));
-        ExtensionConfig pluginConfig = new ExtensionConfig("test-fetcher", 
"test", defaultConfig);
-        FileSystemFetcher fetcher = new FileSystemFetcher(pluginConfig);
-
-        // Fetch without runtime config - should not extract file system metadata
-        Metadata metadata1 = new Metadata();
-        ParseContext context1 = new ParseContext();
-        try (InputStream is = fetcher.fetch("test.txt", metadata1, context1)) {
-            assertNotNull(is);
-        }
-        assertNull(metadata1.get(FileSystem.CREATED),
-                "Without extractFileSystemMetadata, should not have CREATED 
metadata");
-
-        // Now create runtime config with extractFileSystemMetadata=true
-        // Note: basePath is NOT included for security reasons
-        String runtimeConfig = "{\"extractFileSystemMetadata\":true}";
-
-        ConfigContainer configContainer = new ConfigContainer();
-        configContainer.set("test-fetcher", runtimeConfig);
-
-        ParseContext context2 = new ParseContext();
-        context2.set(ConfigContainer.class, configContainer);
-
-        // Fetch with runtime config - should extract file system metadata
-        Metadata metadata2 = new Metadata();
-        try (InputStream is = fetcher.fetch("test.txt", metadata2, context2)) {
-            assertNotNull(is);
-        }
-        assertNotNull(metadata2.get(FileSystem.CREATED),
-                "With extractFileSystemMetadata=true, should have CREATED 
metadata");
-        assertNotNull(metadata2.get(FileSystem.MODIFIED),
-                "With extractFileSystemMetadata=true, should have MODIFIED 
metadata");
-    }
-
-    @Test
-    public void testRuntimeConfigCannotOverrideBasePath(@TempDir Path tempDir) throws Exception {
-        // Create two directories with different files
-        Path dir1 = tempDir.resolve("dir1");
-        Path dir2 = tempDir.resolve("dir2");
-        Files.createDirectories(dir1);
-        Files.createDirectories(dir2);
-
-        Path file1 = dir1.resolve("test.txt");
-        Files.writeString(file1, "content from dir1");
-
-        // Create fetcher with dir1 as default basePath
-        String defaultConfig = String.format(Locale.ROOT, "{\"basePath\":\"%s\"}",
-                dir1.toString().replace("\\", "\\\\"));
-        ExtensionConfig pluginConfig = new ExtensionConfig("test-fetcher", "test", defaultConfig);
-        FileSystemFetcher fetcher = new FileSystemFetcher(pluginConfig);
-
-        // Fetch from default basePath (dir1)
-        Metadata metadata1 = new Metadata();
-        ParseContext context1 = new ParseContext();
-        try (InputStream is = fetcher.fetch("test.txt", metadata1, context1)) {
-            String content = new String(is.readAllBytes(), StandardCharsets.UTF_8);
-            assertEquals("content from dir1", content);
-        }
-
-        // Try to override basePath at runtime to point to dir2
-        // This should throw an exception for security reasons
-        String runtimeConfig = String.format(Locale.ROOT, "{\"basePath\":\"%s\"}",
-                dir2.toString().replace("\\", "\\\\"));
-        ConfigContainer configContainer = new ConfigContainer();
-        configContainer.set("test-fetcher", runtimeConfig);
-
-        ParseContext context2 = new ParseContext();
-        context2.set(ConfigContainer.class, configContainer);
-
-        // Fetch with runtime config - should throw exception
-        Metadata metadata2 = new Metadata();
-        IOException exception = assertThrows(IOException.class, () -> {
-            fetcher.fetch("test.txt", metadata2, context2);
-        });
-        assertTrue(exception.getCause() != null &&
-                exception.getCause().getMessage().contains("Cannot change 'basePath' at runtime"),
-                "Should throw exception when attempting to change basePath at runtime");
-    }
-
-    @Test
-    public void testConfigContainerNotPresent(@TempDir Path tempDir) throws Exception {
-        // Create a test file
-        Path testFile = tempDir.resolve("test.txt");
-        Files.writeString(testFile, "test content");
-
-        // Create fetcher with default config
-        String defaultConfig = String.format(Locale.ROOT, "{\"basePath\":\"%s\"}",
-                tempDir.toString().replace("\\", "\\\\"));
-        ExtensionConfig pluginConfig = new ExtensionConfig("test-fetcher", "test", defaultConfig);
-        FileSystemFetcher fetcher = new FileSystemFetcher(pluginConfig);
-
-        // Fetch with ParseContext that has no ConfigContainer - should use default config
-        Metadata metadata = new Metadata();
-        ParseContext context = new ParseContext();
-        // Don't set ConfigContainer in context
-
-        try (InputStream is = fetcher.fetch("test.txt", metadata, context)) {
-            assertNotNull(is);
-            String content = new String(is.readAllBytes(), StandardCharsets.UTF_8);
-            assertEquals("test content", content);
-        }
-    }
-
-    @Test
-    public void testConfigContainerWithDifferentId(@TempDir Path tempDir) throws Exception {
-        // Create a test file
-        Path testFile = tempDir.resolve("test.txt");
-        Files.writeString(testFile, "test content");
-
-        // Create fetcher with default config
-        String defaultConfig = String.format(Locale.ROOT, "{\"basePath\":\"%s\"}",
-                tempDir.toString().replace("\\", "\\\\"));
-        ExtensionConfig pluginConfig = new ExtensionConfig("test-fetcher", "test", defaultConfig);
-        FileSystemFetcher fetcher = new FileSystemFetcher(pluginConfig);
-
-        // Create ConfigContainer with config for a different fetcher ID
-        ConfigContainer configContainer = new ConfigContainer();
-        configContainer.set("different-fetcher", 
"{\"basePath\":\"/some/other/path\"}");
-
-        ParseContext context = new ParseContext();
-        context.set(ConfigContainer.class, configContainer);
-
-        // Fetch - should use default config since runtime config is for different ID
-        Metadata metadata = new Metadata();
-        try (InputStream is = fetcher.fetch("test.txt", metadata, context)) {
-            assertNotNull(is);
-            String content = new String(is.readAllBytes(), StandardCharsets.UTF_8);
-            assertEquals("test content", content);
-        }
-    }
-}
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java b/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java
index 8c3254503..1a30b8c42 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-file-system/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java
@@ -16,42 +16,127 @@
  */
 package org.apache.tika.pipes.fetcher.fs;
 
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
 import org.apache.tika.pipes.api.fetcher.Fetcher;
 import org.apache.tika.plugins.ExtensionConfig;
 
 
 public class FileSystemFetcherTest {
 
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    @TempDir
+    Path tempDir;
+
+    private Fetcher createFetcher(Path basePath, Boolean allowAbsolutePaths) throws TikaConfigException, IOException {
+        ObjectNode config = MAPPER.createObjectNode();
+        if (basePath != null) {
+            config.put("basePath", basePath.toAbsolutePath().toString());
+        }
+        if (allowAbsolutePaths != null) {
+            config.put("allowAbsolutePaths", allowAbsolutePaths);
+        }
+        ExtensionConfig pluginConfig = new ExtensionConfig("test", "test", config.toString());
+        return new FileSystemFetcherFactory().buildExtension(pluginConfig);
+    }
+
     @Test
-    public void testDescendant() throws Exception {
+    public void testNullByte() throws Exception {
+        assertThrows(TikaConfigException.class, () -> {
+            ObjectNode config = MAPPER.createObjectNode();
+            config.put("basePath", "bad\u0000path");
+            ExtensionConfig pluginConfig = new ExtensionConfig("test", "test", config.toString());
+            new FileSystemFetcherFactory().buildExtension(pluginConfig);
+        });
+    }
 
-        Path root = Paths.get("/ab/cd/");
-        Path descendant = root.resolve("ef/gh/ij.pdf");
-        assertTrue(FileSystemFetcher.isDescendant(root, descendant));
+    @Test
+    public void testPathTraversalBlocked() throws Exception {
+        // Create a subdirectory as basePath and a file outside it
+        Path basePath = tempDir.resolve("allowed");
+        Files.createDirectories(basePath);
+
+        Path fileInBase = basePath.resolve("safe.txt");
+        Files.writeString(fileInBase, "safe content");
+
+        Path fileOutsideBase = tempDir.resolve("secret.txt");
+        Files.writeString(fileOutsideBase, "secret content");
+
+        // Create fetcher with basePath set to the subdirectory
+        Fetcher fetcher = createFetcher(basePath, null);
 
-        descendant = Paths.get("/cd/ef.pdf");
-        assertFalse(FileSystemFetcher.isDescendant(root, descendant));
+        // Valid path within basePath should work
+        try (TikaInputStream tis = fetcher.fetch("safe.txt", new Metadata(), new ParseContext())) {
+            assertNotNull(tis);
+        }
 
-        descendant = root.resolve("../../ij.pdf");
-        assertFalse(FileSystemFetcher.isDescendant(root, descendant));
+        // Path traversal attempt should be rejected
+        assertThrows(SecurityException.class, () -> {
+            fetcher.fetch("../secret.txt", new Metadata(), new ParseContext());
+        });
     }
 
     @Test
-    public void testNullByte() throws Exception {
+    public void testDeepPathTraversalBlocked() throws Exception {
+        // Create nested directories
+        Path basePath = tempDir.resolve("a/b/c");
+        Files.createDirectories(basePath);
+
+        Path fileInBase = basePath.resolve("file.txt");
+        Files.writeString(fileInBase, "nested content");
+
+        Path fileOutsideBase = tempDir.resolve("outside.txt");
+        Files.writeString(fileOutsideBase, "outside content");
+
+        Fetcher fetcher = createFetcher(basePath, null);
+
+        // Deep path traversal should be rejected
+        assertThrows(SecurityException.class, () -> {
+            fetcher.fetch("../../../outside.txt", new Metadata(), new 
ParseContext());
+        });
+
+        // Even deeper traversal should be rejected
+        assertThrows(SecurityException.class, () -> {
+            fetcher.fetch("../../../../../../../../etc/passwd", new 
Metadata(), new ParseContext());
+        });
+    }
+
+    @Test
+    public void testAllowAbsolutePathsRequired() throws Exception {
+        // Without basePath and without allowAbsolutePaths, should throw
         assertThrows(TikaConfigException.class, () -> {
-            ExtensionConfig pluginConfig = new ExtensionConfig("test", "test",
-                    "{ \"basePath\":\"bad\\u0000path\"}");
-            Fetcher f = new FileSystemFetcherFactory().buildExtension(pluginConfig);
+            createFetcher(null, null);
         });
     }
+
+    @Test
+    public void testAllowAbsolutePathsWorks() throws Exception {
+        // Create a file to fetch
+        Path testFile = tempDir.resolve("test.txt");
+        Files.writeString(testFile, "test content");
+
+        // With allowAbsolutePaths=true and no basePath, should work
+        Fetcher fetcher = createFetcher(null, true);
+
+        // Fetch using absolute path
+        try (TikaInputStream tis = fetcher.fetch(
+                testFile.toAbsolutePath().toString(), new Metadata(), new ParseContext())) {
+            assertNotNull(tis);
+        }
+    }
 }
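
For readers following along: the traversal tests above effectively document the hardened fetcher configuration. Below is a minimal standalone sketch assembled only from the classes and config keys this diff exercises (basePath, FileSystemFetcherFactory, ExtensionConfig, the TikaInputStream-returning fetch); the class name, the extension ids, and the output line are illustrative, not canonical.

    import java.nio.file.Path;
    import java.util.Locale;

    import org.apache.tika.io.TikaInputStream;
    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.parser.ParseContext;
    import org.apache.tika.pipes.api.fetcher.Fetcher;
    import org.apache.tika.pipes.fetcher.fs.FileSystemFetcherFactory;
    import org.apache.tika.plugins.ExtensionConfig;

    public class SandboxedFetchSketch {
        public static void main(String[] args) throws Exception {
            Path baseDir = Path.of(args[0]);
            // Escape backslashes so Windows paths survive JSON, as the tests do.
            String json = String.format(Locale.ROOT, "{\"basePath\":\"%s\"}",
                    baseDir.toAbsolutePath().toString().replace("\\", "\\\\"));
            Fetcher fetcher = new FileSystemFetcherFactory()
                    .buildExtension(new ExtensionConfig("fs", "fs", json));
            // Relative keys resolve under basePath; a key such as "../outside.txt"
            // is rejected with a SecurityException per the traversal tests above.
            // Assumes baseDir already contains a file named safe.txt.
            try (TikaInputStream tis = fetcher.fetch("safe.txt", new Metadata(), new ParseContext())) {
                System.out.println("fetched " + tis.readAllBytes().length + " bytes");
            }
        }
    }
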
diff --git a/tika-serialization/pom.xml b/tika-serialization/pom.xml
index 186146bc1..e9401b73c 100644
--- a/tika-serialization/pom.xml
+++ b/tika-serialization/pom.xml
@@ -90,6 +90,15 @@
   </dependencies>
   <build>
     <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>**/test-documents/**</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>

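On the tika-serialization change: the apache-rat-plugin stanza excludes **/test-documents/** from RAT license-header checks, the usual remedy when binary or third-party test fixtures cannot carry Apache headers. Assuming the standard apache-rat plugin prefix resolves in this build, the check can be re-run for just this module with:

    mvn -pl tika-serialization apache-rat:check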