This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new e113b93e3 TIKA-4575 -- fix race condition with unzipping plugins (#2458)
e113b93e3 is described below

commit e113b93e397bae5bd1205397147ef2147ef84cf7
Author: Tim Allison <[email protected]>
AuthorDate: Tue Dec 16 14:16:31 2025 -0500

    TIKA-4575 -- fix race condition with unzipping plugins (#2458)
    
    Generated-by: Claude Opus 4.5 (model ID: claude-opus-4-5-20251101)
---
 .../org/apache/tika/async/cli/TikaAsyncCLI.java    |   2 +-
 .../apache/tika/async/cli/AsyncProcessorTest.java  |   4 +-
 .../org/apache/tika/pipes/core/PipesParser.java    |  46 +++++-
 .../tika/pipes/core/async/AsyncProcessor.java      |  36 ++++-
 .../apache/tika/pipes/fork/PipesForkParser.java    |   9 +-
 .../pipes/core/async/AsyncChaosMonkeyTest.java     |   4 +-
 .../apache/tika/plugins/ThreadSafeUnzipper.java    | 167 ++++++++++++++-------
 .../org/apache/tika/plugins/TikaPluginManager.java |  48 ++++++
 .../tika/server/core/resource/AsyncResource.java   |   2 +-
 .../tika/server/core/resource/PipesResource.java   |   2 +-
 10 files changed, 253 insertions(+), 67 deletions(-)

diff --git a/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/TikaAsyncCLI.java b/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/TikaAsyncCLI.java
index 3a9c5cec3..6576c904e 100644
--- a/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/TikaAsyncCLI.java
+++ b/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/TikaAsyncCLI.java
@@ -259,7 +259,7 @@ public class TikaAsyncCLI {
 
     private static void processWithTikaConfig(PipesIterator pipesIterator, Path tikaConfigPath, SimpleAsyncConfig asyncConfig) throws Exception {
         long start = System.currentTimeMillis();
-        try (AsyncProcessor processor = new AsyncProcessor(tikaConfigPath, pipesIterator)) {
+        try (AsyncProcessor processor = AsyncProcessor.load(tikaConfigPath, pipesIterator)) {
 
             for (FetchEmitTuple t : pipesIterator) {
                 configureExtractBytes(t, asyncConfig);
diff --git a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java
index 29bf9ebf8..4bd181699 100644
--- a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java
+++ b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java
@@ -114,7 +114,7 @@ public class AsyncProcessorTest extends TikaTest {
     public void testRecursiveUnpacking() throws Exception {
 //        TikaAsyncCLI cli = new TikaAsyncCLI();
         //      cli.main(new String[]{ configDir.resolve("tika-config.xml").toAbsolutePath().toString()});
-        AsyncProcessor processor = new AsyncProcessor(configDir.resolve("tika-config.json"));
+        AsyncProcessor processor = AsyncProcessor.load(configDir.resolve("tika-config.json"));
 
         EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig = new EmbeddedDocumentBytesConfig(true);
         embeddedDocumentBytesConfig.setIncludeOriginal(true);
@@ -163,7 +163,7 @@ public class AsyncProcessorTest extends TikaTest {
     public void testStopsOnApplicationError() throws Exception {
         // Test that AsyncProcessor stops processing when an application error occurs
         // (TIKA-4570)
-        AsyncProcessor processor = new AsyncProcessor(configDir.resolve("tika-config.json"));
+        AsyncProcessor processor = AsyncProcessor.load(configDir.resolve("tika-config.json"));
 
         // Create a tuple with a non-existent fetcher - this will cause FETCHER_NOT_FOUND
         // which is a TASK_EXCEPTION but will stop processing in CLI mode (default)
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java
index 42bf6b596..bfa4a61df 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java
@@ -24,19 +24,59 @@ import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.tika.config.loader.TikaJsonConfig;
+import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.pipes.api.FetchEmitTuple;
 import org.apache.tika.pipes.api.PipesResult;
+import org.apache.tika.plugins.TikaPluginManager;
 
 public class PipesParser implements Closeable {
 
+    /**
+     * Loads a PipesParser from a configuration file path.
+     * <p>
+     * This method:
+     * <ol>
+     *   <li>Loads the JSON configuration</li>
+     *   <li>Pre-extracts plugins before spawning child processes</li>
+     *   <li>Creates the PipesParser with the loaded configuration</li>
+     * </ol>
+     *
+     * @param tikaConfigPath path to the tika-config.json file
+     * @return a new PipesParser instance
+     * @throws IOException if reading config or extraction fails
+     * @throws TikaConfigException if configuration is invalid
+     */
+    public static PipesParser load(Path tikaConfigPath) throws IOException, TikaConfigException {
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
+        PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
+        return load(tikaJsonConfig, pipesConfig, tikaConfigPath);
+    }
+
+    /**
+     * Loads a PipesParser from pre-loaded configuration objects.
+     * <p>
+     * Use this method when you need to modify the PipesConfig before creating
+     * the parser (e.g., to override emit strategy).
+     *
+     * @param tikaJsonConfig the pre-loaded JSON configuration
+     * @param pipesConfig the pipes configuration (may be modified by caller)
+     * @param tikaConfigPath path to the config file (passed to child processes)
+     * @return a new PipesParser instance
+     * @throws IOException if plugin extraction fails
+     */
+    public static PipesParser load(TikaJsonConfig tikaJsonConfig, PipesConfig pipesConfig,
+            Path tikaConfigPath) throws IOException {
+        TikaPluginManager.preExtractPlugins(tikaJsonConfig);
+        return new PipesParser(pipesConfig, tikaConfigPath);
+    }
 
     private final PipesConfig pipesConfig;
     private final Path tikaConfigPath;
     private final List<PipesClient> clients = new ArrayList<>();
-    private final ArrayBlockingQueue<PipesClient> clientQueue ;
-
+    private final ArrayBlockingQueue<PipesClient> clientQueue;
 
-    public PipesParser(PipesConfig pipesConfig, Path tikaConfigPath) {
+    private PipesParser(PipesConfig pipesConfig, Path tikaConfigPath) {
         this.pipesConfig = pipesConfig;
         this.tikaConfigPath = tikaConfigPath;
         this.clientQueue = new ArrayBlockingQueue<>(pipesConfig.getNumClients());
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
index f741ee45f..78ecc70d7 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
@@ -79,12 +79,42 @@ public class AsyncProcessor implements Closeable {
     private boolean addedEmitterSemaphores = false;
     boolean isShuttingDown = false;
 
-    public AsyncProcessor(Path tikaConfigPath) throws TikaException, IOException {
-        this(tikaConfigPath, null);
+    /**
+     * Loads an AsyncProcessor from a configuration file path.
+     * <p>
+     * This method pre-extracts plugins before loading, ensuring child processes
+     * don't race to extract the same plugins.
+     *
+     * @param tikaConfigPath path to the tika-config.json file
+     * @return a new AsyncProcessor instance
+     * @throws IOException if reading config or plugin extraction fails
+     * @throws TikaException if configuration is invalid
+     */
+    public static AsyncProcessor load(Path tikaConfigPath) throws TikaException, IOException {
+        return load(tikaConfigPath, null);
     }
 
-    public AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator) throws TikaException, IOException {
+    /**
+     * Loads an AsyncProcessor from a configuration file path with a custom PipesIterator.
+     * <p>
+     * This method pre-extracts plugins before loading, ensuring child processes
+     * don't race to extract the same plugins.
+     *
+     * @param tikaConfigPath path to the tika-config.json file
+     * @param pipesIterator optional custom pipes iterator (may be null)
+     * @return a new AsyncProcessor instance
+     * @throws IOException if reading config or plugin extraction fails
+     * @throws TikaException if configuration is invalid
+     */
+    public static AsyncProcessor load(Path tikaConfigPath, PipesIterator pipesIterator)
+            throws TikaException, IOException {
         TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
+        TikaPluginManager.preExtractPlugins(tikaJsonConfig);
+        return new AsyncProcessor(tikaConfigPath, pipesIterator, tikaJsonConfig);
+    }
+
+    private AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator,
+            TikaJsonConfig tikaJsonConfig) throws TikaException, IOException {
         TikaPluginManager tikaPluginManager = TikaPluginManager.load(tikaJsonConfig);
         MetadataFilter metadataFilter = TikaLoader.load(tikaConfigPath).loadMetadataFilters();
         this.asyncConfig = PipesConfig.load(tikaJsonConfig);
diff --git a/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParser.java b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParser.java
index 01296a92a..0420596d5 100644
--- a/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParser.java
+++ b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkParser.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 
+import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
@@ -119,8 +120,9 @@ public class PipesForkParser implements Closeable {
      * Creates a new PipesForkParser with default configuration.
      *
      * @throws IOException if the temporary config file cannot be created
+     * @throws TikaConfigException if configuration is invalid
      */
-    public PipesForkParser() throws IOException {
+    public PipesForkParser() throws IOException, TikaConfigException {
         this(new PipesForkParserConfig());
     }
 
@@ -129,11 +131,12 @@ public class PipesForkParser implements Closeable {
      *
      * @param config the configuration for this parser
      * @throws IOException if the temporary config file cannot be created
+     * @throws TikaConfigException if configuration is invalid
      */
-    public PipesForkParser(PipesForkParserConfig config) throws IOException {
+    public PipesForkParser(PipesForkParserConfig config) throws IOException, TikaConfigException {
         this.config = config;
         this.tikaConfigPath = createTikaConfigFile();
-        this.pipesParser = new PipesParser(config.getPipesConfig(), tikaConfigPath);
+        this.pipesParser = PipesParser.load(tikaConfigPath);
     }
 
     /**
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java
index 54fb0bba4..f195e3222 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java
@@ -132,7 +132,7 @@ public class AsyncChaosMonkeyTest {
 
     @Test
     public void testBasic(@TempDir Path tmpDir) throws Exception {
-        AsyncProcessor processor = new AsyncProcessor(setUp(tmpDir, false));
+        AsyncProcessor processor = AsyncProcessor.load(setUp(tmpDir, false));
         for (int i = 0; i < totalFiles; i++) {
             FetchEmitTuple t = new FetchEmitTuple("myId-" + i,
                     new FetchKey(fetcherPluginId, i + ".xml"),
@@ -164,7 +164,7 @@ public class AsyncChaosMonkeyTest {
 
     @Test
     public void testEmitIntermediate(@TempDir Path tmpDir) throws Exception {
-        AsyncProcessor processor = new AsyncProcessor(setUp(tmpDir, true));
+        AsyncProcessor processor = AsyncProcessor.load(setUp(tmpDir, true));
         for (int i = 0; i < totalFiles; i++) {
             FetchEmitTuple t = new FetchEmitTuple("myId-" + i, new FetchKey(fetcherPluginId, i + ".xml"),
                     new EmitKey(emitterPluginId, "emit-" + i), new Metadata());
diff --git a/tika-plugins-core/src/main/java/org/apache/tika/plugins/ThreadSafeUnzipper.java b/tika-plugins-core/src/main/java/org/apache/tika/plugins/ThreadSafeUnzipper.java
index 7b04624a0..7287a4cbd 100644
--- a/tika-plugins-core/src/main/java/org/apache/tika/plugins/ThreadSafeUnzipper.java
+++ b/tika-plugins-core/src/main/java/org/apache/tika/plugins/ThreadSafeUnzipper.java
@@ -16,80 +16,130 @@
  */
 package org.apache.tika.plugins;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
+import java.nio.file.AtomicMoveNotSupportedException;
+import java.nio.file.DirectoryNotEmptyException;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
+import java.nio.file.StandardCopyOption;
+import java.util.Comparator;
+import java.util.UUID;
+import java.util.stream.Stream;
 
 import org.pf4j.util.Unzip;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Thread-safe and process-safe plugin unzipper using atomic rename.
+ * <p>
+ * This avoids file locking issues on Windows by using a simple strategy:
+ * <ol>
+ *   <li>Check if destination directory exists with completion marker - if yes, already extracted</li>
+ *   <li>Extract to a temporary directory with a unique name</li>
+ *   <li>Create a completion marker file in the temp directory</li>
+ *   <li>Atomically rename temp dir to final destination</li>
+ *   <li>If rename fails (another process won), clean up temp dir</li>
+ * </ol>
+ * <p>
+ * The completion marker ensures that even if atomic move is not supported,
+ * other processes won't attempt to load a partially-moved directory.
+ */
 public class ThreadSafeUnzipper {
     private static final Logger LOG = LoggerFactory.getLogger(TikaPluginManager.class);
+    private static final String COMPLETE_MARKER = ".tika-extraction-complete";
 
-    private static final long MAX_WAIT_MS = 60000;
-
-    public static synchronized void unzipPlugin(Path source) throws IOException {
-        if (! source.getFileName().toString().endsWith(".zip")) {
+    /**
+     * Unzips a plugin zip file to a directory with the same name (minus .zip extension).
+     * Safe for concurrent calls from multiple threads or processes. See
+     * documentation at the head of this class for how it works.
+     *
+     * @param source path to the .zip file
+     * @throws IOException if extraction fails
+     */
+    public static void unzipPlugin(Path source) throws IOException {
+        if (!source.getFileName().toString().endsWith(".zip")) {
             throw new IllegalArgumentException("source file name must end in '.zip'");
         }
-        File lockFile = new File(source.toAbsolutePath() + ".lock");
-        FileChannel fileChannel = null;
-        FileLock fileLock = null;
-        List<IOException> exceptions = new ArrayList<>();
+
+        Path destination = getDestination(source);
+
+        // Already extracted - check for both directory AND completion marker
+        if (isExtractionComplete(destination)) {
+            LOG.debug("{} is already extracted", source);
+            return;
+        }
+
+        // Extract to a unique temp directory
+        Path tempDir = destination.resolveSibling(
+                destination.getFileName() + ".tmp." + UUID.randomUUID());
+
         try {
-            fileChannel = new RandomAccessFile(lockFile, "rw").getChannel();
-            LOG.debug("acquiring lock");
-            fileLock = fileChannel.lock();
-            LOG.debug("acquired lock");
-            if (isExtracted(source)) {
-                LOG.debug("{} is already extracted", source);
-                return;
-            }
-            extract(source);
-        } finally {
-            if (fileLock != null && fileLock.isValid()) {
-                try {
-                    fileLock.release();
-                } catch (IOException e) {
-                    LOG.warn("failed to release the lock");
-                    exceptions.add(e);
-                }
-            }
-            if (fileChannel != null) {
+            LOG.debug("extracting {} to temp dir {}", source, tempDir);
+            new Unzip(source.toFile(), tempDir.toFile()).extract();
+
+            // Create completion marker in temp dir before moving
+            Files.createFile(tempDir.resolve(COMPLETE_MARKER));
+
+            // Atomically rename to final destination
+            try {
+                Files.move(tempDir, destination, StandardCopyOption.ATOMIC_MOVE);
+                LOG.debug("successfully extracted {}", destination);
+            } catch (FileAlreadyExistsException | DirectoryNotEmptyException e) {
+                // Another process extracted it first - wait for completion marker
+                LOG.debug("plugin already extracted by another process: {}", destination);
+                waitForExtractionComplete(destination);
+            } catch (AtomicMoveNotSupportedException e) {
+                // Filesystem doesn't support atomic move, try regular move
                 try {
-                    fileChannel.close();
-                } catch (IOException e) {
-                    LOG.warn("failed to close the file channel");
-                    exceptions.add(e);
+                    Files.move(tempDir, destination);
+                    LOG.debug("successfully extracted {} (non-atomic)", destination);
+                } catch (FileAlreadyExistsException | DirectoryNotEmptyException e2) {
+                    // Another process extracted it first - wait for completion marker
+                    LOG.debug("plugin already extracted by another process: {}", destination);
+                    waitForExtractionComplete(destination);
                 }
             }
-            boolean isDeleted = lockFile.delete();
-            if (! isDeleted) {
-                LOG.warn("failed to delete the lock file");
-                exceptions.add(new IOException("failed to delete lock file: " + lockFile));
+        } finally {
+            // Clean up temp dir if it still exists (we lost the race or there was an error)
+            if (Files.exists(tempDir)) {
+                deleteRecursively(tempDir);
             }
         }
-        if (! exceptions.isEmpty()) {
-            throw exceptions.get(0);
-        }
     }
 
-    private static void extract(Path source) throws IOException {
-        Path destination = getDestination(source);
-        Unzip unzip = new Unzip(source.toFile(), destination.toFile());
-        unzip.extract();
+    /**
+     * Checks if extraction is complete by verifying both directory exists and completion marker is present.
+     */
+    private static boolean isExtractionComplete(Path destination) {
+        return Files.isDirectory(destination) && Files.exists(destination.resolve(COMPLETE_MARKER));
     }
 
-    private static boolean isExtracted(Path source) {
-        Path destination = getDestination(source);
-        return Files.isDirectory(destination);
+    /**
+     * Waits for extraction to complete by polling for the completion marker.
+     * This is called when we detect another process is extracting.
+     */
+    private static void waitForExtractionComplete(Path destination) throws IOException {
+        long maxWaitMs = 60000; // 1 minute max wait
+        long pollIntervalMs = 100;
+        long waited = 0;
+
+        while (waited < maxWaitMs) {
+            if (isExtractionComplete(destination)) {
+                LOG.debug("extraction completed by another process: {}", 
destination);
+                return;
+            }
+            try {
+                Thread.sleep(pollIntervalMs);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("interrupted while waiting for extraction to complete", e);
+            }
+            waited += pollIntervalMs;
+        }
+
+        throw new IOException("timed out waiting for extraction to complete: " + destination);
     }
 
     private static Path getDestination(Path source) {
@@ -97,4 +147,19 @@ public class ThreadSafeUnzipper {
         fName = fName.substring(0, fName.length() - 4);
         return source.toAbsolutePath().getParent().resolve(fName);
     }
+
+    private static void deleteRecursively(Path path) {
+        try (Stream<Path> walk = Files.walk(path)) {
+            walk.sorted(Comparator.reverseOrder())
+                    .forEach(p -> {
+                        try {
+                            Files.delete(p);
+                        } catch (IOException e) {
+                            LOG.warn("failed to delete temp file: {}", p, e);
+                        }
+                    });
+        } catch (IOException e) {
+            LOG.warn("failed to clean up temp directory: {}", path, e);
+        }
+    }
 }
diff --git a/tika-plugins-core/src/main/java/org/apache/tika/plugins/TikaPluginManager.java b/tika-plugins-core/src/main/java/org/apache/tika/plugins/TikaPluginManager.java
index e030b8a67..cd6296755 100644
--- a/tika-plugins-core/src/main/java/org/apache/tika/plugins/TikaPluginManager.java
+++ b/tika-plugins-core/src/main/java/org/apache/tika/plugins/TikaPluginManager.java
@@ -53,6 +53,54 @@ public class TikaPluginManager extends DefaultPluginManager {
         OBJECT_MAPPER.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
     }
 
+    /**
+     * Pre-extracts plugin zip files without loading them.
+     * <p>
+     * Call this method early in parent processes (e.g., AsyncProcessor, PipesParser)
+     * before spawning child processes. This ensures plugins are extracted once in
+     * the parent, so child processes don't race to extract the same plugins.
+     * <p>
+     * This method is synchronized to prevent concurrent extraction within the same JVM.
+     * For cross-process safety, {@link ThreadSafeUnzipper} uses atomic rename.
+     * <p>
+     * If plugin-roots is not specified in the config, this method does nothing.
+     *
+     * @param tikaJsonConfig the configuration containing plugin-roots
+     * @throws IOException if extraction fails
+     */
+    public static synchronized void preExtractPlugins(TikaJsonConfig tikaJsonConfig)
+            throws IOException {
+        JsonNode root = tikaJsonConfig.getRootNode();
+        JsonNode pluginRoots = root.get("plugin-roots");
+        if (pluginRoots == null) {
+            // No plugins configured - nothing to extract
+            return;
+        }
+        List<Path> roots = OBJECT_MAPPER.convertValue(pluginRoots,
+                new TypeReference<List<Path>>() {});
+        for (Path pluginRoot : roots) {
+            extractPluginsInDirectory(pluginRoot);
+        }
+    }
+
+    private static void extractPluginsInDirectory(Path root) throws IOException {
+        if (!Files.isDirectory(root)) {
+            return;
+        }
+        long start = System.currentTimeMillis();
+        File[] files = root.toFile().listFiles();
+        if (files == null) {
+            return;
+        }
+        for (File f : files) {
+            if (f.getName().endsWith(".zip")) {
+                ThreadSafeUnzipper.unzipPlugin(f.toPath());
+            }
+        }
+        LOG.debug("took {} ms to pre-extract plugins in {}",
+                System.currentTimeMillis() - start, root);
+    }
+
     /**
      * Loads plugin manager from a pre-parsed TikaJsonConfig.
      * This is the preferred method when sharing configuration across
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
index 952e3b376..908fdf867 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -62,7 +62,7 @@ public class AsyncResource {
     private ArrayBlockingQueue<FetchEmitTuple> queue;
 
     public AsyncResource(java.nio.file.Path tikaConfigPath) throws TikaException, IOException, SAXException {
-        this.asyncProcessor = new AsyncProcessor(tikaConfigPath);
+        this.asyncProcessor = AsyncProcessor.load(tikaConfigPath);
         TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
         TikaPluginManager pluginManager = TikaPluginManager.load(tikaJsonConfig);
         this.emitterManager = EmitterManager.load(pluginManager, tikaJsonConfig);
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
index a022af726..128957d86 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
@@ -65,7 +65,7 @@ public class PipesResource {
                 pipesConfig.setEmitStrategy(new EmitStrategyConfig(EmitStrategy.EMIT_ALL));
             }
         }
-        this.pipesParser = new PipesParser(pipesConfig, tikaConfig);
+        this.pipesParser = PipesParser.load(tikaJsonConfig, pipesConfig, tikaConfig);
     }
 
 

Reply via email to