This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch TIKA-4626 in repository https://gitbox.apache.org/repos/asf/tika.git
commit 47cdae7c52da50951548319e4535a75684edb65d Author: tallison <[email protected]> AuthorDate: Sun Jan 18 08:31:27 2026 -0500 checkpoint --- .../org/apache/tika/pipes/core/PipesConfig.java | 29 ++++- .../apache/tika/server/core/TikaServerProcess.java | 88 +++++++++++++- .../core/resource/RecursiveMetadataResource.java | 37 ++++++ .../tika/server/core/resource/TikaResource.java | 128 +++++++++++++++++++-- 4 files changed, 271 insertions(+), 11 deletions(-) diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java index 74cd509a0a..f0ae1bffd1 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java @@ -100,7 +100,14 @@ public class PipesConfig { private ArrayList<String> forkedJvmArgs = new ArrayList<>(); private String javaPath = "java"; - + + /** + * Optional directory for temporary files during pipes-based parsing. + * If not set, the system default temp directory will be used. + * Consider using a RAM-backed filesystem (e.g., /dev/shm) for better performance. + */ + private String tempDirectory = null; + /** * Type of ConfigStore to use for distributed state management. * Options: "memory" (default), "ignite" @@ -435,4 +442,24 @@ public class PipesConfig { public void setConfigStoreParams(String configStoreParams) { this.configStoreParams = configStoreParams; } + + /** + * Gets the directory for temporary files during pipes-based parsing. + * + * @return the temp directory path, or null to use system default + */ + public String getTempDirectory() { + return tempDirectory; + } + + /** + * Sets the directory for temporary files during pipes-based parsing. + * If not set, the system default temp directory will be used. 
+ * Consider using a RAM-backed filesystem (e.g., /dev/shm or another tmpfs mount) for better performance. + * + * @param tempDirectory the temp directory path, or null to use system default + */ + public void setTempDirectory(String tempDirectory) { + this.tempDirectory = tempDirectory; + } } diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java index 09de5b18ae..d657c73ee6 100644 --- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java @@ -58,6 +58,10 @@ import org.apache.tika.config.ServiceLoader; import org.apache.tika.config.loader.TikaJsonConfig; import org.apache.tika.config.loader.TikaLoader; import org.apache.tika.exception.TikaException; +import org.apache.tika.pipes.core.EmitStrategy; +import org.apache.tika.pipes.core.EmitStrategyConfig; +import org.apache.tika.pipes.core.PipesConfig; +import org.apache.tika.pipes.core.PipesParser; import org.apache.tika.pipes.core.emitter.EmitterManager; import org.apache.tika.pipes.core.fetcher.FetcherManager; import org.apache.tika.plugins.TikaPluginManager; @@ -65,6 +69,7 @@ import org.apache.tika.server.core.resource.AsyncResource; import org.apache.tika.server.core.resource.DetectorResource; import org.apache.tika.server.core.resource.LanguageResource; import org.apache.tika.server.core.resource.MetadataResource; +import org.apache.tika.server.core.resource.PipesParsingHelper; import org.apache.tika.server.core.resource.PipesResource; import org.apache.tika.server.core.resource.RecursiveMetadataResource; import org.apache.tika.server.core.resource.TikaDetectors; @@ -190,7 +195,14 @@ public class TikaServerProcess { ServerStatus serverStatus; serverStatus = new ServerStatus(); - TikaResource.init(tikaLoader, tikaServerConfig, 
inputStreamFactory, serverStatus); + + // Initialize pipes-based parsing for process isolation + PipesParsingHelper pipesParsingHelper = initPipesParsingHelper(tikaServerConfig); + if (pipesParsingHelper != null) { + LOG.info("Pipes-based parsing enabled for /tika and /rmeta endpoints"); + } + + TikaResource.init(tikaLoader, tikaServerConfig, inputStreamFactory, serverStatus, pipesParsingHelper); JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); List<ResourceProvider> resourceProviders = new ArrayList<>(); @@ -460,6 +472,80 @@ public class TikaServerProcess { return new ServiceLoader(TikaServerProcess.class.getClassLoader()).loadServiceProviders(org.apache.tika.server.core.writer.TikaServerWriter.class); } + /** + * Initializes the PipesParsingHelper for pipes-based parsing with process isolation. + * <p> + * The PipesParser will be configured with PASSBACK_ALL emit strategy so that + * parsed content is returned directly instead of being emitted to an external emitter. + * <p> + * Required JSON configuration: + * <pre> + * { + * "plugin-roots": "/path/to/plugins", + * "fetchers": { + * "file-system-fetcher": { + * "file-system-fetcher": { + * "allowAbsolutePaths": true + * } + * } + * }, + * "pipes": { + * "numClients": 4 + * } + * } + * </pre> + * + * @param tikaServerConfig the server configuration + * @return the PipesParsingHelper, or null if pipes config doesn't exist or initialization fails + */ + private static PipesParsingHelper initPipesParsingHelper(TikaServerConfig tikaServerConfig) { + if (!tikaServerConfig.hasConfigFile()) { + LOG.debug("No config file - pipes-based parsing not enabled"); + return null; + } + + try { + TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaServerConfig.getConfigPath()); + + // Check if pipes config exists + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); + if (pipesConfig.getNumClients() <= 0) { + LOG.debug("No pipes.numClients configured - pipes-based parsing not enabled"); + return null; + } + + 
// Force PASSBACK_ALL strategy so results are returned to us (not emitted) + pipesConfig.setEmitStrategy(new EmitStrategyConfig(EmitStrategy.PASSBACK_ALL)); + + // Create PipesParser + PipesParser pipesParser = PipesParser.load(tikaJsonConfig, pipesConfig, + tikaServerConfig.getConfigPath()); + + // Create and return the helper + PipesParsingHelper helper = new PipesParsingHelper(pipesParser, pipesConfig); + + // Register shutdown hook to clean up PipesParser + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + LOG.info("Shutting down PipesParser"); + pipesParser.close(); + } catch (Exception e) { + LOG.warn("Error closing PipesParser", e); + } + })); + + return helper; + } catch (Exception e) { + LOG.warn("Failed to initialize pipes-based parsing, falling back to in-process parsing. " + + "To enable pipes-based parsing, ensure your config includes: " + + "1) 'plugin-roots' pointing to plugins directory, " + + "2) 'fetchers.file-system-fetcher' with allowAbsolutePaths=true, " + + "3) 'pipes.numClients' > 0. 
Error: {}", e.getMessage()); + LOG.debug("Pipes initialization error details", e); + return null; + } + } + private static class ServerDetails { JAXRSServerFactoryBean sf; String serverId; diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java index 698241cc3b..0f328d2971 100644 --- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java @@ -64,6 +64,25 @@ public class RecursiveMetadataResource { throws Exception { final ParseContext context = new ParseContext(); + + // Use pipes-based parsing if enabled + if (TikaResource.isPipesParsingEnabled()) { + fillMetadata(null, metadata, httpHeaders); + TikaResource.logRequest(LOG, "/rmeta", metadata); + + // Set up handler factory in context + BasicContentHandlerFactory.HANDLER_TYPE type = handlerConfig.type(); + ContentHandlerFactory factory = new BasicContentHandlerFactory(type, handlerConfig.writeLimit(), + handlerConfig.throwOnWriteLimitReached(), context); + context.set(ContentHandlerFactory.class, factory); + + List<Metadata> metadataList = TikaResource.parseWithPipes(tis, metadata, context, ParseMode.RMETA); + MetadataFilter metadataFilter = context.get(MetadataFilter.class, getTikaLoader().loadMetadataFilters()); + metadataFilter.filter(metadataList); + return metadataList; + } + + // Legacy in-process parsing Parser parser = TikaResource.createParser(); RecursiveParserWrapper wrapper = new RecursiveParserWrapper(parser); @@ -178,6 +197,24 @@ public class RecursiveMetadataResource { private MetadataList parseMetadataWithContext(TikaInputStream tis, Metadata metadata, MultivaluedMap<String, String> httpHeaders, UriInfo info, ServerHandlerConfig handlerConfig, 
ParseContext context) throws Exception { + // Use pipes-based parsing if enabled + if (TikaResource.isPipesParsingEnabled()) { + // Set up handler factory in context if not already set + ContentHandlerFactory factory = context.get(ContentHandlerFactory.class); + if (factory == null) { + BasicContentHandlerFactory.HANDLER_TYPE type = handlerConfig.type(); + factory = new BasicContentHandlerFactory(type, handlerConfig.writeLimit(), + handlerConfig.throwOnWriteLimitReached(), context); + context.set(ContentHandlerFactory.class, factory); + } + + List<Metadata> metadataList = TikaResource.parseWithPipes(tis, metadata, context, ParseMode.RMETA); + MetadataFilter metadataFilter = context.get(MetadataFilter.class, getTikaLoader().loadMetadataFilters()); + metadataFilter.filter(metadataList); + return new MetadataList(metadataList); + } + + // Legacy in-process parsing Parser parser = TikaResource.createParser(); RecursiveParserWrapper wrapper = new RecursiveParserWrapper(parser); diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/TikaResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/TikaResource.java index ad37925249..889cc3002b 100644 --- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/TikaResource.java +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/TikaResource.java @@ -74,6 +74,7 @@ import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; import org.apache.tika.parser.ParseContext; import org.apache.tika.parser.Parser; +import org.apache.tika.pipes.api.ParseMode; import org.apache.tika.sax.BasicContentHandlerFactory; import org.apache.tika.sax.BodyContentHandler; import org.apache.tika.sax.ContentHandlerFactory; @@ -100,14 +101,48 @@ public class TikaResource { private static TikaServerConfig TIKA_SERVER_CONFIG; private static InputStreamFactory INPUTSTREAM_FACTORY = null; 
private static ServerStatus SERVER_STATUS = null; + private static PipesParsingHelper PIPES_PARSING_HELPER = null; - - public static void init(TikaLoader tikaLoader, TikaServerConfig tikaServerConfg, InputStreamFactory inputStreamFactory, + /** + * Initialize TikaResource without pipes-based parsing (legacy mode). + */ + public static void init(TikaLoader tikaLoader, TikaServerConfig tikaServerConfig, InputStreamFactory inputStreamFactory, ServerStatus serverStatus) { + init(tikaLoader, tikaServerConfig, inputStreamFactory, serverStatus, null); + } + + /** + * Initialize TikaResource with pipes-based parsing for process isolation. + * + * @param tikaLoader the Tika loader + * @param tikaServerConfig server configuration + * @param inputStreamFactory input stream factory + * @param serverStatus server status tracker + * @param pipesParsingHelper helper for pipes-based parsing (may be null for legacy mode) + */ + public static void init(TikaLoader tikaLoader, TikaServerConfig tikaServerConfig, InputStreamFactory inputStreamFactory, + ServerStatus serverStatus, PipesParsingHelper pipesParsingHelper) { TIKA_LOADER = tikaLoader; - TIKA_SERVER_CONFIG = tikaServerConfg; + TIKA_SERVER_CONFIG = tikaServerConfig; INPUTSTREAM_FACTORY = inputStreamFactory; SERVER_STATUS = serverStatus; + PIPES_PARSING_HELPER = pipesParsingHelper; + } + + /** + * Returns true if pipes-based parsing is enabled. + */ + public static boolean isPipesParsingEnabled() { + return PIPES_PARSING_HELPER != null; + } + + /** + * Gets the PipesParsingHelper instance. + * + * @return the helper, or null if pipes-based parsing is not enabled + */ + public static PipesParsingHelper getPipesParsingHelper() { + return PIPES_PARSING_HELPER; } @@ -323,6 +358,34 @@ public class TikaResource { } } + /** + * Parses using pipes-based parsing with process isolation. + * This method writes the input to a temp file, invokes PipesParser, + * and returns the metadata list. 
+ * + * @param inputStream the input stream to parse + * @param metadata metadata to pass to the parser + * @param parseContext parse context with handler configuration + * @param parseMode RMETA or CONCATENATE + * @return list of metadata objects from parsing + * @throws IOException if parsing fails + */ + public static List<Metadata> parseWithPipes(InputStream inputStream, Metadata metadata, + ParseContext parseContext, ParseMode parseMode) + throws IOException { + checkIsOperating(); + + if (PIPES_PARSING_HELPER == null) { + throw new IllegalStateException("Pipes-based parsing is not enabled"); + } + + String fileName = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY); + LOG.debug("Parsing with pipes: {}", fileName); + + return PIPES_PARSING_HELPER.parse(inputStream, metadata, parseContext, parseMode, + TIKA_SERVER_CONFIG.isReturnStackTrace()); + } + public static void checkIsOperating() { //check that server is not in shutdown mode if (!SERVER_STATUS.isOperating()) { @@ -500,9 +563,25 @@ public class TikaResource { public Metadata getJsonFromMultipart(Attachment att, @Context HttpHeaders httpHeaders, @Context final UriInfo info, @PathParam(HANDLER_TYPE_PARAM) String handlerTypeName) throws IOException, TikaException { Metadata metadata = new Metadata(); - parseToMetadata(getInputStream(att.getObject(InputStream.class), metadata, httpHeaders, info), metadata, preparePostHeaderMap(att, httpHeaders), info, handlerTypeName); - List<Metadata> metadataList = new ArrayList<>(); - metadataList.add(metadata); + List<Metadata> metadataList; + + if (isPipesParsingEnabled()) { + // Use pipes-based parsing + TikaInputStream tis = getInputStream(att.getObject(InputStream.class), metadata, httpHeaders, info); + MultivaluedMap<String, String> headers = preparePostHeaderMap(att, httpHeaders); + fillMetadata(null, metadata, headers); + logRequest(LOG, "/tika", metadata); + + ParseContext context = new ParseContext(); + setupHandlerFactory(context, handlerTypeName, headers); + 
metadataList = parseWithPipes(tis, metadata, context, ParseMode.CONCATENATE); + } else { + // Legacy in-process parsing + parseToMetadata(getInputStream(att.getObject(InputStream.class), metadata, httpHeaders, info), metadata, preparePostHeaderMap(att, httpHeaders), info, handlerTypeName); + metadataList = new ArrayList<>(); + metadataList.add(metadata); + } + TikaResource.getTikaLoader().loadMetadataFilters().filter(metadataList); if (metadataList.isEmpty()) { return new Metadata(); @@ -517,9 +596,25 @@ public class TikaResource { public Metadata getJson(final InputStream is, @Context HttpHeaders httpHeaders, @Context final UriInfo info, @PathParam(HANDLER_TYPE_PARAM) String handlerTypeName) throws IOException, TikaException { Metadata metadata = new Metadata(); - parseToMetadata(getInputStream(is, metadata, httpHeaders, info), metadata, httpHeaders.getRequestHeaders(), info, handlerTypeName); - List<Metadata> metadataList = new ArrayList<>(); - metadataList.add(metadata); + List<Metadata> metadataList; + + if (isPipesParsingEnabled()) { + // Use pipes-based parsing + TikaInputStream tis = getInputStream(is, metadata, httpHeaders, info); + MultivaluedMap<String, String> headers = httpHeaders.getRequestHeaders(); + fillMetadata(null, metadata, headers); + logRequest(LOG, "/tika", metadata); + + ParseContext context = new ParseContext(); + setupHandlerFactory(context, handlerTypeName, headers); + metadataList = parseWithPipes(tis, metadata, context, ParseMode.CONCATENATE); + } else { + // Legacy in-process parsing + parseToMetadata(getInputStream(is, metadata, httpHeaders, info), metadata, httpHeaders.getRequestHeaders(), info, handlerTypeName); + metadataList = new ArrayList<>(); + metadataList.add(metadata); + } + TikaResource.getTikaLoader().loadMetadataFilters().filter(metadataList); if (metadataList.isEmpty()) { return new Metadata(); @@ -527,6 +622,21 @@ public class TikaResource { return metadataList.get(0); } + /** + * Sets up the ContentHandlerFactory in 
the ParseContext based on handler type and HTTP headers. + */ + private void setupHandlerFactory(ParseContext context, String handlerTypeName, MultivaluedMap<String, String> httpHeaders) { + int writeLimit = -1; + boolean throwOnWriteLimitReached = getThrowOnWriteLimitReached(httpHeaders); + if (httpHeaders.containsKey("writeLimit")) { + writeLimit = Integer.parseInt(httpHeaders.getFirst("writeLimit")); + } + + BasicContentHandlerFactory.HANDLER_TYPE type = BasicContentHandlerFactory.parseHandlerType(handlerTypeName, DEFAULT_HANDLER_TYPE); + ContentHandlerFactory factory = new BasicContentHandlerFactory(type, writeLimit, throwOnWriteLimitReached, context); + context.set(ContentHandlerFactory.class, factory); + } + private void parseToMetadata(TikaInputStream tis, Metadata metadata, MultivaluedMap<String, String> httpHeaders, UriInfo info, String handlerTypeName) throws IOException, TikaConfigException { final Parser parser = createParser();
