This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch branch_3x
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/branch_3x by this push:
new 5a092123c8 basically cherrypick
51feb980fc4289fc90ed8cb1d22a42d6c2729bd5
5a092123c8 is described below
commit 5a092123c8954e49ff2ebd3696ae6d6c2d724937
Author: tallison <[email protected]>
AuthorDate: Thu Mar 5 15:00:51 2026 -0500
basically cherrypick 51feb980fc4289fc90ed8cb1d22a42d6c2729bd5
---
.../apache/tika/pipes/async/AsyncProcessor.java | 28 +++++++++++++++++-----
1 file changed, 22 insertions(+), 6 deletions(-)
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 3a6751f4ff..fe532e9dc3 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -178,16 +178,32 @@ public class AsyncProcessor implements Closeable {
return fetchEmitTuples.remainingCapacity();
}
- public synchronized boolean offer(FetchEmitTuple t, long offerMs)
+ public boolean offer(FetchEmitTuple t, long offerMs)
throws PipesException, InterruptedException {
if (fetchEmitTuples == null) {
throw new IllegalStateException("queue hasn't been initialized
yet.");
- } else if (isShuttingDown) {
- throw new IllegalStateException(
- "Can't call offer after calling close() or " +
"shutdownNow()");
}
- checkActive();
- return fetchEmitTuples.offer(t, offerMs, TimeUnit.MILLISECONDS);
+ long deadline = System.currentTimeMillis() + offerMs;
+ while (System.currentTimeMillis() < deadline) {
+ synchronized (this) {
+ if (isShuttingDown) {
+ throw new IllegalStateException(
+ "Can't call offer after calling close() or
shutdownNow()");
+ }
+ checkActive();
+ }
+ // Try a short offer outside the synchronized block so
checkActive()
+ // can still be called by other threads (e.g. the watcher).
+ long remaining = deadline - System.currentTimeMillis();
+ long pollMs = Math.min(remaining, 1000);
+ if (pollMs <= 0) {
+ return false;
+ }
+ if (fetchEmitTuples.offer(t, pollMs, TimeUnit.MILLISECONDS)) {
+ return true;
+ }
+ }
+ return false;
}
public void finished() throws InterruptedException {