This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new a756502b4 TIKA-4570 (#2457)
a756502b4 is described below

commit a756502b4c984d815088fda41d2a61ceacd3d4ad
Author: Tim Allison <[email protected]>
AuthorDate: Tue Dec 16 11:09:53 2025 -0500

    TIKA-4570 (#2457)
    
    * TIKA-4570 -- first step
    
    * TIKA-4570 -- improve error codes and handling in AsyncProcessor
---
 .../apache/tika/async/cli/AsyncProcessorTest.java  |  52 ++++++
 .../org/apache/tika/pipes/api/PipesResult.java     | 181 +++++++++++++--------
 .../org/apache/tika/pipes/core/PipesConfig.java    |  31 ++++
 .../org/apache/tika/pipes/core/PipesException.java |   8 +
 .../org/apache/tika/pipes/core/PipesResults.java   |   1 -
 .../tika/pipes/core/async/AsyncProcessor.java      |  87 +++++++++-
 .../apache/tika/pipes/fork/PipesForkResult.java    |  26 ++-
 .../tika/pipes/fork/PipesForkParserTest.java       |   7 +-
 .../apache/tika/pipes/core/PipesClientTest.java    |  34 ++--
 .../tika/server/core/resource/PipesResource.java   |   3 +-
 10 files changed, 337 insertions(+), 93 deletions(-)

diff --git 
a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java
 
b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java
index 51aff03a0..29bf9ebf8 100644
--- 
a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java
+++ 
b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java
@@ -19,6 +19,8 @@ package org.apache.tika.async.cli;
 
 import static 
org.apache.tika.pipes.api.pipesiterator.PipesIteratorBaseConfig.DEFAULT_HANDLER_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.BufferedReader;
 import java.io.OutputStream;
@@ -46,6 +48,7 @@ import org.apache.tika.pipes.api.HandlerConfig;
 import org.apache.tika.pipes.api.emitter.EmitKey;
 import org.apache.tika.pipes.api.fetcher.FetchKey;
 import org.apache.tika.pipes.api.pipesiterator.PipesIterator;
+import org.apache.tika.pipes.core.PipesException;
 import org.apache.tika.pipes.core.async.AsyncProcessor;
 import org.apache.tika.pipes.core.extractor.EmbeddedDocumentBytesConfig;
 import org.apache.tika.serialization.JsonMetadataList;
@@ -155,4 +158,53 @@ public class AsyncProcessorTest extends TikaTest {
                 .get(1)
                 .get(TikaCoreProperties.TIKA_CONTENT));
     }
+
+    @Test
+    public void testStopsOnApplicationError() throws Exception {
+        // Test that AsyncProcessor stops processing when an application error 
occurs
+        // (TIKA-4570)
+        AsyncProcessor processor = new 
AsyncProcessor(configDir.resolve("tika-config.json"));
+
+        // Create a tuple with a non-existent fetcher - this will cause 
FETCHER_NOT_FOUND
+        // which is a TASK_EXCEPTION but will stop processing in CLI mode 
(default)
+        ParseContext parseContext = new ParseContext();
+        parseContext.set(HandlerConfig.class, DEFAULT_HANDLER_CONFIG);
+        FetchEmitTuple badTuple = new FetchEmitTuple(
+                "bad-tuple-1",
+                new FetchKey("non-existent-fetcher", "some-file.txt"),
+                new EmitKey("fse-json", "emit-bad"),
+                new Metadata(),
+                parseContext,
+                FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
+
+        // Offer the bad tuple
+        processor.offer(badTuple, 1000);
+
+        // Wait for the error to be detected
+        int maxWaitMs = 30000;
+        int waited = 0;
+        while (!processor.hasApplicationError() && waited < maxWaitMs) {
+            Thread.sleep(100);
+            waited += 100;
+        }
+
+        // Verify that the application error was detected
+        assertTrue(processor.hasApplicationError(),
+                "AsyncProcessor should detect application error from bad 
fetcher");
+
+        // Verify that subsequent offers throw PipesException
+        FetchEmitTuple anotherTuple = new FetchEmitTuple(
+                "another-tuple",
+                new FetchKey("fsf", "mock.xml"),
+                new EmitKey("fse-json", "emit-another"),
+                new Metadata(),
+                parseContext,
+                FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
+
+        assertThrows(PipesException.class, () -> {
+            processor.offer(anotherTuple, 1000);
+        }, "Should throw PipesException when offering after application 
error");
+
+        processor.close();
+    }
 }
diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/PipesResult.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/PipesResult.java
index 9418bac0b..482e13e7f 100644
--- 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/PipesResult.java
+++ 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/PipesResult.java
@@ -25,65 +25,61 @@ public record PipesResult(RESULT_STATUS status, EmitData 
emitData, String messag
     /**
      * High-level categorization of result statuses.
      * <p>
-     * Categories help distinguish between:
+     * Categories help distinguish between different types of failures and 
successes,
+     * allowing infrastructure to decide how to handle each case:
      * <ul>
-     *   <li>Process crashes (OOM, timeout, system-level failures)</li>
-     *   <li>Application errors (process ran but encountered errors)</li>
-     *   <li>Successful processing (possibly with warnings)</li>
+     *   <li>{@link #FATAL} - System cannot continue, must be fixed and 
restarted</li>
+     *   <li>{@link #INITIALIZATION_FAILURE} - Component failed to initialize, 
might be transient</li>
+     *   <li>{@link #TASK_EXCEPTION} - This task failed, but other tasks may 
succeed</li>
+     *   <li>{@link #PROCESS_CRASH} - Forked process crashed, auto-restart and 
continue</li>
+     *   <li>{@link #SUCCESS} - Processing completed successfully (possibly 
with warnings)</li>
      * </ul>
      */
     public enum CATEGORY {
-        /** Forked process crashed due to OOM, timeout, or other system 
failure */
-        PROCESS_CRASH,
+        /** Fatal system error - cannot continue, must be fixed and restarted 
*/
+        FATAL,
+
+        /** Component initialization failed - processing should stop, might be 
transient */
+        INITIALIZATION_FAILURE,
 
-        /** Process completed but encountered application-level errors */
-        APPLICATION_ERROR,
+        /** Task-level exception - this task failed, log and continue with 
next task */
+        TASK_EXCEPTION,
+
+        /** Forked process crashed due to OOM, timeout, or other system 
failure - auto-restart */
+        PROCESS_CRASH,
 
         /** Processing completed successfully (possibly with warnings) */
         SUCCESS
     }
 
     public enum RESULT_STATUS {
-        // Process crashes
-        OOM(CATEGORY.PROCESS_CRASH),
-        TIMEOUT(CATEGORY.PROCESS_CRASH),
-        UNSPECIFIED_CRASH(CATEGORY.PROCESS_CRASH),
+        // Fatal errors - system cannot continue
+        FAILED_TO_INITIALIZE(CATEGORY.FATAL),
 
-        // Initialization failures
-        FAILED_TO_INITIALIZE(CATEGORY.APPLICATION_ERROR),
+        // Initialization failures - component failed to start, might be 
transient
+        FETCHER_INITIALIZATION_EXCEPTION(CATEGORY.INITIALIZATION_FAILURE),
+        EMITTER_INITIALIZATION_EXCEPTION(CATEGORY.INITIALIZATION_FAILURE),
+        CLIENT_UNAVAILABLE_WITHIN_MS(CATEGORY.INITIALIZATION_FAILURE),
 
-        // Process crashes (system-level failures)
-        CLIENT_UNAVAILABLE_WITHIN_MS(CATEGORY.APPLICATION_ERROR),
+        // Task exceptions - this task failed, continue with next
+        FETCH_EXCEPTION(CATEGORY.TASK_EXCEPTION),
+        EMIT_EXCEPTION(CATEGORY.TASK_EXCEPTION),
+        FETCHER_NOT_FOUND(CATEGORY.TASK_EXCEPTION),
+        EMITTER_NOT_FOUND(CATEGORY.TASK_EXCEPTION),
 
-        // Fetch failures
-        FETCHER_INITIALIZATION_EXCEPTION(CATEGORY.APPLICATION_ERROR),
-        FETCH_EXCEPTION(CATEGORY.APPLICATION_ERROR),
+        // Process crashes - forked process died, auto-restart
+        OOM(CATEGORY.PROCESS_CRASH),
+        TIMEOUT(CATEGORY.PROCESS_CRASH),
+        UNSPECIFIED_CRASH(CATEGORY.PROCESS_CRASH),
 
-        // Success with edge case
+        // Success states
         EMPTY_OUTPUT(CATEGORY.SUCCESS),
-
-
-        // Parse success states
         PARSE_SUCCESS(CATEGORY.SUCCESS),
         PARSE_SUCCESS_WITH_EXCEPTION(CATEGORY.SUCCESS),
         PARSE_EXCEPTION_NO_EMIT(CATEGORY.SUCCESS),
-
-
-        // Emit success states
         EMIT_SUCCESS(CATEGORY.SUCCESS),
         EMIT_SUCCESS_PARSE_EXCEPTION(CATEGORY.SUCCESS),
-        EMIT_SUCCESS_PASSBACK(CATEGORY.SUCCESS),
-
-        // Emit failure
-        EMIT_EXCEPTION(CATEGORY.APPLICATION_ERROR),
-
-        // Emitter failures
-        EMITTER_INITIALIZATION_EXCEPTION(CATEGORY.APPLICATION_ERROR),
-        EMITTER_NOT_FOUND(CATEGORY.APPLICATION_ERROR),
-
-        // Other errors
-        INTERRUPTED_EXCEPTION(CATEGORY.APPLICATION_ERROR),
-        FETCHER_NOT_FOUND(CATEGORY.APPLICATION_ERROR);
+        EMIT_SUCCESS_PASSBACK(CATEGORY.SUCCESS);
 
 
         private final CATEGORY category;
@@ -95,34 +91,66 @@ public record PipesResult(RESULT_STATUS status, EmitData 
emitData, String messag
         /**
          * Gets the high-level category for this result status.
          *
-         * @return the category (PROCESS_CRASH, APPLICATION_ERROR, or SUCCESS)
+         * @return the category (FATAL, INITIALIZATION_FAILURE, 
TASK_EXCEPTION, PROCESS_CRASH, or SUCCESS)
          */
         public CATEGORY getCategory() {
             return category;
         }
 
         /**
-         * Checks if this status represents a process crash (OOM, timeout, 
etc.).
+         * Checks if this status represents a fatal error.
+         * <p>
+         * Fatal errors mean the system cannot continue and must be fixed and 
restarted.
          *
-         * @return true if the forked process crashed
+         * @return true if this is a fatal error
          */
-        public boolean isProcessCrash() {
-            return category == CATEGORY.PROCESS_CRASH;
+        public boolean isFatal() {
+            return category == CATEGORY.FATAL;
+        }
+
+        /**
+         * Checks if this status represents an initialization failure.
+         * <p>
+         * Initialization failures occur when a component (fetcher, emitter, 
client)
+         * fails to start. These might be transient (e.g., network issues) or 
require
+         * configuration fixes.
+         *
+         * @return true if a component failed to initialize
+         */
+        public boolean isInitializationFailure() {
+            return category == CATEGORY.INITIALIZATION_FAILURE;
         }
 
         /**
-         * Checks if this status represents an application error.
+         * Checks if this status represents a task-level exception.
+         * <p>
+         * Task exceptions indicate this specific task failed, but other tasks
+         * may succeed. Processing can continue with the next task.
          *
-         * @return true if the process ran but encountered an error
+         * @return true if this task failed
          */
-        public boolean isApplicationError() {
-            return category == CATEGORY.APPLICATION_ERROR;
+        public boolean isTaskException() {
+            return category == CATEGORY.TASK_EXCEPTION;
         }
 
         /**
-         * Checks if this status represents successful processing. Successful
-         * processing includes handling standard runtime exceptions during the
-         * parse.
+         * Checks if this status represents a process crash (OOM, timeout, 
etc.).
+         * <p>
+         * Process crashes are system-level failures where the forked process
+         * terminated abnormally. The process will be auto-restarted and
+         * processing can continue.
+         *
+         * @return true if the forked process crashed
+         */
+        public boolean isProcessCrash() {
+            return category == CATEGORY.PROCESS_CRASH;
+        }
+
+        /**
+         * Checks if this status represents successful processing.
+         * <p>
+         * Success includes normal completion as well as cases where
+         * processing completed with warnings (e.g., 
PARSE_SUCCESS_WITH_EXCEPTION).
          *
          * @return true if processing completed successfully (possibly with 
warnings)
          */
@@ -161,36 +189,59 @@ public record PipesResult(RESULT_STATUS status, EmitData 
emitData, String messag
     /**
      * Gets the high-level category for this result.
      *
-     * @return the category (PROCESS_CRASH, APPLICATION_ERROR, or SUCCESS)
+     * @return the category (FATAL, INITIALIZATION_FAILURE, TASK_EXCEPTION, 
PROCESS_CRASH, or SUCCESS)
      */
     public CATEGORY getCategory() {
         return status.getCategory();
     }
 
     /**
-     * Checks if this result represents a process crash (OOM, timeout, etc.).
+     * Checks if this result represents a fatal error.
      * <p>
-     * Process crashes are system-level failures where the forked process
-     * terminated abnormally, as opposed to application errors where the
-     * process completed but encountered errors during execution.
+     * Fatal errors mean the system cannot continue and must be fixed and 
restarted.
      *
-     * @return true if the forked process crashed
+     * @return true if this is a fatal error
      */
-    public boolean isProcessCrash() {
-        return status.isProcessCrash();
+    public boolean isFatal() {
+        return status.isFatal();
+    }
+
+    /**
+     * Checks if this result represents an initialization failure.
+     * <p>
+     * Initialization failures occur when a component (fetcher, emitter, 
client)
+     * fails to start. These might be transient (e.g., network issues) or 
require
+     * configuration fixes.
+     *
+     * @return true if a component failed to initialize
+     */
+    public boolean isInitializationFailure() {
+        return status.isInitializationFailure();
+    }
+
+    /**
+     * Checks if this result represents a task-level exception.
+     * <p>
+     * Task exceptions indicate this specific task failed, but other tasks
+     * may succeed. Processing can continue with the next task.
+     *
+     * @return true if this task failed
+     */
+    public boolean isTaskException() {
+        return status.isTaskException();
     }
 
     /**
-     * Checks if this result represents an application error.
+     * Checks if this result represents a process crash (OOM, timeout, etc.).
      * <p>
-     * Application errors occur when the process runs but encounters
-     * errors during fetch, parse, or emit operations. These are
-     * caught runtime exceptions, not process crashes.
+     * Process crashes are system-level failures where the forked process
+     * terminated abnormally. The process will be auto-restarted and
+     * processing can continue.
      *
-     * @return true if the process ran but encountered an error
+     * @return true if the forked process crashed
      */
-    public boolean isApplicationError() {
-        return status.isApplicationError();
+    public boolean isProcessCrash() {
+        return status.isProcessCrash();
     }
 
     /**
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java
index 3fe142c1f..c70f16b1d 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java
@@ -74,6 +74,16 @@ public class PipesConfig {
     private int queueSize = DEFAULT_QUEUE_SIZE;
     private int numEmitters = DEFAULT_NUM_EMITTERS;
     private boolean emitIntermediateResults = false;
+    /**
+     * When true, only stop processing on fatal errors (FAILED_TO_INITIALIZE).
+     * When false (default), also stop on initialization failures and 
not-found errors.
+     * <p>
+     * Use true for server mode (tika-server /pipes, /async) where different 
requests
+     * may use different fetchers/emitters.
+     * Use false (default) for CLI batch mode where all tasks typically use 
the same
+     * fetcher/emitter configuration.
+     */
+    private boolean stopOnlyOnFatal = false;
 
     private ArrayList<String> forkedJvmArgs = new ArrayList<>();
     private String javaPath = "java";
@@ -317,4 +327,25 @@ public class PipesConfig {
     public void setEmitIntermediateResults(boolean emitIntermediateResults) {
         this.emitIntermediateResults = emitIntermediateResults;
     }
+
+    /**
+     * When true, only stop processing on fatal errors (FAILED_TO_INITIALIZE).
+     * When false (default), also stop on initialization failures 
(FETCHER_INITIALIZATION_EXCEPTION,
+     * EMITTER_INITIALIZATION_EXCEPTION, CLIENT_UNAVAILABLE_WITHIN_MS) and 
not-found errors
+     * (FETCHER_NOT_FOUND, EMITTER_NOT_FOUND).
+     * <p>
+     * Use true for server mode (tika-server /pipes, /async) where different 
requests
+     * may use different fetchers/emitters - a bad request shouldn't kill the 
server.
+     * Use false (default) for CLI batch mode where all tasks typically use 
the same
+     * fetcher/emitter configuration - no point continuing if configuration is 
wrong.
+     *
+     * @return true if only fatal errors should stop processing
+     */
+    public boolean isStopOnlyOnFatal() {
+        return stopOnlyOnFatal;
+    }
+
+    public void setStopOnlyOnFatal(boolean stopOnlyOnFatal) {
+        this.stopOnlyOnFatal = stopOnlyOnFatal;
+    }
 }
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesException.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesException.java
index a5b4414cc..bbfd9a605 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesException.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesException.java
@@ -21,7 +21,15 @@ package org.apache.tika.pipes.core;
  */
 public class PipesException extends Exception {
 
+    public PipesException(String message) {
+        super(message);
+    }
+
     public PipesException(Throwable t) {
         super(t);
     }
+
+    public PipesException(String message, Throwable t) {
+        super(message, t);
+    }
 }
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResults.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResults.java
index 5df292f13..282db85f3 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResults.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResults.java
@@ -26,7 +26,6 @@ public class PipesResults {
     public static final PipesResult OOM = new 
PipesResult(PipesResult.RESULT_STATUS.OOM);
     public static final PipesResult UNSPECIFIED_CRASH = new 
PipesResult(PipesResult.RESULT_STATUS.UNSPECIFIED_CRASH);
     public static final PipesResult EMIT_SUCCESS = new 
PipesResult(PipesResult.RESULT_STATUS.EMIT_SUCCESS);
-    public static final PipesResult INTERRUPTED_EXCEPTION = new 
PipesResult(PipesResult.RESULT_STATUS.INTERRUPTED_EXCEPTION);
     public static final PipesResult EMPTY_OUTPUT = new 
PipesResult(PipesResult.RESULT_STATUS.EMPTY_OUTPUT);
 
 }
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
index c21a8707f..f741ee45f 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
@@ -71,6 +72,7 @@ public class AsyncProcessor implements Closeable {
     private final Path tikaConfigPath;
     private final PipesReporter pipesReporter;
     private final AtomicLong totalProcessed = new AtomicLong(0);
+    private final AtomicBoolean applicationErrorOccurred = new 
AtomicBoolean(false);
     private static long MAX_OFFER_WAIT_MS = 120000;
     private volatile int numParserThreadsFinished = 0;
     private volatile int numEmitterThreadsFinished = 0;
@@ -115,7 +117,8 @@ public class AsyncProcessor implements Closeable {
 
             for (int i = 0; i < asyncConfig.getNumClients(); i++) {
                 executorCompletionService.submit(
-                        new FetchEmitWorker(asyncConfig, tikaConfigPath, 
fetchEmitTuples, emitDatumTuples));
+                        new FetchEmitWorker(asyncConfig, tikaConfigPath, 
fetchEmitTuples,
+                                emitDatumTuples, applicationErrorOccurred));
             }
 
             EmitterManager emitterManager = 
EmitterManager.load(tikaPluginManager, tikaJsonConfig);
@@ -158,6 +161,9 @@ public class AsyncProcessor implements Closeable {
             throw new IllegalStateException(
                     "Can't call offer after calling close() or " + 
"shutdownNow()");
         }
+        if (applicationErrorOccurred.get()) {
+            throw new PipesException("Can't call offer after an application 
error occurred");
+        }
         if (newFetchEmitTuples.size() > asyncConfig.getQueueSize()) {
             throw new OfferLargerThanQueueSize(newFetchEmitTuples.size(),
                     asyncConfig.getQueueSize());
@@ -193,10 +199,24 @@ public class AsyncProcessor implements Closeable {
             throw new IllegalStateException(
                     "Can't call offer after calling close() or " + 
"shutdownNow()");
         }
+        if (applicationErrorOccurred.get()) {
+            throw new PipesException("Can't call offer after an application 
error occurred");
+        }
         checkActive();
         return fetchEmitTuples.offer(t, offerMs, TimeUnit.MILLISECONDS);
     }
 
+    /**
+     * Returns true if an application error has occurred during processing.
+     * When this returns true, all workers have stopped or are stopping,
+     * and no new tuples can be offered.
+     *
+     * @return true if an application error occurred
+     */
+    public boolean hasApplicationError() {
+        return applicationErrorOccurred.get();
+    }
+
     public void finished() throws InterruptedException {
         for (int i = 0; i < asyncConfig.getNumClients(); i++) {
             boolean offered = 
fetchEmitTuples.offer(PipesIterator.COMPLETED_SEMAPHORE,
@@ -271,15 +291,18 @@ public class AsyncProcessor implements Closeable {
         private final Path tikaConfigPath;
         private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
         private final ArrayBlockingQueue<EmitDataPair> emitDataTupleQueue;
+        private final AtomicBoolean applicationErrorOccurred;
 
         private FetchEmitWorker(PipesConfig asyncConfig,
                                 Path tikaConfigPath,
                                 ArrayBlockingQueue<FetchEmitTuple> 
fetchEmitTuples,
-                                ArrayBlockingQueue<EmitDataPair> 
emitDataTupleQueue) {
+                                ArrayBlockingQueue<EmitDataPair> 
emitDataTupleQueue,
+                                AtomicBoolean applicationErrorOccurred) {
             this.asyncConfig = asyncConfig;
             this.tikaConfigPath = tikaConfigPath;
             this.fetchEmitTuples = fetchEmitTuples;
             this.emitDataTupleQueue = emitDataTupleQueue;
+            this.applicationErrorOccurred = applicationErrorOccurred;
         }
 
         @Override
@@ -287,6 +310,12 @@ public class AsyncProcessor implements Closeable {
 
             try (PipesClient pipesClient = new PipesClient(asyncConfig, 
tikaConfigPath)) {
                 while (true) {
+                    // Check if another worker encountered an application error
+                    if (applicationErrorOccurred.get()) {
+                        LOG.info("pipesClientId={}: stopping due to 
application error in another worker",
+                                pipesClient.getPipesClientId());
+                        return PARSER_FUTURE_CODE;
+                    }
                     FetchEmitTuple t = fetchEmitTuples.poll(1, 
TimeUnit.SECONDS);
                     if (t == null) {
                         //skip
@@ -309,6 +338,18 @@ public class AsyncProcessor implements Closeable {
                             LOG.warn("pipesClientId={} crash", 
pipesClient.getPipesClientId(), e);
                             result = PipesResults.UNSPECIFIED_CRASH;
                         }
+                        // Check if we should stop processing based on the 
result
+                        if (shouldStopProcessing(result)) {
+                            LOG.error("pipesClientId={}: {} ({}), stopping all 
processing",
+                                    pipesClient.getPipesClientId(),
+                                    describeStopReason(result),
+                                    result.status());
+                            applicationErrorOccurred.set(true);
+                            pipesReporter.report(t, result, 
System.currentTimeMillis() - start);
+                            throw new 
PipesException(describeStopReason(result) + ": " +
+                                    result.status() +
+                                    (result.message() != null ? " - " + 
result.message() : ""));
+                        }
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("timer -- pipes client process: {} ms",
                                     System.currentTimeMillis() - start);
@@ -343,7 +384,47 @@ public class AsyncProcessor implements Closeable {
                     result.status() == 
PipesResult.RESULT_STATUS.PARSE_SUCCESS_WITH_EXCEPTION) {
                 return true;
             }
-            return asyncConfig.isEmitIntermediateResults() && 
(result.isApplicationError() || result.isProcessCrash());
+            // Emit intermediate results on any non-success if configured
+            return asyncConfig.isEmitIntermediateResults() && 
!result.isSuccess();
+        }
+
+        /**
+         * Determines if processing should stop based on the result and 
configuration.
+         * <p>
+         * When stopOnlyOnFatal is true (server mode): only stop on fatal 
errors.
+         * When stopOnlyOnFatal is false (CLI mode, default): also stop on 
initialization
+         * failures and fetcher/emitter not found errors.
+         */
+        private boolean shouldStopProcessing(PipesResult result) {
+            // Always stop on fatal errors
+            if (result.isFatal()) {
+                return true;
+            }
+
+            // In server mode, only fatal errors stop processing
+            if (asyncConfig.isStopOnlyOnFatal()) {
+                return false;
+            }
+
+            // In CLI mode, also stop on initialization failures and not-found 
errors
+            if (result.isInitializationFailure()) {
+                return true;
+            }
+
+            // Stop on fetcher/emitter not found in CLI mode
+            PipesResult.RESULT_STATUS status = result.status();
+            return status == PipesResult.RESULT_STATUS.FETCHER_NOT_FOUND ||
+                   status == PipesResult.RESULT_STATUS.EMITTER_NOT_FOUND;
+        }
+
+        private String describeStopReason(PipesResult result) {
+            if (result.isFatal()) {
+                return "Fatal error";
+            } else if (result.isInitializationFailure()) {
+                return "Initialization failure";
+            } else {
+                return "Configuration error";
+            }
         }
     }
 }
diff --git 
a/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkResult.java
 
b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkResult.java
index e72269f34..08b11c7ea 100644
--- 
a/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkResult.java
+++ 
b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkResult.java
@@ -77,12 +77,30 @@ public class PipesForkResult {
     }
 
     /**
-     * Check if there was an application error.
+     * Check if there was a fatal error (failed to initialize pipes system).
      *
-     * @return true if there was an application-level error
+     * @return true if there was a fatal error
      */
-    public boolean isApplicationError() {
-        return pipesResult.isApplicationError();
+    public boolean isFatal() {
+        return pipesResult.isFatal();
+    }
+
+    /**
+     * Check if there was an initialization failure (fetcher/emitter 
initialization issues or CLIENT_UNAVAILABLE_WITHIN_MS).
+     *
+     * @return true if there was an initialization failure
+     */
+    public boolean isInitializationFailure() {
+        return pipesResult.isInitializationFailure();
+    }
+
+    /**
+     * Check if there was a task exception (fetch/emit failure or missing 
fetcher/emitter for a specific request).
+     *
+     * @return true if there was a task exception
+     */
+    public boolean isTaskException() {
+        return pipesResult.isTaskException();
     }
 
     /**
diff --git 
a/tika-pipes/tika-pipes-fork-parser/src/test/java/org/apache/tika/pipes/fork/PipesForkParserTest.java
 
b/tika-pipes/tika-pipes-fork-parser/src/test/java/org/apache/tika/pipes/fork/PipesForkParserTest.java
index 33c808ad4..30fc322dc 100644
--- 
a/tika-pipes/tika-pipes-fork-parser/src/test/java/org/apache/tika/pipes/fork/PipesForkParserTest.java
+++ 
b/tika-pipes/tika-pipes-fork-parser/src/test/java/org/apache/tika/pipes/fork/PipesForkParserTest.java
@@ -432,14 +432,17 @@ public class PipesForkParserTest {
             PipesForkResult result = parser.parse(tis);
 
             // At least one of these should be true
-            boolean hasCategory = result.isSuccess() || 
result.isProcessCrash() || result.isApplicationError();
+            boolean hasCategory = result.isSuccess() || 
result.isProcessCrash() ||
+                    result.isFatal() || result.isInitializationFailure() || 
result.isTaskException();
             assertTrue(hasCategory, "Result should have a valid category");
 
             // These should be mutually exclusive
             int trueCount = 0;
             if (result.isSuccess()) trueCount++;
             if (result.isProcessCrash()) trueCount++;
-            if (result.isApplicationError()) trueCount++;
+            if (result.isFatal()) trueCount++;
+            if (result.isInitializationFailure()) trueCount++;
+            if (result.isTaskException()) trueCount++;
             assertEquals(1, trueCount, "Exactly one category should be true");
         }
     }
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
index a5bd079aa..7be90f8f8 100644
--- 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
@@ -186,7 +186,7 @@ public class PipesClientTest {
 
             PipesResult pipesResult = pipesClient.process(tuple);
             assertEquals(PipesResult.RESULT_STATUS.FAILED_TO_INITIALIZE, pipesResult.status());
-            assertTrue(pipesResult.isApplicationError(), "FAILED_TO_INITIALIZE should be an application error");
+            assertTrue(pipesResult.isFatal(), "FAILED_TO_INITIALIZE should be a fatal error");
             Assertions.assertNotNull(pipesResult.message(), "Should have error message from server");
             assertTrue(pipesResult.message().contains("non-existent-fetcher-plugin") ||
                       pipesResult.message().contains("TikaConfigException") ||
@@ -215,7 +215,7 @@ public class PipesClientTest {
 
             PipesResult pipesResult = pipesClient.process(tuple);
             assertEquals(PipesResult.RESULT_STATUS.FAILED_TO_INITIALIZE, pipesResult.status());
-            assertTrue(pipesResult.isApplicationError(), "FAILED_TO_INITIALIZE should be an application error");
+            assertTrue(pipesResult.isFatal(), "FAILED_TO_INITIALIZE should be a fatal error");
             Assertions.assertNotNull(pipesResult.message(), "Should have error message");
             assertTrue(pipesResult.message().contains("exit code") ||
                             pipesResult.message().contains("JVM") ||
@@ -243,7 +243,7 @@ public class PipesClientTest {
 
             PipesResult pipesResult = pipesClient.process(tuple);
             assertEquals(PipesResult.RESULT_STATUS.FAILED_TO_INITIALIZE, pipesResult.status());
-            assertTrue(pipesResult.isApplicationError(), "FAILED_TO_INITIALIZE should be an application error");
+            assertTrue(pipesResult.isFatal(), "FAILED_TO_INITIALIZE should be a fatal error");
             Assertions.assertNotNull(pipesResult.message(), "Should have error message");
             assertTrue(pipesResult.message().contains("No such file") || pipesResult.message().contains("thisIsntJava"),
                     "Error message should indicate process failure: " + pipesResult.message());
@@ -272,8 +272,8 @@ public class PipesClientTest {
             // Should be UNSPECIFIED_CRASH because RuntimeException during detection
             // is not caught by pre-parse IOException handler
             assertEquals(PipesResult.RESULT_STATUS.UNSPECIFIED_CRASH, pipesResult.status());
-            assertTrue(pipesResult.isProcessCrash() || pipesResult.isApplicationError(),
-                    "Should be categorized as a crash or application error");
+            assertTrue(pipesResult.isProcessCrash(),
+                    "Should be categorized as a process crash");
 
             // Should have error message about the crash
             Assertions.assertNotNull(pipesResult.message(), "Should have error message");
@@ -412,9 +412,9 @@ public class PipesClientTest {
             assertEquals(PipesResult.RESULT_STATUS.FETCH_EXCEPTION, pipesResult.status(),
                     "Should return FETCH_EXCEPTION when file cannot be fetched");
 
-            // Verify it's categorized as APPLICATION_ERROR
-            assertTrue(pipesResult.isApplicationError(),
-                    "FETCH_EXCEPTION should be application error category");
+            // Verify it's categorized as TASK_EXCEPTION
+            assertTrue(pipesResult.isTaskException(),
+                    "FETCH_EXCEPTION should be task exception category");
 
             // Verify error message contains useful information
             Assertions.assertNotNull(pipesResult.message());
@@ -464,9 +464,9 @@ public class PipesClientTest {
             assertEquals(PipesResult.RESULT_STATUS.EMIT_EXCEPTION, pipesResult.status(),
                     "Should return EMIT_EXCEPTION when emitter fails to write");
 
-            // Verify it's categorized as APPLICATION_ERROR
-            assertTrue(pipesResult.isApplicationError(),
-                    "EMIT_EXCEPTION should be application error category");
+            // Verify it's categorized as TASK_EXCEPTION
+            assertTrue(pipesResult.isTaskException(),
+                    "EMIT_EXCEPTION should be task exception category");
         }
     }
 
@@ -493,9 +493,9 @@ public class PipesClientTest {
             assertEquals(PipesResult.RESULT_STATUS.FETCHER_INITIALIZATION_EXCEPTION, pipesResult.status(),
                     "Should return FETCHER_INITIALIZATION_EXCEPTION when fetcher name is invalid");
 
-            // Verify it's categorized as APPLICATION_ERROR
-            assertTrue(pipesResult.isApplicationError(),
-                    "FETCHER_NOT_FOUND should be application error category");
+            // Verify it's categorized as INITIALIZATION_FAILURE
+            assertTrue(pipesResult.isInitializationFailure(),
+                    "FETCHER_INITIALIZATION_EXCEPTION should be initialization failure category");
 
             // Verify error message mentions the fetcher name
             Assertions.assertNotNull(pipesResult.message());
@@ -539,9 +539,9 @@ public class PipesClientTest {
             assertEquals(PipesResult.RESULT_STATUS.EMITTER_NOT_FOUND, pipesResult.status(),
                     "Should return EMITTER_NOT_FOUND when emitter name is invalid");
 
-            // Verify it's categorized as APPLICATION_ERROR
-            assertTrue(pipesResult.isApplicationError(),
-                    "EMITTER_NOT_FOUND should be application error category");
+            // Verify it's categorized as TASK_EXCEPTION
+            assertTrue(pipesResult.isTaskException(),
+                    "EMITTER_NOT_FOUND should be task exception category");
 
             // Verify error message mentions the emitter name
             Assertions.assertNotNull(pipesResult.message());
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
index 1ffd7d05d..a022af726 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
@@ -102,7 +102,8 @@ public class PipesResource {
         PipesResult pipesResult = pipesParser.parse(fetchEmitTuple);
         if (pipesResult.isProcessCrash()) {
             return returnProcessCrash(pipesResult.status().toString());
-        } else if (pipesResult.isApplicationError()) {
+        } else if (!pipesResult.isSuccess()) {
+            // Handle fatal errors, initialization failures, and task exceptions
             return returnApplicationError(pipesResult
                     .status()
                     .toString());


Reply via email to