This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-4626
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 47cdae7c52da50951548319e4535a75684edb65d
Author: tallison <[email protected]>
AuthorDate: Sun Jan 18 08:31:27 2026 -0500

    checkpoint
---
 .../org/apache/tika/pipes/core/PipesConfig.java    |  29 ++++-
 .../apache/tika/server/core/TikaServerProcess.java |  88 +++++++++++++-
 .../core/resource/RecursiveMetadataResource.java   |  37 ++++++
 .../tika/server/core/resource/TikaResource.java    | 128 +++++++++++++++++++--
 4 files changed, 271 insertions(+), 11 deletions(-)

diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java
index 74cd509a0a..f0ae1bffd1 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java
@@ -100,7 +100,14 @@ public class PipesConfig {
 
     private ArrayList<String> forkedJvmArgs = new ArrayList<>();
     private String javaPath = "java";
-    
+
+    /**
+     * Optional directory for temporary files during pipes-based parsing.
+     * If not set, the system default temp directory will be used.
+     * Consider using a RAM-backed filesystem (e.g., /dev/shm) for better 
performance.
+     */
+    private String tempDirectory = null;
+
     /**
      * Type of ConfigStore to use for distributed state management.
      * Options: "memory" (default), "ignite"
@@ -435,4 +442,24 @@ public class PipesConfig {
     public void setConfigStoreParams(String configStoreParams) {
         this.configStoreParams = configStoreParams;
     }
+
+    /**
+     * Gets the directory for temporary files during pipes-based parsing.
+     *
+     * @return the temp directory path, or null to use system default
+     */
+    public String getTempDirectory() {
+        return tempDirectory;
+    }
+
+    /**
+     * Sets the directory for temporary files during pipes-based parsing.
+     * If not set, the system default temp directory will be used.
+     * Consider using a RAM-backed filesystem (e.g., /dev/shm or another tmpfs mount) for 
better performance.
+     *
+     * @param tempDirectory the temp directory path, or null to use system 
default
+     */
+    public void setTempDirectory(String tempDirectory) {
+        this.tempDirectory = tempDirectory;
+    }
 }
diff --git 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
index 09de5b18ae..d657c73ee6 100644
--- 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
+++ 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
@@ -58,6 +58,10 @@ import org.apache.tika.config.ServiceLoader;
 import org.apache.tika.config.loader.TikaJsonConfig;
 import org.apache.tika.config.loader.TikaLoader;
 import org.apache.tika.exception.TikaException;
+import org.apache.tika.pipes.core.EmitStrategy;
+import org.apache.tika.pipes.core.EmitStrategyConfig;
+import org.apache.tika.pipes.core.PipesConfig;
+import org.apache.tika.pipes.core.PipesParser;
 import org.apache.tika.pipes.core.emitter.EmitterManager;
 import org.apache.tika.pipes.core.fetcher.FetcherManager;
 import org.apache.tika.plugins.TikaPluginManager;
@@ -65,6 +69,7 @@ import org.apache.tika.server.core.resource.AsyncResource;
 import org.apache.tika.server.core.resource.DetectorResource;
 import org.apache.tika.server.core.resource.LanguageResource;
 import org.apache.tika.server.core.resource.MetadataResource;
+import org.apache.tika.server.core.resource.PipesParsingHelper;
 import org.apache.tika.server.core.resource.PipesResource;
 import org.apache.tika.server.core.resource.RecursiveMetadataResource;
 import org.apache.tika.server.core.resource.TikaDetectors;
