This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 507c9933c TIKA-4497 -- allow per file timeouts in tika-pipes (#2343)
507c9933c is described below

commit 507c9933c3b52f26817872daba082fdebcb41ebb
Author: Tim Allison <[email protected]>
AuthorDate: Wed Oct 1 15:07:23 2025 -0400

    TIKA-4497 -- allow per file timeouts in tika-pipes (#2343)
---
 CHANGES.txt                                        |  2 ++
 .../org/apache/tika/config/TikaTaskTimeout.java    |  7 ++++-
 .../org/apache/tika/pipes/core/PipesClient.java    |  4 ++-
 .../org/apache/tika/pipes/core/PipesServer.java    | 34 +++++++++++++++++++---
 .../apache/tika/pipes/core/PipesClientTest.java    | 30 +++++++++++++++----
 .../resources/test-documents/mock/timeout-10s.xml  | 27 +++++++++++++++++
 6 files changed, 92 insertions(+), 12 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 865a3bb53..3f04c47eb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,8 @@ Release 4.0.0-BETA1 - ???
 
 Release 3.3.0 - ???
 
+  * Allow per file timeouts in tika-pipes (TIKA-4497).
+
   * Add matroska detector (TIKA-1180).
 
   * Allow multiple values for many Dublin Core keys (TIKA-4466).
diff --git a/tika-core/src/main/java/org/apache/tika/config/TikaTaskTimeout.java b/tika-core/src/main/java/org/apache/tika/config/TikaTaskTimeout.java
index 821be35e5..842c7a432 100644
--- a/tika-core/src/main/java/org/apache/tika/config/TikaTaskTimeout.java
+++ b/tika-core/src/main/java/org/apache/tika/config/TikaTaskTimeout.java
@@ -16,9 +16,11 @@
  */
 package org.apache.tika.config;
 
+import java.io.Serializable;
+
 import org.apache.tika.parser.ParseContext;
 
-public class TikaTaskTimeout {
+public class TikaTaskTimeout implements Serializable {
 
     private final long timeoutMillis;
 
@@ -31,6 +33,9 @@ public class TikaTaskTimeout {
     }
 
     public static long getTimeoutMillis(ParseContext context, long 
defaultTimeoutMillis) {
+        if (context == null) {
+            return defaultTimeoutMillis;
+        }
         TikaTaskTimeout tikaTaskTimeout = context.get(TikaTaskTimeout.class);
         if (tikaTaskTimeout == null) {
             return defaultTimeoutMillis;
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index 2a27320a0..11eb87205 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -47,6 +47,7 @@ import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.tika.config.TikaTaskTimeout;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.parser.ParseContext;
@@ -200,7 +201,8 @@ public class PipesClient implements Closeable {
                         ": PipesClient closed");
             }
             executorService.execute(futureTask);
-            return futureTask.get(pipesConfig.getTimeoutMillis(), 
TimeUnit.MILLISECONDS);
+            long timeout = 
TikaTaskTimeout.getTimeoutMillis(t.getParseContext(), 
pipesConfig.getTimeoutMillis());
+            return futureTask.get(timeout, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             destroyForcibly();
             throw e;
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
index 2978668c8..d49d95c25 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
@@ -41,6 +41,7 @@ import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
 
 import org.apache.tika.config.TikaConfig;
+import org.apache.tika.config.TikaTaskTimeout;
 import org.apache.tika.detect.Detector;
 import org.apache.tika.exception.EncryptedDocumentException;
 import org.apache.tika.exception.TikaConfigException;
@@ -135,8 +136,9 @@ public class PipesServer implements Runnable {
     //if it is smaller than this value, write it back to the
     //PipesClient so that it can cache the extracts and then batch emit.
     private final long maxForEmitBatchBytes;
-    private final long serverParseTimeoutMillis;
+    private final long defaultServerParseTimeoutMillis;
     private final long serverWaitTimeoutMillis;
+    private volatile long serverParseTimeoutMillis;
     private Parser autoDetectParser;
     private Parser rMetaParser;
     private TikaConfig tikaConfig;
@@ -154,6 +156,7 @@ public class PipesServer implements Runnable {
         this.input = new DataInputStream(in);
         this.output = new DataOutputStream(out);
         this.maxForEmitBatchBytes = maxForEmitBatchBytes;
+        this.defaultServerParseTimeoutMillis = serverParseTimeoutMillis;
         this.serverParseTimeoutMillis = serverParseTimeoutMillis;
         this.serverWaitTimeoutMillis = serverWaitTimeoutMillis;
         this.parsing = false;
@@ -189,9 +192,9 @@ public class PipesServer implements Runnable {
             while (true) {
                 synchronized (lock) {
                     long elapsed = System.currentTimeMillis() - since;
-                    if (parsing && elapsed > serverParseTimeoutMillis) {
-                        LOG.warn("timeout server; elapsed {}  with {}", 
elapsed,
-                                serverParseTimeoutMillis);
+                    long timeout = serverParseTimeoutMillis;
+                    if (parsing && elapsed > timeout) {
+                        LOG.warn("timeout server; elapsed {}ms  with 
timeout={}", elapsed, timeout);
                         exit(TIMEOUT_EXIT_CODE);
                     } else if (!parsing && serverWaitTimeoutMillis > 0 &&
                             elapsed > serverWaitTimeoutMillis) {
@@ -341,6 +344,7 @@ public class PipesServer implements Runnable {
                         System.currentTimeMillis() - start);
             }
             start = System.currentTimeMillis();
+            updateTimeout(t);
             actuallyParse(t);
             if (LOG.isTraceEnabled()) {
                 LOG.trace("timer -- actually parsed: {} ms", 
System.currentTimeMillis() - start);
@@ -351,10 +355,32 @@ public class PipesServer implements Runnable {
             synchronized (lock) {
                 parsing = false;
                 since = System.currentTimeMillis();
+                serverParseTimeoutMillis = defaultServerParseTimeoutMillis;
             }
         }
     }
 
+    private void updateTimeout(FetchEmitTuple t) {
+        if (t == null) {
+            return;
+        }
+
+        ParseContext parseContext = t.getParseContext();
+        if (parseContext == null) {
+            return;
+        }
+
+        TikaTaskTimeout tikaTaskTimeout = 
parseContext.get(TikaTaskTimeout.class);
+        if (tikaTaskTimeout == null) {
+            return;
+        }
+
+        synchronized (lock) {
+            serverParseTimeoutMillis = tikaTaskTimeout.getTimeoutMillis();
+            LOG.debug("setting per file timeout {}ms", 
serverParseTimeoutMillis);
+        }
+    }
+
     private void actuallyParse(FetchEmitTuple t) {
 
         long start = System.currentTimeMillis();
diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
index 813409efa..650412e15 100644
--- a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
+++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.tika.pipes.core;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -27,6 +29,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.xml.sax.SAXException;
 
+import org.apache.tika.config.TikaTaskTimeout;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
@@ -62,9 +65,9 @@ public class PipesClientTest {
                 new FetchEmitTuple(testPdfFile, new FetchKey(fetcherName, 
testPdfFile),
                         new EmitKey(), new Metadata(), new ParseContext(), 
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
         Assertions.assertNotNull(pipesResult.getEmitData().getMetadataList());
-        Assertions.assertEquals(1, 
pipesResult.getEmitData().getMetadataList().size());
+        assertEquals(1, pipesResult.getEmitData().getMetadataList().size());
         Metadata metadata = pipesResult.getEmitData().getMetadataList().get(0);
-        Assertions.assertEquals("testOverlappingText.pdf", 
metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY));
+        assertEquals("testOverlappingText.pdf", 
metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY));
     }
 
     @Test
@@ -76,9 +79,9 @@ public class PipesClientTest {
                 new FetchEmitTuple(testPdfFile, new FetchKey(fetcherName, 
testPdfFile),
                         new EmitKey(), new Metadata(), parseContext, 
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
         Assertions.assertNotNull(pipesResult.getEmitData().getMetadataList());
-        Assertions.assertEquals(1, 
pipesResult.getEmitData().getMetadataList().size());
+        assertEquals(1, pipesResult.getEmitData().getMetadataList().size());
         Metadata metadata = pipesResult.getEmitData().getMetadataList().get(0);
-        Assertions.assertEquals("TESTOVERLAPPINGTEXT.PDF", 
metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY));
+        assertEquals("TESTOVERLAPPINGTEXT.PDF", 
metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY));
     }
 
     @Test
@@ -90,8 +93,23 @@ public class PipesClientTest {
                 new FetchEmitTuple("mock/embedded.xml", new 
FetchKey(fetcherName, "mock/embedded.xml"),
                         new EmitKey(), new Metadata(), parseContext, 
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
         Assertions.assertNotNull(pipesResult.getEmitData().getMetadataList());
-        Assertions.assertEquals(5, 
pipesResult.getEmitData().getMetadataList().size());
+        assertEquals(5, pipesResult.getEmitData().getMetadataList().size());
         Metadata metadata = pipesResult.getEmitData().getMetadataList().get(0);
-        Assertions.assertEquals(4, 
Integer.parseInt(metadata.get("X-TIKA:attachment_count")));
+        assertEquals(4, 
Integer.parseInt(metadata.get("X-TIKA:attachment_count")));
+    }
+
+    @Test
+    public void testTimeout() throws IOException, InterruptedException {
+        //TODO -- add unit test for timeout > default
+        //TODO -- figure out how to test pipes server timeout alone
+        //I did both manually during development, but unit tests are better. :D
+        ParseContext parseContext = new ParseContext();
+        parseContext.set(TikaTaskTimeout.class, new TikaTaskTimeout(1000));
+        MetadataListFilter metadataFilter = new 
CompositeMetadataListFilter(List.of(new AttachmentCountingListFilter()));
+        parseContext.set(MetadataListFilter.class, metadataFilter);
+        PipesResult pipesResult = pipesClient.process(
+                new FetchEmitTuple("mock/timeout-10s.xml", new 
FetchKey(fetcherName, "mock/timeout-10s.xml"),
+                        new EmitKey(), new Metadata(), parseContext, 
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+        assertEquals(PipesResult.TIMEOUT.getStatus(), pipesResult.getStatus());
     }
 }
diff --git a/tika-pipes/tika-pipes-core/src/test/resources/test-documents/mock/timeout-10s.xml b/tika-pipes/tika-pipes-core/src/test/resources/test-documents/mock/timeout-10s.xml
new file mode 100644
index 000000000..c041b4169
--- /dev/null
+++ b/tika-pipes/tika-pipes-core/src/test/resources/test-documents/mock/timeout-10s.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+<mock>
+
+    <metadata action="add" name="dc:creator">Nikolai Lobachevsky</metadata>
+    <write element="p" times="30"> hello </write>
+    <fakeload millis="10000" cpu="10" mb="10"/>
+</mock>
\ No newline at end of file

Reply via email to