This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 507c9933c TIKA-4497 -- allow per file timeouts in tika-pipes (#2343)
507c9933c is described below
commit 507c9933c3b52f26817872daba082fdebcb41ebb
Author: Tim Allison <[email protected]>
AuthorDate: Wed Oct 1 15:07:23 2025 -0400
TIKA-4497 -- allow per file timeouts in tika-pipes (#2343)
---
CHANGES.txt | 2 ++
.../org/apache/tika/config/TikaTaskTimeout.java | 7 ++++-
.../org/apache/tika/pipes/core/PipesClient.java | 4 ++-
.../org/apache/tika/pipes/core/PipesServer.java | 34 +++++++++++++++++++---
.../apache/tika/pipes/core/PipesClientTest.java | 30 +++++++++++++++----
.../resources/test-documents/mock/timeout-10s.xml | 27 +++++++++++++++++
6 files changed, 92 insertions(+), 12 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 865a3bb53..3f04c47eb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,8 @@ Release 4.0.0-BETA1 - ???
Release 3.3.0 - ???
+ * Allow per file timeouts in tika-pipes (TIKA-4497).
+
* Add matroska detector (TIKA-1180).
* Allow multiple values for many Dublin Core keys (TIKA-4466).
diff --git a/tika-core/src/main/java/org/apache/tika/config/TikaTaskTimeout.java b/tika-core/src/main/java/org/apache/tika/config/TikaTaskTimeout.java
index 821be35e5..842c7a432 100644
--- a/tika-core/src/main/java/org/apache/tika/config/TikaTaskTimeout.java
+++ b/tika-core/src/main/java/org/apache/tika/config/TikaTaskTimeout.java
@@ -16,9 +16,11 @@
*/
package org.apache.tika.config;
+import java.io.Serializable;
+
import org.apache.tika.parser.ParseContext;
-public class TikaTaskTimeout {
+public class TikaTaskTimeout implements Serializable {
private final long timeoutMillis;
@@ -31,6 +33,9 @@ public class TikaTaskTimeout {
}
public static long getTimeoutMillis(ParseContext context, long
defaultTimeoutMillis) {
+ if (context == null) {
+ return defaultTimeoutMillis;
+ }
TikaTaskTimeout tikaTaskTimeout = context.get(TikaTaskTimeout.class);
if (tikaTaskTimeout == null) {
return defaultTimeoutMillis;
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index 2a27320a0..11eb87205 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -47,6 +47,7 @@ import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.tika.config.TikaTaskTimeout;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.parser.ParseContext;
@@ -200,7 +201,8 @@ public class PipesClient implements Closeable {
": PipesClient closed");
}
executorService.execute(futureTask);
- return futureTask.get(pipesConfig.getTimeoutMillis(),
TimeUnit.MILLISECONDS);
+ long timeout =
TikaTaskTimeout.getTimeoutMillis(t.getParseContext(),
pipesConfig.getTimeoutMillis());
+ return futureTask.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
destroyForcibly();
throw e;
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
index 2978668c8..d49d95c25 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
@@ -41,6 +41,7 @@ import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
import org.apache.tika.config.TikaConfig;
+import org.apache.tika.config.TikaTaskTimeout;
import org.apache.tika.detect.Detector;
import org.apache.tika.exception.EncryptedDocumentException;
import org.apache.tika.exception.TikaConfigException;
@@ -135,8 +136,9 @@ public class PipesServer implements Runnable {
//if it is smaller than this value, write it back to the
//PipesClient so that it can cache the extracts and then batch emit.
private final long maxForEmitBatchBytes;
- private final long serverParseTimeoutMillis;
+ private final long defaultServerParseTimeoutMillis;
private final long serverWaitTimeoutMillis;
+ private volatile long serverParseTimeoutMillis;
private Parser autoDetectParser;
private Parser rMetaParser;
private TikaConfig tikaConfig;
@@ -154,6 +156,7 @@ public class PipesServer implements Runnable {
this.input = new DataInputStream(in);
this.output = new DataOutputStream(out);
this.maxForEmitBatchBytes = maxForEmitBatchBytes;
+ this.defaultServerParseTimeoutMillis = serverParseTimeoutMillis;
this.serverParseTimeoutMillis = serverParseTimeoutMillis;
this.serverWaitTimeoutMillis = serverWaitTimeoutMillis;
this.parsing = false;
@@ -189,9 +192,9 @@ public class PipesServer implements Runnable {
while (true) {
synchronized (lock) {
long elapsed = System.currentTimeMillis() - since;
- if (parsing && elapsed > serverParseTimeoutMillis) {
- LOG.warn("timeout server; elapsed {} with {}",
elapsed,
- serverParseTimeoutMillis);
+ long timeout = serverParseTimeoutMillis;
+ if (parsing && elapsed > timeout) {
+ LOG.warn("timeout server; elapsed {}ms with
timeout={}", elapsed, timeout);
exit(TIMEOUT_EXIT_CODE);
} else if (!parsing && serverWaitTimeoutMillis > 0 &&
elapsed > serverWaitTimeoutMillis) {
@@ -341,6 +344,7 @@ public class PipesServer implements Runnable {
System.currentTimeMillis() - start);
}
start = System.currentTimeMillis();
+ updateTimeout(t);
actuallyParse(t);
if (LOG.isTraceEnabled()) {
LOG.trace("timer -- actually parsed: {} ms",
System.currentTimeMillis() - start);
@@ -351,10 +355,32 @@ public class PipesServer implements Runnable {
synchronized (lock) {
parsing = false;
since = System.currentTimeMillis();
+ serverParseTimeoutMillis = defaultServerParseTimeoutMillis;
}
}
}
+ private void updateTimeout(FetchEmitTuple t) {
+ if (t == null) {
+ return;
+ }
+
+ ParseContext parseContext = t.getParseContext();
+ if (parseContext == null) {
+ return;
+ }
+
+ TikaTaskTimeout tikaTaskTimeout =
parseContext.get(TikaTaskTimeout.class);
+ if (tikaTaskTimeout == null) {
+ return;
+ }
+
+ synchronized (lock) {
+ serverParseTimeoutMillis = tikaTaskTimeout.getTimeoutMillis();
+ LOG.debug("setting per file timeout {}ms",
serverParseTimeoutMillis);
+ }
+ }
+
private void actuallyParse(FetchEmitTuple t) {
long start = System.currentTimeMillis();
diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
index 813409efa..650412e15 100644
--- a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
+++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.tika.pipes.core;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -27,6 +29,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.xml.sax.SAXException;
+import org.apache.tika.config.TikaTaskTimeout;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
@@ -62,9 +65,9 @@ public class PipesClientTest {
new FetchEmitTuple(testPdfFile, new FetchKey(fetcherName,
testPdfFile),
new EmitKey(), new Metadata(), new ParseContext(),
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
Assertions.assertNotNull(pipesResult.getEmitData().getMetadataList());
- Assertions.assertEquals(1,
pipesResult.getEmitData().getMetadataList().size());
+ assertEquals(1, pipesResult.getEmitData().getMetadataList().size());
Metadata metadata = pipesResult.getEmitData().getMetadataList().get(0);
- Assertions.assertEquals("testOverlappingText.pdf",
metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY));
+ assertEquals("testOverlappingText.pdf",
metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY));
}
@Test
@@ -76,9 +79,9 @@ public class PipesClientTest {
new FetchEmitTuple(testPdfFile, new FetchKey(fetcherName,
testPdfFile),
new EmitKey(), new Metadata(), parseContext,
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
Assertions.assertNotNull(pipesResult.getEmitData().getMetadataList());
- Assertions.assertEquals(1,
pipesResult.getEmitData().getMetadataList().size());
+ assertEquals(1, pipesResult.getEmitData().getMetadataList().size());
Metadata metadata = pipesResult.getEmitData().getMetadataList().get(0);
- Assertions.assertEquals("TESTOVERLAPPINGTEXT.PDF",
metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY));
+ assertEquals("TESTOVERLAPPINGTEXT.PDF",
metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY));
}
@Test
@@ -90,8 +93,23 @@ public class PipesClientTest {
new FetchEmitTuple("mock/embedded.xml", new
FetchKey(fetcherName, "mock/embedded.xml"),
new EmitKey(), new Metadata(), parseContext,
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
Assertions.assertNotNull(pipesResult.getEmitData().getMetadataList());
- Assertions.assertEquals(5,
pipesResult.getEmitData().getMetadataList().size());
+ assertEquals(5, pipesResult.getEmitData().getMetadataList().size());
Metadata metadata = pipesResult.getEmitData().getMetadataList().get(0);
- Assertions.assertEquals(4,
Integer.parseInt(metadata.get("X-TIKA:attachment_count")));
+ assertEquals(4,
Integer.parseInt(metadata.get("X-TIKA:attachment_count")));
+ }
+
+ @Test
+ public void testTimeout() throws IOException, InterruptedException {
+ //TODO -- add unit test for timeout > default
+ //TODO -- figure out how to test pipes server timeout alone
+ //I did both manually during development, but unit tests are better. :D
+ ParseContext parseContext = new ParseContext();
+ parseContext.set(TikaTaskTimeout.class, new TikaTaskTimeout(1000));
+ MetadataListFilter metadataFilter = new
CompositeMetadataListFilter(List.of(new AttachmentCountingListFilter()));
+ parseContext.set(MetadataListFilter.class, metadataFilter);
+ PipesResult pipesResult = pipesClient.process(
+ new FetchEmitTuple("mock/timeout-10s.xml", new
FetchKey(fetcherName, "mock/timeout-10s.xml"),
+ new EmitKey(), new Metadata(), parseContext,
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+ assertEquals(PipesResult.TIMEOUT.getStatus(), pipesResult.getStatus());
}
}
diff --git a/tika-pipes/tika-pipes-core/src/test/resources/test-documents/mock/timeout-10s.xml b/tika-pipes/tika-pipes-core/src/test/resources/test-documents/mock/timeout-10s.xml
new file mode 100644
index 000000000..c041b4169
--- /dev/null
+++ b/tika-pipes/tika-pipes-core/src/test/resources/test-documents/mock/timeout-10s.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<mock>
+
+ <metadata action="add" name="dc:creator">Nikolai Lobachevsky</metadata>
+ <write element="p" times="30"> hello </write>
+ <fakeload millis="10000" cpu="10" mb="10"/>
+</mock>
\ No newline at end of file