This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch TIKA-4497 in repository https://gitbox.apache.org/repos/asf/tika.git
commit ea4aae81713a2741ed254c4889110a9b8b59883b Author: tallison <[email protected]> AuthorDate: Wed Oct 1 14:45:05 2025 -0400 TIKA-4497 -- allow per file timeouts in tika-pipes --- CHANGES.txt | 2 ++ .../org/apache/tika/config/TikaTaskTimeout.java | 7 ++++- .../org/apache/tika/pipes/core/PipesClient.java | 4 ++- .../org/apache/tika/pipes/core/PipesServer.java | 34 +++++++++++++++++++--- .../apache/tika/pipes/core/PipesClientTest.java | 30 +++++++++++++++---- .../resources/test-documents/mock/timeout-10s.xml | 27 +++++++++++++++++ 6 files changed, 92 insertions(+), 12 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 865a3bb53..3f04c47eb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -15,6 +15,8 @@ Release 4.0.0-BETA1 - ??? Release 3.3.0 - ??? + * Allow per file timeouts in tika-pipes (TIKA-4497). + * Add matroska detector (TIKA-1180). * Allow multiple values for many Dublin Core keys (TIKA-4466). diff --git a/tika-core/src/main/java/org/apache/tika/config/TikaTaskTimeout.java b/tika-core/src/main/java/org/apache/tika/config/TikaTaskTimeout.java index 821be35e5..842c7a432 100644 --- a/tika-core/src/main/java/org/apache/tika/config/TikaTaskTimeout.java +++ b/tika-core/src/main/java/org/apache/tika/config/TikaTaskTimeout.java @@ -16,9 +16,11 @@ */ package org.apache.tika.config; +import java.io.Serializable; + import org.apache.tika.parser.ParseContext; -public class TikaTaskTimeout { +public class TikaTaskTimeout implements Serializable { private final long timeoutMillis; @@ -31,6 +33,9 @@ public class TikaTaskTimeout { } public static long getTimeoutMillis(ParseContext context, long defaultTimeoutMillis) { + if (context == null) { + return defaultTimeoutMillis; + } TikaTaskTimeout tikaTaskTimeout = context.get(TikaTaskTimeout.class); if (tikaTaskTimeout == null) { return defaultTimeoutMillis; diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java index 2a27320a0..11eb87205 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java @@ -47,6 +47,7 @@ import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.tika.config.TikaTaskTimeout; import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; import org.apache.tika.parser.ParseContext; @@ -200,7 +201,8 @@ public class PipesClient implements Closeable { ": PipesClient closed"); } executorService.execute(futureTask); - return futureTask.get(pipesConfig.getTimeoutMillis(), TimeUnit.MILLISECONDS); + long timeout = TikaTaskTimeout.getTimeoutMillis(t.getParseContext(), pipesConfig.getTimeoutMillis()); + return futureTask.get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { destroyForcibly(); throw e; diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java index 2978668c8..d49d95c25 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java @@ -41,6 +41,7 @@ import org.xml.sax.ContentHandler; import org.xml.sax.SAXException; import org.apache.tika.config.TikaConfig; +import org.apache.tika.config.TikaTaskTimeout; import org.apache.tika.detect.Detector; import org.apache.tika.exception.EncryptedDocumentException; import org.apache.tika.exception.TikaConfigException; @@ -135,8 +136,9 @@ public class PipesServer implements Runnable { //if it is smaller than this value, write it back to the //PipesClient so that it can cache the extracts and then batch emit. private final long maxForEmitBatchBytes; - private final long serverParseTimeoutMillis; + private final long defaultServerParseTimeoutMillis; private final long serverWaitTimeoutMillis; + private volatile long serverParseTimeoutMillis; private Parser autoDetectParser; private Parser rMetaParser; private TikaConfig tikaConfig; @@ -154,6 +156,7 @@ public class PipesServer implements Runnable { this.input = new DataInputStream(in); this.output = new DataOutputStream(out); this.maxForEmitBatchBytes = maxForEmitBatchBytes; + this.defaultServerParseTimeoutMillis = serverParseTimeoutMillis; this.serverParseTimeoutMillis = serverParseTimeoutMillis; this.serverWaitTimeoutMillis = serverWaitTimeoutMillis; this.parsing = false; @@ -189,9 +192,9 @@ public class PipesServer implements Runnable { while (true) { synchronized (lock) { long elapsed = System.currentTimeMillis() - since; - if (parsing && elapsed > serverParseTimeoutMillis) { - LOG.warn("timeout server; elapsed {} with {}", elapsed, - serverParseTimeoutMillis); + long timeout = serverParseTimeoutMillis; + if (parsing && elapsed > timeout) { + LOG.warn("timeout server; elapsed {}ms with timeout={}", elapsed, timeout); exit(TIMEOUT_EXIT_CODE); } else if (!parsing && serverWaitTimeoutMillis > 0 && elapsed > serverWaitTimeoutMillis) { @@ -341,6 +344,7 @@ public class PipesServer implements Runnable { System.currentTimeMillis() - start); } start = System.currentTimeMillis(); + updateTimeout(t); actuallyParse(t); if (LOG.isTraceEnabled()) { LOG.trace("timer -- actually parsed: {} ms", System.currentTimeMillis() - start); @@ -351,10 +355,32 @@ public class PipesServer implements Runnable { synchronized (lock) { parsing = false; since = System.currentTimeMillis(); + serverParseTimeoutMillis = defaultServerParseTimeoutMillis; } } } + private void updateTimeout(FetchEmitTuple t) { + if (t == null) { + return; + } + + ParseContext parseContext = t.getParseContext(); + if (parseContext == null) { + return; + } + + TikaTaskTimeout tikaTaskTimeout = parseContext.get(TikaTaskTimeout.class); + if (tikaTaskTimeout == null) { + return; + } + + synchronized (lock) { + serverParseTimeoutMillis = tikaTaskTimeout.getTimeoutMillis(); + LOG.debug("setting per file timeout {}ms", serverParseTimeoutMillis); + } + } + private void actuallyParse(FetchEmitTuple t) { long start = System.currentTimeMillis(); diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java index 813409efa..650412e15 100644 --- a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java +++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java @@ -16,6 +16,8 @@ */ package org.apache.tika.pipes.core; +import static org.junit.jupiter.api.Assertions.assertEquals; + import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; @@ -27,6 +29,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.xml.sax.SAXException; +import org.apache.tika.config.TikaTaskTimeout; import org.apache.tika.exception.TikaConfigException; import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; @@ -62,9 +65,9 @@ public class PipesClientTest { new FetchEmitTuple(testPdfFile, new FetchKey(fetcherName, testPdfFile), new EmitKey(), new Metadata(), new ParseContext(), FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP)); Assertions.assertNotNull(pipesResult.getEmitData().getMetadataList()); - Assertions.assertEquals(1, pipesResult.getEmitData().getMetadataList().size()); + assertEquals(1, pipesResult.getEmitData().getMetadataList().size()); Metadata metadata = pipesResult.getEmitData().getMetadataList().get(0); - Assertions.assertEquals("testOverlappingText.pdf", metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY)); + assertEquals("testOverlappingText.pdf", metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY)); } @Test @@ -76,9 +79,9 @@ public class PipesClientTest { new FetchEmitTuple(testPdfFile, new FetchKey(fetcherName, testPdfFile), new EmitKey(), new Metadata(), parseContext, FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP)); Assertions.assertNotNull(pipesResult.getEmitData().getMetadataList()); - Assertions.assertEquals(1, pipesResult.getEmitData().getMetadataList().size()); + assertEquals(1, pipesResult.getEmitData().getMetadataList().size()); Metadata metadata = pipesResult.getEmitData().getMetadataList().get(0); - Assertions.assertEquals("TESTOVERLAPPINGTEXT.PDF", metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY)); + assertEquals("TESTOVERLAPPINGTEXT.PDF", metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY)); } @Test @@ -90,8 +93,23 @@ public class PipesClientTest { new FetchEmitTuple("mock/embedded.xml", new FetchKey(fetcherName, "mock/embedded.xml"), new EmitKey(), new Metadata(), parseContext, FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP)); Assertions.assertNotNull(pipesResult.getEmitData().getMetadataList()); - Assertions.assertEquals(5, pipesResult.getEmitData().getMetadataList().size()); + assertEquals(5, pipesResult.getEmitData().getMetadataList().size()); Metadata metadata = pipesResult.getEmitData().getMetadataList().get(0); - Assertions.assertEquals(4, Integer.parseInt(metadata.get("X-TIKA:attachment_count"))); + assertEquals(4, Integer.parseInt(metadata.get("X-TIKA:attachment_count"))); + } + + @Test + public void testTimeout() throws IOException, InterruptedException { + //TODO -- add unit test for timeout > default + //TODO -- figure out how to test pipes server timeout alone + //I did both manually during development, but unit tests are better. :D + ParseContext parseContext = new ParseContext(); + parseContext.set(TikaTaskTimeout.class, new TikaTaskTimeout(1000)); + MetadataListFilter metadataFilter = new CompositeMetadataListFilter(List.of(new AttachmentCountingListFilter())); + parseContext.set(MetadataListFilter.class, metadataFilter); + PipesResult pipesResult = pipesClient.process( + new FetchEmitTuple("mock/timeout-10s.xml", new FetchKey(fetcherName, "mock/timeout-10s.xml"), + new EmitKey(), new Metadata(), parseContext, FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP)); + assertEquals(PipesResult.TIMEOUT.getStatus(), pipesResult.getStatus()); } } diff --git a/tika-pipes/tika-pipes-core/src/test/resources/test-documents/mock/timeout-10s.xml b/tika-pipes/tika-pipes-core/src/test/resources/test-documents/mock/timeout-10s.xml new file mode 100644 index 000000000..c041b4169 --- /dev/null +++ b/tika-pipes/tika-pipes-core/src/test/resources/test-documents/mock/timeout-10s.xml @@ -0,0 +1,27 @@ +<?xml version="1.0" encoding="UTF-8" ?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<mock> + + <metadata action="add" name="dc:creator">Nikolai Lobachevsky</metadata> + <write element="p" times="30"> hello </write> + <fakeload millis="10000" cpu="10" mb="10"/> +</mock> \ No newline at end of file
