This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new e113b93e3 TIKA-4575 -- fix race condition with unzipping plugins
(#2458)
e113b93e3 is described below
commit e113b93e397bae5bd1205397147ef2147ef84cf7
Author: Tim Allison <[email protected]>
AuthorDate: Tue Dec 16 14:16:31 2025 -0500
TIKA-4575 -- fix race condition with unzipping plugins (#2458)
Generated-by: Claude Opus 4.5 (model ID: claude-opus-4-5-20251101)
---
.../org/apache/tika/async/cli/TikaAsyncCLI.java | 2 +-
.../apache/tika/async/cli/AsyncProcessorTest.java | 4 +-
.../org/apache/tika/pipes/core/PipesParser.java | 46 +++++-
.../tika/pipes/core/async/AsyncProcessor.java | 36 ++++-
.../apache/tika/pipes/fork/PipesForkParser.java | 9 +-
.../pipes/core/async/AsyncChaosMonkeyTest.java | 4 +-
.../apache/tika/plugins/ThreadSafeUnzipper.java | 167 ++++++++++++++-------
.../org/apache/tika/plugins/TikaPluginManager.java | 48 ++++++
.../tika/server/core/resource/AsyncResource.java | 2 +-
.../tika/server/core/resource/PipesResource.java | 2 +-
10 files changed, 253 insertions(+), 67 deletions(-)
diff --git
a/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/TikaAsyncCLI.java
b/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/TikaAsyncCLI.java
index 3a9c5cec3..6576c904e 100644
---
a/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/TikaAsyncCLI.java
+++
b/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/TikaAsyncCLI.java
@@ -259,7 +259,7 @@ public class TikaAsyncCLI {
private static void processWithTikaConfig(PipesIterator pipesIterator,
Path tikaConfigPath, SimpleAsyncConfig asyncConfig) throws Exception {
long start = System.currentTimeMillis();
- try (AsyncProcessor processor = new AsyncProcessor(tikaConfigPath,
pipesIterator)) {
+ try (AsyncProcessor processor = AsyncProcessor.load(tikaConfigPath,
pipesIterator)) {
for (FetchEmitTuple t : pipesIterator) {
configureExtractBytes(t, asyncConfig);
diff --git
a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java
b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java
index 29bf9ebf8..4bd181699 100644
---
a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java
+++
b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java
@@ -114,7 +114,7 @@ public class AsyncProcessorTest extends TikaTest {
public void testRecursiveUnpacking() throws Exception {
// TikaAsyncCLI cli = new TikaAsyncCLI();
// cli.main(new String[]{
configDir.resolve("tika-config.xml").toAbsolutePath().toString()});
- AsyncProcessor processor = new
AsyncProcessor(configDir.resolve("tika-config.json"));
+ AsyncProcessor processor =
AsyncProcessor.load(configDir.resolve("tika-config.json"));
EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig = new
EmbeddedDocumentBytesConfig(true);
embeddedDocumentBytesConfig.setIncludeOriginal(true);
@@ -163,7 +163,7 @@ public class AsyncProcessorTest extends TikaTest {
public void testStopsOnApplicationError() throws Exception {
// Test that AsyncProcessor stops processing when an application error
occurs
// (TIKA-4570)
- AsyncProcessor processor = new
AsyncProcessor(configDir.resolve("tika-config.json"));
+ AsyncProcessor processor =
AsyncProcessor.load(configDir.resolve("tika-config.json"));
// Create a tuple with a non-existent fetcher - this will cause
FETCHER_NOT_FOUND
// which is a TASK_EXCEPTION but will stop processing in CLI mode
(default)
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java
index 42bf6b596..bfa4a61df 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java
@@ -24,19 +24,59 @@ import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.tika.config.loader.TikaJsonConfig;
+import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.PipesResult;
+import org.apache.tika.plugins.TikaPluginManager;
public class PipesParser implements Closeable {
+ /**
+ * Loads a PipesParser from a configuration file path.
+ * <p>
+ * This method:
+ * <ol>
+ * <li>Loads the JSON configuration</li>
+ * <li>Pre-extracts plugins before spawning child processes</li>
+ * <li>Creates the PipesParser with the loaded configuration</li>
+ * </ol>
+ *
+ * @param tikaConfigPath path to the tika-config.json file
+ * @return a new PipesParser instance
+ * @throws IOException if reading config or plugin extraction fails
+ * @throws TikaConfigException if configuration is invalid
+ */
+ public static PipesParser load(Path tikaConfigPath) throws IOException,
TikaConfigException {
+ TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
+ return load(tikaJsonConfig, pipesConfig, tikaConfigPath);
+ }
+
+ /**
+ * Loads a PipesParser from pre-loaded configuration objects.
+ * <p>
+ * Use this method when you need to modify the PipesConfig before creating
+ * the parser (e.g., to override emit strategy).
+ *
+ * @param tikaJsonConfig the pre-loaded JSON configuration
+ * @param pipesConfig the pipes configuration (may be modified by caller)
+ * @param tikaConfigPath path to the config file (passed to child
processes)
+ * @return a new PipesParser instance
+ * @throws IOException if plugin extraction fails
+ */
+ public static PipesParser load(TikaJsonConfig tikaJsonConfig, PipesConfig
pipesConfig,
+ Path tikaConfigPath) throws IOException {
+ TikaPluginManager.preExtractPlugins(tikaJsonConfig);
+ return new PipesParser(pipesConfig, tikaConfigPath);
+ }
private final PipesConfig pipesConfig;
private final Path tikaConfigPath;
private final List<PipesClient> clients = new ArrayList<>();
- private final ArrayBlockingQueue<PipesClient> clientQueue ;
-
+ private final ArrayBlockingQueue<PipesClient> clientQueue;
- public PipesParser(PipesConfig pipesConfig, Path tikaConfigPath) {
+ private PipesParser(PipesConfig pipesConfig, Path tikaConfigPath) {
this.pipesConfig = pipesConfig;
this.tikaConfigPath = tikaConfigPath;
this.clientQueue = new
ArrayBlockingQueue<>(pipesConfig.getNumClients());
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
index f741ee45f..78ecc70d7 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
@@ -79,12 +79,42 @@ public class AsyncProcessor implements Closeable {
private boolean addedEmitterSemaphores = false;
boolean isShuttingDown = false;
- public AsyncProcessor(Path tikaConfigPath) throws TikaException,
IOException {
- this(tikaConfigPath, null);
+ /**
+ * Loads an AsyncProcessor from a configuration file path.
+ * <p>
+ * This method pre-extracts plugins before loading, ensuring child
processes
+ * don't race to extract the same plugins.
+ *
+ * @param tikaConfigPath path to the tika-config.json file
+ * @return a new AsyncProcessor instance
+ * @throws IOException if reading config or plugin extraction fails
+ * @throws TikaException if configuration is invalid
+ */
+ public static AsyncProcessor load(Path tikaConfigPath) throws
TikaException, IOException {
+ return load(tikaConfigPath, null);
}
- public AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator)
throws TikaException, IOException {
+ /**
+ * Loads an AsyncProcessor from a configuration file path with a custom
PipesIterator.
+ * <p>
+ * This method pre-extracts plugins before loading, ensuring child
processes
+ * don't race to extract the same plugins.
+ *
+ * @param tikaConfigPath path to the tika-config.json file
+ * @param pipesIterator optional custom pipes iterator (may be null)
+ * @return a new AsyncProcessor instance
+ * @throws IOException if reading config or plugin extraction fails
+ * @throws TikaException if configuration is invalid
+ */
+ public static AsyncProcessor load(Path tikaConfigPath, PipesIterator
pipesIterator)
+ throws TikaException, IOException {
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
+ TikaPluginManager.preExtractPlugins(tikaJsonConfig);
+ return new AsyncProcessor(tikaConfigPath, pipesIterator,
tikaJsonConfig);
+ }
+
+ private AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator,
+ TikaJsonConfig tikaJsonConfig) throws TikaException, IOException {
TikaPluginManager tikaPluginManager =
TikaPluginManager.load(tikaJsonConfig);
MetadataFilter metadataFilter =
TikaLoader.load(tikaConfigPath).loadMetadataFilters();
this.asyncConfig = PipesConfig.load(tikaJsonConfig);
diff --git
a/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParser.java
b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParser.java
index 01296a92a..0420596d5 100644
---
a/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParser.java
+++
b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParser.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
@@ -119,8 +120,9 @@ public class PipesForkParser implements Closeable {
* Creates a new PipesForkParser with default configuration.
*
* @throws IOException if the temporary config file cannot be created
+ * @throws TikaConfigException if configuration is invalid
*/
- public PipesForkParser() throws IOException {
+ public PipesForkParser() throws IOException, TikaConfigException {
this(new PipesForkParserConfig());
}
@@ -129,11 +131,12 @@ public class PipesForkParser implements Closeable {
*
* @param config the configuration for this parser
* @throws IOException if the temporary config file cannot be created
+ * @throws TikaConfigException if configuration is invalid
*/
- public PipesForkParser(PipesForkParserConfig config) throws IOException {
+ public PipesForkParser(PipesForkParserConfig config) throws IOException,
TikaConfigException {
this.config = config;
this.tikaConfigPath = createTikaConfigFile();
- this.pipesParser = new PipesParser(config.getPipesConfig(),
tikaConfigPath);
+ this.pipesParser = PipesParser.load(tikaConfigPath);
}
/**
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java
index 54fb0bba4..f195e3222 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java
@@ -132,7 +132,7 @@ public class AsyncChaosMonkeyTest {
@Test
public void testBasic(@TempDir Path tmpDir) throws Exception {
- AsyncProcessor processor = new AsyncProcessor(setUp(tmpDir, false));
+ AsyncProcessor processor = AsyncProcessor.load(setUp(tmpDir, false));
for (int i = 0; i < totalFiles; i++) {
FetchEmitTuple t = new FetchEmitTuple("myId-" + i,
new FetchKey(fetcherPluginId, i + ".xml"),
@@ -164,7 +164,7 @@ public class AsyncChaosMonkeyTest {
@Test
public void testEmitIntermediate(@TempDir Path tmpDir) throws Exception {
- AsyncProcessor processor = new AsyncProcessor(setUp(tmpDir, true));
+ AsyncProcessor processor = AsyncProcessor.load(setUp(tmpDir, true));
for (int i = 0; i < totalFiles; i++) {
FetchEmitTuple t = new FetchEmitTuple("myId-" + i, new
FetchKey(fetcherPluginId, i + ".xml"),
new EmitKey(emitterPluginId, "emit-" + i), new Metadata());
diff --git
a/tika-plugins-core/src/main/java/org/apache/tika/plugins/ThreadSafeUnzipper.java
b/tika-plugins-core/src/main/java/org/apache/tika/plugins/ThreadSafeUnzipper.java
index 7b04624a0..7287a4cbd 100644
---
a/tika-plugins-core/src/main/java/org/apache/tika/plugins/ThreadSafeUnzipper.java
+++
b/tika-plugins-core/src/main/java/org/apache/tika/plugins/ThreadSafeUnzipper.java
@@ -16,80 +16,130 @@
*/
package org.apache.tika.plugins;
-import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
+import java.nio.file.AtomicMoveNotSupportedException;
+import java.nio.file.DirectoryNotEmptyException;
+import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
+import java.nio.file.StandardCopyOption;
+import java.util.Comparator;
+import java.util.UUID;
+import java.util.stream.Stream;
import org.pf4j.util.Unzip;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Thread-safe and process-safe plugin unzipper using atomic rename.
+ * <p>
+ * This avoids file locking issues on Windows by using a simple strategy:
+ * <ol>
+ * <li>Check if destination directory exists with completion marker - if
yes, already extracted</li>
+ * <li>Extract to a temporary directory with a unique name</li>
+ * <li>Create a completion marker file in the temp directory</li>
+ * <li>Atomically rename temp dir to final destination</li>
+ * <li>If rename fails (another process won), wait for its completion marker, then clean up temp dir</li>
+ * </ol>
+ * <p>
+ * The completion marker ensures that even if atomic move is not supported,
+ * other processes won't attempt to load a partially-moved directory.
+ */
public class ThreadSafeUnzipper {
private static final Logger LOG =
LoggerFactory.getLogger(TikaPluginManager.class);
+ private static final String COMPLETE_MARKER = ".tika-extraction-complete";
- private static final long MAX_WAIT_MS = 60000;
-
- public static synchronized void unzipPlugin(Path source) throws
IOException {
- if (! source.getFileName().toString().endsWith(".zip")) {
+ /**
+ * Unzips a plugin zip file to a directory with the same name (minus .zip
extension).
+ * Safe for concurrent calls from multiple threads or processes. See
+ * documentation at the head of this class for how it works.
+ *
+ * @param source path to the .zip file
+ * @throws IOException if extraction fails
+ */
+ public static void unzipPlugin(Path source) throws IOException {
+ if (!source.getFileName().toString().endsWith(".zip")) {
throw new IllegalArgumentException("source file name must end in
'.zip'");
}
- File lockFile = new File(source.toAbsolutePath() + ".lock");
- FileChannel fileChannel = null;
- FileLock fileLock = null;
- List<IOException> exceptions = new ArrayList<>();
+
+ Path destination = getDestination(source);
+
+ // Already extracted - check for both directory AND completion marker
+ if (isExtractionComplete(destination)) {
+ LOG.debug("{} is already extracted", source);
+ return;
+ }
+
+ // Extract to a unique temp directory
+ Path tempDir = destination.resolveSibling(
+ destination.getFileName() + ".tmp." + UUID.randomUUID());
+
try {
- fileChannel = new RandomAccessFile(lockFile, "rw").getChannel();
- LOG.debug("acquiring lock");
- fileLock = fileChannel.lock();
- LOG.debug("acquired lock");
- if (isExtracted(source)) {
- LOG.debug("{} is already extracted", source);
- return;
- }
- extract(source);
- } finally {
- if (fileLock != null && fileLock.isValid()) {
- try {
- fileLock.release();
- } catch (IOException e) {
- LOG.warn("failed to release the lock");
- exceptions.add(e);
- }
- }
- if (fileChannel != null) {
+ LOG.debug("extracting {} to temp dir {}", source, tempDir);
+ new Unzip(source.toFile(), tempDir.toFile()).extract();
+
+ // Create completion marker in temp dir before moving
+ Files.createFile(tempDir.resolve(COMPLETE_MARKER));
+
+ // Atomically rename to final destination
+ try {
+ Files.move(tempDir, destination,
StandardCopyOption.ATOMIC_MOVE);
+ LOG.debug("successfully extracted {}", destination);
+ } catch (FileAlreadyExistsException | DirectoryNotEmptyException
e) {
+ // Another process extracted it first - wait for completion
marker
+ LOG.debug("plugin already extracted by another process: {}",
destination);
+ waitForExtractionComplete(destination);
+ } catch (AtomicMoveNotSupportedException e) {
+ // Filesystem doesn't support atomic move, try regular move
try {
- fileChannel.close();
- } catch (IOException e) {
- LOG.warn("failed to close the file channel");
- exceptions.add(e);
+ Files.move(tempDir, destination);
+ LOG.debug("successfully extracted {} (non-atomic)",
destination);
+ } catch (FileAlreadyExistsException |
DirectoryNotEmptyException e2) {
+ // Another process extracted it first - wait for
completion marker
+ LOG.debug("plugin already extracted by another process:
{}", destination);
+ waitForExtractionComplete(destination);
}
}
- boolean isDeleted = lockFile.delete();
- if (! isDeleted) {
- LOG.warn("failed to delete the lock file");
- exceptions.add(new IOException("failed to delete lock file: "
+ lockFile));
+ } finally {
+ // Clean up temp dir if it still exists (we lost the race or there
was an error)
+ if (Files.exists(tempDir)) {
+ deleteRecursively(tempDir);
}
}
- if (! exceptions.isEmpty()) {
- throw exceptions.get(0);
- }
}
- private static void extract(Path source) throws IOException {
- Path destination = getDestination(source);
- Unzip unzip = new Unzip(source.toFile(), destination.toFile());
- unzip.extract();
+ /**
+ * Checks if extraction is complete by verifying both directory exists and
completion marker is present.
+ */
+ private static boolean isExtractionComplete(Path destination) {
+ return Files.isDirectory(destination) &&
Files.exists(destination.resolve(COMPLETE_MARKER));
}
- private static boolean isExtracted(Path source) {
- Path destination = getDestination(source);
- return Files.isDirectory(destination);
+ /**
+ * Waits for extraction to complete by polling for the completion marker.
+ * This is called when we detect another process is extracting.
+ */
+ private static void waitForExtractionComplete(Path destination) throws
IOException {
+ long maxWaitMs = 60000; // 1 minute max wait
+ long pollIntervalMs = 100;
+ long waited = 0;
+
+ while (waited < maxWaitMs) {
+ if (isExtractionComplete(destination)) {
+ LOG.debug("extraction completed by another process: {}",
destination);
+ return;
+ }
+ try {
+ Thread.sleep(pollIntervalMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("interrupted while waiting for
extraction to complete", e);
+ }
+ waited += pollIntervalMs;
+ }
+
+ throw new IOException("timed out waiting for extraction to complete: "
+ destination);
}
private static Path getDestination(Path source) {
@@ -97,4 +147,19 @@ public class ThreadSafeUnzipper {
fName = fName.substring(0, fName.length() - 4);
return source.toAbsolutePath().getParent().resolve(fName);
}
+
+ private static void deleteRecursively(Path path) {
+ try (Stream<Path> walk = Files.walk(path)) {
+ walk.sorted(Comparator.reverseOrder())
+ .forEach(p -> {
+ try {
+ Files.delete(p);
+ } catch (IOException e) {
+ LOG.warn("failed to delete temp file: {}", p, e);
+ }
+ });
+ } catch (IOException e) {
+ LOG.warn("failed to clean up temp directory: {}", path, e);
+ }
+ }
}
diff --git
a/tika-plugins-core/src/main/java/org/apache/tika/plugins/TikaPluginManager.java
b/tika-plugins-core/src/main/java/org/apache/tika/plugins/TikaPluginManager.java
index e030b8a67..cd6296755 100644
---
a/tika-plugins-core/src/main/java/org/apache/tika/plugins/TikaPluginManager.java
+++
b/tika-plugins-core/src/main/java/org/apache/tika/plugins/TikaPluginManager.java
@@ -53,6 +53,54 @@ public class TikaPluginManager extends DefaultPluginManager {
OBJECT_MAPPER.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
true);
}
+ /**
+ * Pre-extracts plugin zip files without loading them.
+ * <p>
+ * Call this method early in parent processes (e.g., AsyncProcessor,
PipesParser)
+ * before spawning child processes. This ensures plugins are extracted
once in
+ * the parent, so child processes don't race to extract the same plugins.
+ * <p>
+ * This method is synchronized to prevent concurrent extraction within the
same JVM.
+ * For cross-process safety, {@link ThreadSafeUnzipper} uses atomic rename.
+ * <p>
+ * If plugin-roots is not specified in the config, this method does
nothing.
+ *
+ * @param tikaJsonConfig the configuration containing plugin-roots
+ * @throws IOException if extraction fails
+ */
+ public static synchronized void preExtractPlugins(TikaJsonConfig
tikaJsonConfig)
+ throws IOException {
+ JsonNode root = tikaJsonConfig.getRootNode();
+ JsonNode pluginRoots = root.get("plugin-roots");
+ if (pluginRoots == null) {
+ // No plugins configured - nothing to extract
+ return;
+ }
+ List<Path> roots = OBJECT_MAPPER.convertValue(pluginRoots,
+ new TypeReference<List<Path>>() {});
+ for (Path pluginRoot : roots) {
+ extractPluginsInDirectory(pluginRoot);
+ }
+ }
+
+ private static void extractPluginsInDirectory(Path root) throws
IOException {
+ if (!Files.isDirectory(root)) {
+ return;
+ }
+ long start = System.currentTimeMillis();
+ File[] files = root.toFile().listFiles();
+ if (files == null) {
+ return;
+ }
+ for (File f : files) {
+ if (f.getName().endsWith(".zip")) {
+ ThreadSafeUnzipper.unzipPlugin(f.toPath());
+ }
+ }
+ LOG.debug("took {} ms to pre-extract plugins in {}",
+ System.currentTimeMillis() - start, root);
+ }
+
/**
* Loads plugin manager from a pre-parsed TikaJsonConfig.
* This is the preferred method when sharing configuration across
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
index 952e3b376..908fdf867 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -62,7 +62,7 @@ public class AsyncResource {
private ArrayBlockingQueue<FetchEmitTuple> queue;
public AsyncResource(java.nio.file.Path tikaConfigPath) throws
TikaException, IOException, SAXException {
- this.asyncProcessor = new AsyncProcessor(tikaConfigPath);
+ this.asyncProcessor = AsyncProcessor.load(tikaConfigPath);
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
TikaPluginManager pluginManager =
TikaPluginManager.load(tikaJsonConfig);
this.emitterManager = EmitterManager.load(pluginManager,
tikaJsonConfig);
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
index a022af726..128957d86 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
@@ -65,7 +65,7 @@ public class PipesResource {
pipesConfig.setEmitStrategy(new
EmitStrategyConfig(EmitStrategy.EMIT_ALL));
}
}
- this.pipesParser = new PipesParser(pipesConfig, tikaConfig);
+ this.pipesParser = PipesParser.load(tikaJsonConfig, pipesConfig,
tikaConfig);
}