@@ -190,7 +195,14 @@ public class TikaServerProcess {
         ServerStatus serverStatus;
 
         serverStatus = new ServerStatus();
-        TikaResource.init(tikaLoader, tikaServerConfig, inputStreamFactory, 
serverStatus);
+
+        // Initialize pipes-based parsing for process isolation
+        PipesParsingHelper pipesParsingHelper = 
initPipesParsingHelper(tikaServerConfig);
+        if (pipesParsingHelper != null) {
+            LOG.info("Pipes-based parsing enabled for /tika and /rmeta 
endpoints");
+        }
+
+        TikaResource.init(tikaLoader, tikaServerConfig, inputStreamFactory, 
serverStatus, pipesParsingHelper);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
 
         List<ResourceProvider> resourceProviders = new ArrayList<>();
@@ -460,6 +472,80 @@ public class TikaServerProcess {
         return new 
ServiceLoader(TikaServerProcess.class.getClassLoader()).loadServiceProviders(org.apache.tika.server.core.writer.TikaServerWriter.class);
     }
 
+    /**
+     * Initializes the PipesParsingHelper for pipes-based parsing with process 
isolation.
+     * <p>
+     * The PipesParser will be configured with PASSBACK_ALL emit strategy so 
that
+     * parsed content is returned directly instead of being emitted to an 
external emitter.
+     * <p>
+     * Required JSON configuration:
+     * <pre>
+     * {
+     *   "plugin-roots": "/path/to/plugins",
+     *   "fetchers": {
+     *     "file-system-fetcher": {
+     *       "file-system-fetcher": {
+     *         "allowAbsolutePaths": true
+     *       }
+     *     }
+     *   },
+     *   "pipes": {
+     *     "numClients": 4
+     *   }
+     * }
+     * </pre>
+     *
+     * @param tikaServerConfig the server configuration
+     * @return the PipesParsingHelper, or null if there is no config file, pipes.numClients is not positive, 
or initialization fails
+     */
+    private static PipesParsingHelper initPipesParsingHelper(TikaServerConfig 
tikaServerConfig) {
+        if (!tikaServerConfig.hasConfigFile()) {
+            LOG.debug("No config file - pipes-based parsing not enabled");
+            return null;
+        }
+
+        try {
+            TikaJsonConfig tikaJsonConfig = 
TikaJsonConfig.load(tikaServerConfig.getConfigPath());
+
+            // Check if pipes config exists
+            PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
+            if (pipesConfig.getNumClients() <= 0) {
+                LOG.debug("No pipes.numClients configured - pipes-based 
parsing not enabled");
+                return null;
+            }
+
+            // Force PASSBACK_ALL strategy so results are returned to us (not 
emitted)
+            pipesConfig.setEmitStrategy(new 
EmitStrategyConfig(EmitStrategy.PASSBACK_ALL));
+
+            // Create PipesParser
+            PipesParser pipesParser = PipesParser.load(tikaJsonConfig, 
pipesConfig,
+                    tikaServerConfig.getConfigPath());
+
+            // Create and return the helper
+            PipesParsingHelper helper = new PipesParsingHelper(pipesParser, 
pipesConfig);
+
+            // Register shutdown hook to clean up PipesParser
+            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                try {
+                    LOG.info("Shutting down PipesParser");
+                    pipesParser.close();
+                } catch (Exception e) {
+                    LOG.warn("Error closing PipesParser", e);
+                }
+            }));
+
+            return helper;
+        } catch (Exception e) {
+            LOG.warn("Failed to initialize pipes-based parsing, falling back 
to in-process parsing. " +
+                    "To enable pipes-based parsing, ensure your config 
includes: " +
+                    "1) 'plugin-roots' pointing to plugins directory, " +
+                    "2) 'fetchers.file-system-fetcher' with 
allowAbsolutePaths=true, " +
+                    "3) 'pipes.numClients' > 0. Error: {}", e.getMessage());
+            LOG.debug("Pipes initialization error details", e);
+            return null;
+        }
+    }
+
     private static class ServerDetails {
         JAXRSServerFactoryBean sf;
         String serverId;
diff --git 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
index 698241cc3b..0f328d2971 100644
--- 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
+++ 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
@@ -64,6 +64,25 @@ public class RecursiveMetadataResource {
             throws Exception {
 
         final ParseContext context = new ParseContext();
+
+        // Use pipes-based parsing if enabled
+        if (TikaResource.isPipesParsingEnabled()) {
+            fillMetadata(null, metadata, httpHeaders);
+            TikaResource.logRequest(LOG, "/rmeta", metadata);
+
+            // Set up handler factory in context
+            BasicContentHandlerFactory.HANDLER_TYPE type = 
handlerConfig.type();
+            ContentHandlerFactory factory = new 
BasicContentHandlerFactory(type, handlerConfig.writeLimit(),
+                    handlerConfig.throwOnWriteLimitReached(), context);
+            context.set(ContentHandlerFactory.class, factory);
+
+            List<Metadata> metadataList = TikaResource.parseWithPipes(tis, 
metadata, context, ParseMode.RMETA);
+            MetadataFilter metadataFilter = context.get(MetadataFilter.class, 
getTikaLoader().loadMetadataFilters());
+            metadataFilter.filter(metadataList);
+            return metadataList;
+        }
+
+        // Legacy in-process parsing
         Parser parser = TikaResource.createParser();
 
         RecursiveParserWrapper wrapper = new RecursiveParserWrapper(parser);
@@ -178,6 +197,24 @@ public class RecursiveMetadataResource {
 
     private MetadataList parseMetadataWithContext(TikaInputStream tis, 
Metadata metadata, MultivaluedMap<String, String> httpHeaders,
                                                   UriInfo info, 
ServerHandlerConfig handlerConfig, ParseContext context) throws Exception {
+        // Use pipes-based parsing if enabled
+        if (TikaResource.isPipesParsingEnabled()) {
+            // Set up handler factory in context if not already set
+            ContentHandlerFactory factory = 
context.get(ContentHandlerFactory.class);
+            if (factory == null) {
+                BasicContentHandlerFactory.HANDLER_TYPE type = 
handlerConfig.type();
+                factory = new BasicContentHandlerFactory(type, 
handlerConfig.writeLimit(),
+                        handlerConfig.throwOnWriteLimitReached(), context);
+                context.set(ContentHandlerFactory.class, factory);
+            }
+
+            List<Metadata> metadataList = TikaResource.parseWithPipes(tis, 
metadata, context, ParseMode.RMETA);
+            MetadataFilter metadataFilter = context.get(MetadataFilter.class, 
getTikaLoader().loadMetadataFilters());
+            metadataFilter.filter(metadataList);
+            return new MetadataList(metadataList);
+        }
+
+        // Legacy in-process parsing
         Parser parser = TikaResource.createParser();
         RecursiveParserWrapper wrapper = new RecursiveParserWrapper(parser);
 
diff --git 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/TikaResource.java
 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/TikaResource.java
index ad37925249..889cc3002b 100644
--- 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/TikaResource.java
+++ 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/TikaResource.java
@@ -74,6 +74,7 @@ import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.parser.Parser;
+import org.apache.tika.pipes.api.ParseMode;
 import org.apache.tika.sax.BasicContentHandlerFactory;
 import org.apache.tika.sax.BodyContentHandler;
 import org.apache.tika.sax.ContentHandlerFactory;
@@ -100,14 +101,48 @@ public class TikaResource {
     private static TikaServerConfig TIKA_SERVER_CONFIG;
     private static InputStreamFactory INPUTSTREAM_FACTORY = null;
     private static ServerStatus SERVER_STATUS = null;
+    private static PipesParsingHelper PIPES_PARSING_HELPER = null;
 
-
-    public static void init(TikaLoader tikaLoader, TikaServerConfig 
tikaServerConfg, InputStreamFactory inputStreamFactory,
+    /**
+     * Initialize TikaResource without pipes-based parsing (legacy mode).
+     */
+    public static void init(TikaLoader tikaLoader, TikaServerConfig 
tikaServerConfig, InputStreamFactory inputStreamFactory,
                             ServerStatus serverStatus) {
+        init(tikaLoader, tikaServerConfig, inputStreamFactory, serverStatus, 
null);
+    }
+
+    /**
+     * Initialize TikaResource with pipes-based parsing for process isolation.
+     *
+     * @param tikaLoader the Tika loader
+     * @param tikaServerConfig server configuration
+     * @param inputStreamFactory input stream factory
+     * @param serverStatus server status tracker
+     * @param pipesParsingHelper helper for pipes-based parsing (may be null 
for legacy mode)
+     */
+    public static void init(TikaLoader tikaLoader, TikaServerConfig 
tikaServerConfig, InputStreamFactory inputStreamFactory,
+                            ServerStatus serverStatus, PipesParsingHelper 
pipesParsingHelper) {
         TIKA_LOADER = tikaLoader;
-        TIKA_SERVER_CONFIG = tikaServerConfg;
+        TIKA_SERVER_CONFIG = tikaServerConfig;
         INPUTSTREAM_FACTORY = inputStreamFactory;
         SERVER_STATUS = serverStatus;
+        PIPES_PARSING_HELPER = pipesParsingHelper;
+    }
+
+    /**
+     * Returns true if pipes-based parsing is enabled.
+     */
+    public static boolean isPipesParsingEnabled() {
+        return PIPES_PARSING_HELPER != null;
+    }
+
+    /**
+     * Gets the PipesParsingHelper instance.
+     *
+     * @return the helper, or null if pipes-based parsing is not enabled
+     */
+    public static PipesParsingHelper getPipesParsingHelper() {
+        return PIPES_PARSING_HELPER;
     }
 
 
@@ -323,6 +358,34 @@ public class TikaResource {
         }
     }
 
+    /**
+     * Parses using pipes-based parsing with process isolation.
+     * This method writes the input to a temp file, invokes PipesParser,
+     * and returns the metadata list.
+     *
+     * @param inputStream the input stream to parse
+     * @param metadata metadata to pass to the parser
+     * @param parseContext parse context with handler configuration
+     * @param parseMode RMETA or CONCATENATE
+     * @return list of metadata objects from parsing
+     * @throws IOException if parsing fails
+     */
+    public static List<Metadata> parseWithPipes(InputStream inputStream, 
Metadata metadata,
+                                                 ParseContext parseContext, 
ParseMode parseMode)
+            throws IOException {
+        checkIsOperating();
+
+        if (PIPES_PARSING_HELPER == null) {
+            throw new IllegalStateException("Pipes-based parsing is not 
enabled");
+        }
+
+        String fileName = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+        LOG.debug("Parsing with pipes: {}", fileName);
+
+        return PIPES_PARSING_HELPER.parse(inputStream, metadata, parseContext, 
parseMode,
+                TIKA_SERVER_CONFIG.isReturnStackTrace());
+    }
+
     public static void checkIsOperating() {
         //check that server is not in shutdown mode
         if (!SERVER_STATUS.isOperating()) {
@@ -500,9 +563,25 @@ public class TikaResource {
     public Metadata getJsonFromMultipart(Attachment att, @Context HttpHeaders 
httpHeaders, @Context final UriInfo info, @PathParam(HANDLER_TYPE_PARAM) String 
handlerTypeName)
             throws IOException, TikaException {
         Metadata metadata = new Metadata();
-        parseToMetadata(getInputStream(att.getObject(InputStream.class), 
metadata, httpHeaders, info), metadata, preparePostHeaderMap(att, httpHeaders), 
info, handlerTypeName);
-        List<Metadata> metadataList = new ArrayList<>();
-        metadataList.add(metadata);
+        List<Metadata> metadataList;
+
+        if (isPipesParsingEnabled()) {
+            // Use pipes-based parsing
+            TikaInputStream tis = 
getInputStream(att.getObject(InputStream.class), metadata, httpHeaders, info);
+            MultivaluedMap<String, String> headers = preparePostHeaderMap(att, 
httpHeaders);
+            fillMetadata(null, metadata, headers);
+            logRequest(LOG, "/tika", metadata);
+
+            ParseContext context = new ParseContext();
+            setupHandlerFactory(context, handlerTypeName, headers);
+            metadataList = parseWithPipes(tis, metadata, context, 
ParseMode.CONCATENATE);
+        } else {
+            // Legacy in-process parsing
+            parseToMetadata(getInputStream(att.getObject(InputStream.class), 
metadata, httpHeaders, info), metadata, preparePostHeaderMap(att, httpHeaders), 
info, handlerTypeName);
+            metadataList = new ArrayList<>();
+            metadataList.add(metadata);
+        }
+
         
TikaResource.getTikaLoader().loadMetadataFilters().filter(metadataList);
         if (metadataList.isEmpty()) {
             return new Metadata();
@@ -517,9 +596,25 @@ public class TikaResource {
     public Metadata getJson(final InputStream is, @Context HttpHeaders 
httpHeaders, @Context final UriInfo info, @PathParam(HANDLER_TYPE_PARAM) String 
handlerTypeName)
             throws IOException, TikaException {
         Metadata metadata = new Metadata();
-        parseToMetadata(getInputStream(is, metadata, httpHeaders, info), 
metadata, httpHeaders.getRequestHeaders(), info, handlerTypeName);
-        List<Metadata> metadataList = new ArrayList<>();
-        metadataList.add(metadata);
+        List<Metadata> metadataList;
+
+        if (isPipesParsingEnabled()) {
+            // Use pipes-based parsing
+            TikaInputStream tis = getInputStream(is, metadata, httpHeaders, 
info);
+            MultivaluedMap<String, String> headers = 
httpHeaders.getRequestHeaders();
+            fillMetadata(null, metadata, headers);
+            logRequest(LOG, "/tika", metadata);
+
+            ParseContext context = new ParseContext();
+            setupHandlerFactory(context, handlerTypeName, headers);
+            metadataList = parseWithPipes(tis, metadata, context, 
ParseMode.CONCATENATE);
+        } else {
+            // Legacy in-process parsing
+            parseToMetadata(getInputStream(is, metadata, httpHeaders, info), 
metadata, httpHeaders.getRequestHeaders(), info, handlerTypeName);
+            metadataList = new ArrayList<>();
+            metadataList.add(metadata);
+        }
+
         
TikaResource.getTikaLoader().loadMetadataFilters().filter(metadataList);
         if (metadataList.isEmpty()) {
             return new Metadata();
@@ -527,6 +622,21 @@ public class TikaResource {
         return metadataList.get(0);
     }
 
+    /**
+     * Sets up the ContentHandlerFactory in the ParseContext based on handler 
type and HTTP headers.
+     */
+    private void setupHandlerFactory(ParseContext context, String 
handlerTypeName, MultivaluedMap<String, String> httpHeaders) {
+        int writeLimit = -1;
+        boolean throwOnWriteLimitReached = 
getThrowOnWriteLimitReached(httpHeaders);
+        if (httpHeaders.containsKey("writeLimit")) {
+            writeLimit = Integer.parseInt(httpHeaders.getFirst("writeLimit"));
+        }
+
+        BasicContentHandlerFactory.HANDLER_TYPE type = 
BasicContentHandlerFactory.parseHandlerType(handlerTypeName, 
DEFAULT_HANDLER_TYPE);
+        ContentHandlerFactory factory = new BasicContentHandlerFactory(type, 
writeLimit, throwOnWriteLimitReached, context);
+        context.set(ContentHandlerFactory.class, factory);
+    }
+
     private void parseToMetadata(TikaInputStream tis, Metadata metadata, 
MultivaluedMap<String, String> httpHeaders, UriInfo info, String 
handlerTypeName)
             throws IOException, TikaConfigException {
         final Parser parser = createParser();

Reply via email to