This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new a756502b4 TIKA-4570 (#2457)
a756502b4 is described below
commit a756502b4c984d815088fda41d2a61ceacd3d4ad
Author: Tim Allison <[email protected]>
AuthorDate: Tue Dec 16 11:09:53 2025 -0500
TIKA-4570 (#2457)
* TIKA-4570 -- first step
* TIKA-4570 -- improve error codes and handling in AsyncProcessor
---
.../apache/tika/async/cli/AsyncProcessorTest.java | 52 ++++++
.../org/apache/tika/pipes/api/PipesResult.java | 181 +++++++++++++--------
.../org/apache/tika/pipes/core/PipesConfig.java | 31 ++++
.../org/apache/tika/pipes/core/PipesException.java | 8 +
.../org/apache/tika/pipes/core/PipesResults.java | 1 -
.../tika/pipes/core/async/AsyncProcessor.java | 87 +++++++++-
.../apache/tika/pipes/fork/PipesForkResult.java | 26 ++-
.../tika/pipes/fork/PipesForkParserTest.java | 7 +-
.../apache/tika/pipes/core/PipesClientTest.java | 34 ++--
.../tika/server/core/resource/PipesResource.java | 3 +-
10 files changed, 337 insertions(+), 93 deletions(-)
diff --git a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java
index 51aff03a0..29bf9ebf8 100644
--- a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java
+++ b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java
@@ -19,6 +19,8 @@ package org.apache.tika.async.cli;
import static
org.apache.tika.pipes.api.pipesiterator.PipesIteratorBaseConfig.DEFAULT_HANDLER_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.BufferedReader;
import java.io.OutputStream;
@@ -46,6 +48,7 @@ import org.apache.tika.pipes.api.HandlerConfig;
import org.apache.tika.pipes.api.emitter.EmitKey;
import org.apache.tika.pipes.api.fetcher.FetchKey;
import org.apache.tika.pipes.api.pipesiterator.PipesIterator;
+import org.apache.tika.pipes.core.PipesException;
import org.apache.tika.pipes.core.async.AsyncProcessor;
import org.apache.tika.pipes.core.extractor.EmbeddedDocumentBytesConfig;
import org.apache.tika.serialization.JsonMetadataList;
@@ -155,4 +158,53 @@ public class AsyncProcessorTest extends TikaTest {
.get(1)
.get(TikaCoreProperties.TIKA_CONTENT));
}
+
+ @Test
+ public void testStopsOnApplicationError() throws Exception {
+ // Test that AsyncProcessor stops processing when an application error
occurs
+ // (TIKA-4570)
+ AsyncProcessor processor = new
AsyncProcessor(configDir.resolve("tika-config.json"));
+
+ // Create a tuple with a non-existent fetcher - this will cause
FETCHER_NOT_FOUND
+ // which is a TASK_EXCEPTION but will stop processing in CLI mode
(default)
+ ParseContext parseContext = new ParseContext();
+ parseContext.set(HandlerConfig.class, DEFAULT_HANDLER_CONFIG);
+ FetchEmitTuple badTuple = new FetchEmitTuple(
+ "bad-tuple-1",
+ new FetchKey("non-existent-fetcher", "some-file.txt"),
+ new EmitKey("fse-json", "emit-bad"),
+ new Metadata(),
+ parseContext,
+ FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
+
+ // Offer the bad tuple
+ processor.offer(badTuple, 1000);
+
+ // Wait for the error to be detected
+ int maxWaitMs = 30000;
+ int waited = 0;
+ while (!processor.hasApplicationError() && waited < maxWaitMs) {
+ Thread.sleep(100);
+ waited += 100;
+ }
+
+ // Verify that the application error was detected
+ assertTrue(processor.hasApplicationError(),
+ "AsyncProcessor should detect application error from bad
fetcher");
+
+ // Verify that subsequent offers throw PipesException
+ FetchEmitTuple anotherTuple = new FetchEmitTuple(
+ "another-tuple",
+ new FetchKey("fsf", "mock.xml"),
+ new EmitKey("fse-json", "emit-another"),
+ new Metadata(),
+ parseContext,
+ FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
+
+ assertThrows(PipesException.class, () -> {
+ processor.offer(anotherTuple, 1000);
+ }, "Should throw PipesException when offering after application
error");
+
+ processor.close();
+ }
}
diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/PipesResult.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/PipesResult.java
index 9418bac0b..482e13e7f 100644
--- a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/PipesResult.java
+++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/PipesResult.java
@@ -25,65 +25,61 @@ public record PipesResult(RESULT_STATUS status, EmitData emitData, String messag
/**
* High-level categorization of result statuses.
* <p>
- * Categories help distinguish between:
+ * Categories help distinguish between different types of failures and
successes,
+ * allowing infrastructure to decide how to handle each case:
* <ul>
- * <li>Process crashes (OOM, timeout, system-level failures)</li>
- * <li>Application errors (process ran but encountered errors)</li>
- * <li>Successful processing (possibly with warnings)</li>
+ * <li>{@link #FATAL} - System cannot continue, must be fixed and
restarted</li>
+ * <li>{@link #INITIALIZATION_FAILURE} - Component failed to initialize,
might be transient</li>
+ * <li>{@link #TASK_EXCEPTION} - This task failed, but other tasks may
succeed</li>
+ * <li>{@link #PROCESS_CRASH} - Forked process crashed, auto-restart and
continue</li>
+ * <li>{@link #SUCCESS} - Processing completed successfully (possibly
with warnings)</li>
* </ul>
*/
public enum CATEGORY {
- /** Forked process crashed due to OOM, timeout, or other system
failure */
- PROCESS_CRASH,
+ /** Fatal system error - cannot continue, must be fixed and restarted
*/
+ FATAL,
+
+ /** Component initialization failed - processing should stop, might be
transient */
+ INITIALIZATION_FAILURE,
- /** Process completed but encountered application-level errors */
- APPLICATION_ERROR,
+ /** Task-level exception - this task failed, log and continue with
next task */
+ TASK_EXCEPTION,
+
+ /** Forked process crashed due to OOM, timeout, or other system
failure - auto-restart */
+ PROCESS_CRASH,
/** Processing completed successfully (possibly with warnings) */
SUCCESS
}
public enum RESULT_STATUS {
- // Process crashes
- OOM(CATEGORY.PROCESS_CRASH),
- TIMEOUT(CATEGORY.PROCESS_CRASH),
- UNSPECIFIED_CRASH(CATEGORY.PROCESS_CRASH),
+ // Fatal errors - system cannot continue
+ FAILED_TO_INITIALIZE(CATEGORY.FATAL),
- // Initialization failures
- FAILED_TO_INITIALIZE(CATEGORY.APPLICATION_ERROR),
+ // Initialization failures - component failed to start, might be
transient
+ FETCHER_INITIALIZATION_EXCEPTION(CATEGORY.INITIALIZATION_FAILURE),
+ EMITTER_INITIALIZATION_EXCEPTION(CATEGORY.INITIALIZATION_FAILURE),
+ CLIENT_UNAVAILABLE_WITHIN_MS(CATEGORY.INITIALIZATION_FAILURE),
- // Process crashes (system-level failures)
- CLIENT_UNAVAILABLE_WITHIN_MS(CATEGORY.APPLICATION_ERROR),
+ // Task exceptions - this task failed, continue with next
+ FETCH_EXCEPTION(CATEGORY.TASK_EXCEPTION),
+ EMIT_EXCEPTION(CATEGORY.TASK_EXCEPTION),
+ FETCHER_NOT_FOUND(CATEGORY.TASK_EXCEPTION),
+ EMITTER_NOT_FOUND(CATEGORY.TASK_EXCEPTION),
- // Fetch failures
- FETCHER_INITIALIZATION_EXCEPTION(CATEGORY.APPLICATION_ERROR),
- FETCH_EXCEPTION(CATEGORY.APPLICATION_ERROR),
+ // Process crashes - forked process died, auto-restart
+ OOM(CATEGORY.PROCESS_CRASH),
+ TIMEOUT(CATEGORY.PROCESS_CRASH),
+ UNSPECIFIED_CRASH(CATEGORY.PROCESS_CRASH),
- // Success with edge case
+ // Success states
EMPTY_OUTPUT(CATEGORY.SUCCESS),
-
-
- // Parse success states
PARSE_SUCCESS(CATEGORY.SUCCESS),
PARSE_SUCCESS_WITH_EXCEPTION(CATEGORY.SUCCESS),
PARSE_EXCEPTION_NO_EMIT(CATEGORY.SUCCESS),
-
-
- // Emit success states
EMIT_SUCCESS(CATEGORY.SUCCESS),
EMIT_SUCCESS_PARSE_EXCEPTION(CATEGORY.SUCCESS),
- EMIT_SUCCESS_PASSBACK(CATEGORY.SUCCESS),
-
- // Emit failure
- EMIT_EXCEPTION(CATEGORY.APPLICATION_ERROR),
-
- // Emitter failures
- EMITTER_INITIALIZATION_EXCEPTION(CATEGORY.APPLICATION_ERROR),
- EMITTER_NOT_FOUND(CATEGORY.APPLICATION_ERROR),
-
- // Other errors
- INTERRUPTED_EXCEPTION(CATEGORY.APPLICATION_ERROR),
- FETCHER_NOT_FOUND(CATEGORY.APPLICATION_ERROR);
+ EMIT_SUCCESS_PASSBACK(CATEGORY.SUCCESS);
private final CATEGORY category;
@@ -95,34 +91,66 @@ public record PipesResult(RESULT_STATUS status, EmitData emitData, String messag
/**
* Gets the high-level category for this result status.
*
- * @return the category (PROCESS_CRASH, APPLICATION_ERROR, or SUCCESS)
+ * @return the category (FATAL, INITIALIZATION_FAILURE,
TASK_EXCEPTION, PROCESS_CRASH, or SUCCESS)
*/
public CATEGORY getCategory() {
return category;
}
/**
- * Checks if this status represents a process crash (OOM, timeout,
etc.).
+ * Checks if this status represents a fatal error.
+ * <p>
+ * Fatal errors mean the system cannot continue and must be fixed and
restarted.
*
- * @return true if the forked process crashed
+ * @return true if this is a fatal error
*/
- public boolean isProcessCrash() {
- return category == CATEGORY.PROCESS_CRASH;
+ public boolean isFatal() {
+ return category == CATEGORY.FATAL;
+ }
+
+ /**
+ * Checks if this status represents an initialization failure.
+ * <p>
+ * Initialization failures occur when a component (fetcher, emitter,
client)
+ * fails to start. These might be transient (e.g., network issues) or
require
+ * configuration fixes.
+ *
+ * @return true if a component failed to initialize
+ */
+ public boolean isInitializationFailure() {
+ return category == CATEGORY.INITIALIZATION_FAILURE;
}
/**
- * Checks if this status represents an application error.
+ * Checks if this status represents a task-level exception.
+ * <p>
+ * Task exceptions indicate this specific task failed, but other tasks
+ * may succeed. Processing can continue with the next task.
*
- * @return true if the process ran but encountered an error
+ * @return true if this task failed
*/
- public boolean isApplicationError() {
- return category == CATEGORY.APPLICATION_ERROR;
+ public boolean isTaskException() {
+ return category == CATEGORY.TASK_EXCEPTION;
}
/**
- * Checks if this status represents successful processing. Successful
- * processing includes handling standard runtime exceptions during the
- * parse.
+ * Checks if this status represents a process crash (OOM, timeout,
etc.).
+ * <p>
+ * Process crashes are system-level failures where the forked process
+ * terminated abnormally. The process will be auto-restarted and
+ * processing can continue.
+ *
+ * @return true if the forked process crashed
+ */
+ public boolean isProcessCrash() {
+ return category == CATEGORY.PROCESS_CRASH;
+ }
+
+ /**
+ * Checks if this status represents successful processing.
+ * <p>
+ * Success includes normal completion as well as cases where
+ * processing completed with warnings (e.g.,
PARSE_SUCCESS_WITH_EXCEPTION).
*
* @return true if processing completed successfully (possibly with
warnings)
*/
@@ -161,36 +189,59 @@ public record PipesResult(RESULT_STATUS status, EmitData emitData, String messag
/**
* Gets the high-level category for this result.
*
- * @return the category (PROCESS_CRASH, APPLICATION_ERROR, or SUCCESS)
+ * @return the category (FATAL, INITIALIZATION_FAILURE, TASK_EXCEPTION,
PROCESS_CRASH, or SUCCESS)
*/
public CATEGORY getCategory() {
return status.getCategory();
}
/**
- * Checks if this result represents a process crash (OOM, timeout, etc.).
+ * Checks if this result represents a fatal error.
* <p>
- * Process crashes are system-level failures where the forked process
- * terminated abnormally, as opposed to application errors where the
- * process completed but encountered errors during execution.
+ * Fatal errors mean the system cannot continue and must be fixed and
restarted.
*
- * @return true if the forked process crashed
+ * @return true if this is a fatal error
*/
- public boolean isProcessCrash() {
- return status.isProcessCrash();
+ public boolean isFatal() {
+ return status.isFatal();
+ }
+
+ /**
+ * Checks if this result represents an initialization failure.
+ * <p>
+ * Initialization failures occur when a component (fetcher, emitter,
client)
+ * fails to start. These might be transient (e.g., network issues) or
require
+ * configuration fixes.
+ *
+ * @return true if a component failed to initialize
+ */
+ public boolean isInitializationFailure() {
+ return status.isInitializationFailure();
+ }
+
+ /**
+ * Checks if this result represents a task-level exception.
+ * <p>
+ * Task exceptions indicate this specific task failed, but other tasks
+ * may succeed. Processing can continue with the next task.
+ *
+ * @return true if this task failed
+ */
+ public boolean isTaskException() {
+ return status.isTaskException();
}
/**
- * Checks if this result represents an application error.
+ * Checks if this result represents a process crash (OOM, timeout, etc.).
* <p>
- * Application errors occur when the process runs but encounters
- * errors during fetch, parse, or emit operations. These are
- * caught runtime exceptions, not process crashes.
+ * Process crashes are system-level failures where the forked process
+ * terminated abnormally. The process will be auto-restarted and
+ * processing can continue.
*
- * @return true if the process ran but encountered an error
+ * @return true if the forked process crashed
*/
- public boolean isApplicationError() {
- return status.isApplicationError();
+ public boolean isProcessCrash() {
+ return status.isProcessCrash();
}
/**
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java
index 3fe142c1f..c70f16b1d 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java
@@ -74,6 +74,16 @@ public class PipesConfig {
private int queueSize = DEFAULT_QUEUE_SIZE;
private int numEmitters = DEFAULT_NUM_EMITTERS;
private boolean emitIntermediateResults = false;
+ /**
+ * When true, only stop processing on fatal errors (FAILED_TO_INITIALIZE).
+ * When false (default), also stop on initialization failures and
not-found errors.
+ * <p>
+ * Use true for server mode (tika-server /pipes, /async) where different
requests
+ * may use different fetchers/emitters.
+ * Use false (default) for CLI batch mode where all tasks typically use
the same
+ * fetcher/emitter configuration.
+ */
+ private boolean stopOnlyOnFatal = false;
private ArrayList<String> forkedJvmArgs = new ArrayList<>();
private String javaPath = "java";
@@ -317,4 +327,25 @@ public class PipesConfig {
public void setEmitIntermediateResults(boolean emitIntermediateResults) {
this.emitIntermediateResults = emitIntermediateResults;
}
+
+ /**
+ * When true, only stop processing on fatal errors (FAILED_TO_INITIALIZE).
+ * When false (default), also stop on initialization failures
(FETCHER_INITIALIZATION_EXCEPTION,
+ * EMITTER_INITIALIZATION_EXCEPTION, CLIENT_UNAVAILABLE_WITHIN_MS) and
not-found errors
+ * (FETCHER_NOT_FOUND, EMITTER_NOT_FOUND).
+ * <p>
+ * Use true for server mode (tika-server /pipes, /async) where different
requests
+ * may use different fetchers/emitters - a bad request shouldn't kill the
server.
+ * Use false (default) for CLI batch mode where all tasks typically use
the same
+ * fetcher/emitter configuration - no point continuing if configuration is
wrong.
+ *
+ * @return true if only fatal errors should stop processing
+ */
+ public boolean isStopOnlyOnFatal() {
+ return stopOnlyOnFatal;
+ }
+
+ public void setStopOnlyOnFatal(boolean stopOnlyOnFatal) {
+ this.stopOnlyOnFatal = stopOnlyOnFatal;
+ }
}
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesException.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesException.java
index a5b4414cc..bbfd9a605 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesException.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesException.java
@@ -21,7 +21,15 @@ package org.apache.tika.pipes.core;
*/
public class PipesException extends Exception {
+ public PipesException(String message) {
+ super(message);
+ }
+
public PipesException(Throwable t) {
super(t);
}
+
+ public PipesException(String message, Throwable t) {
+ super(message, t);
+ }
}
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResults.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResults.java
index 5df292f13..282db85f3 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResults.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResults.java
@@ -26,7 +26,6 @@ public class PipesResults {
public static final PipesResult OOM = new
PipesResult(PipesResult.RESULT_STATUS.OOM);
public static final PipesResult UNSPECIFIED_CRASH = new
PipesResult(PipesResult.RESULT_STATUS.UNSPECIFIED_CRASH);
public static final PipesResult EMIT_SUCCESS = new
PipesResult(PipesResult.RESULT_STATUS.EMIT_SUCCESS);
- public static final PipesResult INTERRUPTED_EXCEPTION = new
PipesResult(PipesResult.RESULT_STATUS.INTERRUPTED_EXCEPTION);
public static final PipesResult EMPTY_OUTPUT = new
PipesResult(PipesResult.RESULT_STATUS.EMPTY_OUTPUT);
}
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
index c21a8707f..f741ee45f 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
@@ -71,6 +72,7 @@ public class AsyncProcessor implements Closeable {
private final Path tikaConfigPath;
private final PipesReporter pipesReporter;
private final AtomicLong totalProcessed = new AtomicLong(0);
+ private final AtomicBoolean applicationErrorOccurred = new
AtomicBoolean(false);
private static long MAX_OFFER_WAIT_MS = 120000;
private volatile int numParserThreadsFinished = 0;
private volatile int numEmitterThreadsFinished = 0;
@@ -115,7 +117,8 @@ public class AsyncProcessor implements Closeable {
for (int i = 0; i < asyncConfig.getNumClients(); i++) {
executorCompletionService.submit(
- new FetchEmitWorker(asyncConfig, tikaConfigPath,
fetchEmitTuples, emitDatumTuples));
+ new FetchEmitWorker(asyncConfig, tikaConfigPath,
fetchEmitTuples,
+ emitDatumTuples, applicationErrorOccurred));
}
EmitterManager emitterManager =
EmitterManager.load(tikaPluginManager, tikaJsonConfig);
@@ -158,6 +161,9 @@ public class AsyncProcessor implements Closeable {
throw new IllegalStateException(
"Can't call offer after calling close() or " +
"shutdownNow()");
}
+ if (applicationErrorOccurred.get()) {
+ throw new PipesException("Can't call offer after an application
error occurred");
+ }
if (newFetchEmitTuples.size() > asyncConfig.getQueueSize()) {
throw new OfferLargerThanQueueSize(newFetchEmitTuples.size(),
asyncConfig.getQueueSize());
@@ -193,10 +199,24 @@ public class AsyncProcessor implements Closeable {
throw new IllegalStateException(
"Can't call offer after calling close() or " +
"shutdownNow()");
}
+ if (applicationErrorOccurred.get()) {
+ throw new PipesException("Can't call offer after an application
error occurred");
+ }
checkActive();
return fetchEmitTuples.offer(t, offerMs, TimeUnit.MILLISECONDS);
}
+ /**
+ * Returns true if an application error has occurred during processing.
+ * When this returns true, all workers have stopped or are stopping,
+ * and no new tuples can be offered.
+ *
+ * @return true if an application error occurred
+ */
+ public boolean hasApplicationError() {
+ return applicationErrorOccurred.get();
+ }
+
public void finished() throws InterruptedException {
for (int i = 0; i < asyncConfig.getNumClients(); i++) {
boolean offered =
fetchEmitTuples.offer(PipesIterator.COMPLETED_SEMAPHORE,
@@ -271,15 +291,18 @@ public class AsyncProcessor implements Closeable {
private final Path tikaConfigPath;
private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
private final ArrayBlockingQueue<EmitDataPair> emitDataTupleQueue;
+ private final AtomicBoolean applicationErrorOccurred;
private FetchEmitWorker(PipesConfig asyncConfig,
Path tikaConfigPath,
ArrayBlockingQueue<FetchEmitTuple>
fetchEmitTuples,
- ArrayBlockingQueue<EmitDataPair>
emitDataTupleQueue) {
+ ArrayBlockingQueue<EmitDataPair>
emitDataTupleQueue,
+ AtomicBoolean applicationErrorOccurred) {
this.asyncConfig = asyncConfig;
this.tikaConfigPath = tikaConfigPath;
this.fetchEmitTuples = fetchEmitTuples;
this.emitDataTupleQueue = emitDataTupleQueue;
+ this.applicationErrorOccurred = applicationErrorOccurred;
}
@Override
@@ -287,6 +310,12 @@ public class AsyncProcessor implements Closeable {
try (PipesClient pipesClient = new PipesClient(asyncConfig,
tikaConfigPath)) {
while (true) {
+ // Check if another worker encountered an application error
+ if (applicationErrorOccurred.get()) {
+ LOG.info("pipesClientId={}: stopping due to
application error in another worker",
+ pipesClient.getPipesClientId());
+ return PARSER_FUTURE_CODE;
+ }
FetchEmitTuple t = fetchEmitTuples.poll(1,
TimeUnit.SECONDS);
if (t == null) {
//skip
@@ -309,6 +338,18 @@ public class AsyncProcessor implements Closeable {
LOG.warn("pipesClientId={} crash",
pipesClient.getPipesClientId(), e);
result = PipesResults.UNSPECIFIED_CRASH;
}
+ // Check if we should stop processing based on the
result
+ if (shouldStopProcessing(result)) {
+ LOG.error("pipesClientId={}: {} ({}), stopping all
processing",
+ pipesClient.getPipesClientId(),
+ describeStopReason(result),
+ result.status());
+ applicationErrorOccurred.set(true);
+ pipesReporter.report(t, result,
System.currentTimeMillis() - start);
+ throw new
PipesException(describeStopReason(result) + ": " +
+ result.status() +
+ (result.message() != null ? " - " +
result.message() : ""));
+ }
if (LOG.isTraceEnabled()) {
LOG.trace("timer -- pipes client process: {} ms",
System.currentTimeMillis() - start);
@@ -343,7 +384,47 @@ public class AsyncProcessor implements Closeable {
result.status() ==
PipesResult.RESULT_STATUS.PARSE_SUCCESS_WITH_EXCEPTION) {
return true;
}
- return asyncConfig.isEmitIntermediateResults() &&
(result.isApplicationError() || result.isProcessCrash());
+ // Emit intermediate results on any non-success if configured
+ return asyncConfig.isEmitIntermediateResults() &&
!result.isSuccess();
+ }
+
+ /**
+ * Determines if processing should stop based on the result and
configuration.
+ * <p>
+ * When stopOnlyOnFatal is true (server mode): only stop on fatal
errors.
+ * When stopOnlyOnFatal is false (CLI mode, default): also stop on
initialization
+ * failures and fetcher/emitter not found errors.
+ */
+ private boolean shouldStopProcessing(PipesResult result) {
+ // Always stop on fatal errors
+ if (result.isFatal()) {
+ return true;
+ }
+
+ // In server mode, only fatal errors stop processing
+ if (asyncConfig.isStopOnlyOnFatal()) {
+ return false;
+ }
+
+ // In CLI mode, also stop on initialization failures and not-found
errors
+ if (result.isInitializationFailure()) {
+ return true;
+ }
+
+ // Stop on fetcher/emitter not found in CLI mode
+ PipesResult.RESULT_STATUS status = result.status();
+ return status == PipesResult.RESULT_STATUS.FETCHER_NOT_FOUND ||
+ status == PipesResult.RESULT_STATUS.EMITTER_NOT_FOUND;
+ }
+
+ private String describeStopReason(PipesResult result) {
+ if (result.isFatal()) {
+ return "Fatal error";
+ } else if (result.isInitializationFailure()) {
+ return "Initialization failure";
+ } else {
+ return "Configuration error";
+ }
}
}
}
diff --git a/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkResult.java b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkResult.java
index e72269f34..08b11c7ea 100644
--- a/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkResult.java
+++ b/tika-pipes/tika-pipes-fork-parser/src/main/java/org/apache/tika/pipes/fork/PipesForkResult.java
@@ -77,12 +77,30 @@ public class PipesForkResult {
}
/**
- * Check if there was an application error.
+ * Check if there was a fatal error (failed to initialize pipes system).
*
- * @return true if there was an application-level error
+ * @return true if there was a fatal error
*/
- public boolean isApplicationError() {
- return pipesResult.isApplicationError();
+ public boolean isFatal() {
+ return pipesResult.isFatal();
+ }
+
+ /**
+ * Check if there was an initialization failure (fetcher/emitter
initialization issues).
+ *
+ * @return true if there was an initialization failure
+ */
+ public boolean isInitializationFailure() {
+ return pipesResult.isInitializationFailure();
+ }
+
+ /**
+ * Check if there was a task exception (fetch/emit/parse issues for a
specific request).
+ *
+ * @return true if there was a task exception
+ */
+ public boolean isTaskException() {
+ return pipesResult.isTaskException();
}
/**
diff --git a/tika-pipes/tika-pipes-fork-parser/src/test/java/org/apache/tika/pipes/fork/PipesForkParserTest.java b/tika-pipes/tika-pipes-fork-parser/src/test/java/org/apache/tika/pipes/fork/PipesForkParserTest.java
index 33c808ad4..30fc322dc 100644
--- a/tika-pipes/tika-pipes-fork-parser/src/test/java/org/apache/tika/pipes/fork/PipesForkParserTest.java
+++ b/tika-pipes/tika-pipes-fork-parser/src/test/java/org/apache/tika/pipes/fork/PipesForkParserTest.java
@@ -432,14 +432,17 @@ public class PipesForkParserTest {
PipesForkResult result = parser.parse(tis);
// At least one of these should be true
- boolean hasCategory = result.isSuccess() ||
result.isProcessCrash() || result.isApplicationError();
+ boolean hasCategory = result.isSuccess() ||
result.isProcessCrash() ||
+ result.isFatal() || result.isInitializationFailure() ||
result.isTaskException();
assertTrue(hasCategory, "Result should have a valid category");
// These should be mutually exclusive
int trueCount = 0;
if (result.isSuccess()) trueCount++;
if (result.isProcessCrash()) trueCount++;
- if (result.isApplicationError()) trueCount++;
+ if (result.isFatal()) trueCount++;
+ if (result.isInitializationFailure()) trueCount++;
+ if (result.isTaskException()) trueCount++;
assertEquals(1, trueCount, "Exactly one category should be true");
}
}
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
index a5bd079aa..7be90f8f8 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
@@ -186,7 +186,7 @@ public class PipesClientTest {
PipesResult pipesResult = pipesClient.process(tuple);
assertEquals(PipesResult.RESULT_STATUS.FAILED_TO_INITIALIZE,
pipesResult.status());
- assertTrue(pipesResult.isApplicationError(), "FAILED_TO_INITIALIZE
should be an application error");
+ assertTrue(pipesResult.isFatal(), "FAILED_TO_INITIALIZE should be
a fatal error");
Assertions.assertNotNull(pipesResult.message(), "Should have error
message from server");
assertTrue(pipesResult.message().contains("non-existent-fetcher-plugin") ||
pipesResult.message().contains("TikaConfigException") ||
@@ -215,7 +215,7 @@ public class PipesClientTest {
PipesResult pipesResult = pipesClient.process(tuple);
assertEquals(PipesResult.RESULT_STATUS.FAILED_TO_INITIALIZE,
pipesResult.status());
- assertTrue(pipesResult.isApplicationError(), "FAILED_TO_INITIALIZE
should be an application error");
+ assertTrue(pipesResult.isFatal(), "FAILED_TO_INITIALIZE should be
a fatal error");
Assertions.assertNotNull(pipesResult.message(), "Should have error
message");
assertTrue(pipesResult.message().contains("exit code") ||
pipesResult.message().contains("JVM") ||
@@ -243,7 +243,7 @@ public class PipesClientTest {
PipesResult pipesResult = pipesClient.process(tuple);
assertEquals(PipesResult.RESULT_STATUS.FAILED_TO_INITIALIZE,
pipesResult.status());
- assertTrue(pipesResult.isApplicationError(), "FAILED_TO_INITIALIZE
should be an application error");
+ assertTrue(pipesResult.isFatal(), "FAILED_TO_INITIALIZE should be
a fatal error");
Assertions.assertNotNull(pipesResult.message(), "Should have error
message");
assertTrue(pipesResult.message().contains("No such file") ||
pipesResult.message().contains("thisIsntJava"),
"Error message should indicate process failure: " +
pipesResult.message());
@@ -272,8 +272,8 @@ public class PipesClientTest {
// Should be UNSPECIFIED_CRASH because RuntimeException during
detection
// is not caught by pre-parse IOException handler
assertEquals(PipesResult.RESULT_STATUS.UNSPECIFIED_CRASH,
pipesResult.status());
- assertTrue(pipesResult.isProcessCrash() ||
pipesResult.isApplicationError(),
- "Should be categorized as a crash or application error");
+ assertTrue(pipesResult.isProcessCrash(),
+ "Should be categorized as a process crash");
// Should have error message about the crash
Assertions.assertNotNull(pipesResult.message(), "Should have error
message");
@@ -412,9 +412,9 @@ public class PipesClientTest {
assertEquals(PipesResult.RESULT_STATUS.FETCH_EXCEPTION,
pipesResult.status(),
"Should return FETCH_EXCEPTION when file cannot be
fetched");
- // Verify it's categorized as APPLICATION_ERROR
- assertTrue(pipesResult.isApplicationError(),
- "FETCH_EXCEPTION should be application error category");
+ // Verify it's categorized as TASK_EXCEPTION
+ assertTrue(pipesResult.isTaskException(),
+ "FETCH_EXCEPTION should be task exception category");
// Verify error message contains useful information
Assertions.assertNotNull(pipesResult.message());
@@ -464,9 +464,9 @@ public class PipesClientTest {
assertEquals(PipesResult.RESULT_STATUS.EMIT_EXCEPTION,
pipesResult.status(),
"Should return EMIT_EXCEPTION when emitter fails to
write");
- // Verify it's categorized as APPLICATION_ERROR
- assertTrue(pipesResult.isApplicationError(),
- "EMIT_EXCEPTION should be application error category");
+ // Verify it's categorized as TASK_EXCEPTION
+ assertTrue(pipesResult.isTaskException(),
+ "EMIT_EXCEPTION should be task exception category");
}
}
@@ -493,9 +493,9 @@ public class PipesClientTest {
assertEquals(PipesResult.RESULT_STATUS.FETCHER_INITIALIZATION_EXCEPTION,
pipesResult.status(),
"Should return FETCHER_INITIALIZATION_EXCEPTION when
fetcher name is invalid");
- // Verify it's categorized as APPLICATION_ERROR
- assertTrue(pipesResult.isApplicationError(),
- "FETCHER_NOT_FOUND should be application error category");
+ // Verify it's categorized as INITIALIZATION_FAILURE
+ assertTrue(pipesResult.isInitializationFailure(),
+ "FETCHER_INITIALIZATION_EXCEPTION should be initialization
failure category");
// Verify error message mentions the fetcher name
Assertions.assertNotNull(pipesResult.message());
@@ -539,9 +539,9 @@ public class PipesClientTest {
assertEquals(PipesResult.RESULT_STATUS.EMITTER_NOT_FOUND,
pipesResult.status(),
"Should return EMITTER_NOT_FOUND when emitter name is
invalid");
- // Verify it's categorized as APPLICATION_ERROR
- assertTrue(pipesResult.isApplicationError(),
- "EMITTER_NOT_FOUND should be application error category");
+ // Verify it's categorized as TASK_EXCEPTION
+ assertTrue(pipesResult.isTaskException(),
+ "EMITTER_NOT_FOUND should be task exception category");
// Verify error message mentions the emitter name
Assertions.assertNotNull(pipesResult.message());
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
index 1ffd7d05d..a022af726 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
@@ -102,7 +102,8 @@ public class PipesResource {
PipesResult pipesResult = pipesParser.parse(fetchEmitTuple);
if (pipesResult.isProcessCrash()) {
return returnProcessCrash(pipesResult.status().toString());
- } else if (pipesResult.isApplicationError()) {
+ } else if (!pipesResult.isSuccess()) {
+ // Handle fatal errors, initialization failures, and task
exceptions
return returnApplicationError(pipesResult
.status()
.toString());