Repository: tika Updated Branches: refs/heads/2.x 9a68f4ccc -> 7adfe1cb5
TIKA-2170 allow configuration of timeout for ForkServer Project: http://git-wip-us.apache.org/repos/asf/tika/repo Commit: http://git-wip-us.apache.org/repos/asf/tika/commit/7adfe1cb Tree: http://git-wip-us.apache.org/repos/asf/tika/tree/7adfe1cb Diff: http://git-wip-us.apache.org/repos/asf/tika/diff/7adfe1cb Branch: refs/heads/2.x Commit: 7adfe1cb5490bed1c912c21ccb29f56244485017 Parents: 9a68f4c Author: tballison <[email protected]> Authored: Thu Nov 10 09:29:24 2016 -0500 Committer: tballison <[email protected]> Committed: Thu Nov 10 09:29:24 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../java/org/apache/tika/fork/ForkClient.java | 3 +- .../java/org/apache/tika/fork/ForkParser.java | 16 ++++++- .../java/org/apache/tika/fork/ForkServer.java | 14 ++++-- .../org/apache/tika/fork/ForkParserTest.java | 45 +++++++++++++++++++- 5 files changed, 73 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tika/blob/7adfe1cb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e062673..ce45ca5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -17,6 +17,8 @@ Release 2.0 - ??? Release 1.15 -??? + * Allow configuration of timeout for ForkParser (TIKA-2170). + * Add extraction of .jpx inline images from PDFs (TIKA-2175). * Add .jpx, .jp2, .ppm to formats handled by Tesseract (TIKA-2174). http://git-wip-us.apache.org/repos/asf/tika/blob/7adfe1cb/tika-core/src/main/java/org/apache/tika/fork/ForkClient.java ---------------------------------------------------------------------- diff --git a/tika-core/src/main/java/org/apache/tika/fork/ForkClient.java b/tika-core/src/main/java/org/apache/tika/fork/ForkClient.java index 1972e91..da14151 100644 --- a/tika-core/src/main/java/org/apache/tika/fork/ForkClient.java +++ b/tika-core/src/main/java/org/apache/tika/fork/ForkClient.java @@ -51,7 +51,7 @@ class ForkClient { private final InputStream error; - public ForkClient(ClassLoader loader, Object object, List<String> java) + public ForkClient(ClassLoader loader, Object object, List<String> java, long serverPulseMillis) throws IOException, TikaException { boolean ok = false; try { @@ -63,6 +63,7 @@ class ForkClient { command.addAll(java); command.add("-jar"); command.add(jar.getPath()); + command.add(Long.toString(serverPulseMillis)); builder.command(command); this.process = builder.start(); http://git-wip-us.apache.org/repos/asf/tika/blob/7adfe1cb/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java ---------------------------------------------------------------------- diff --git a/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java b/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java index 7727fe8..c4fcd39 100644 --- a/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java +++ b/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java @@ -57,6 +57,8 @@ public class ForkParser extends AbstractParser { private final Queue<ForkClient> pool = new LinkedList<ForkClient>(); + private long serverPulseMillis = 5000; + /** * @param loader The ClassLoader to use * @param parser the parser to delegate to. This one cannot be another ForkParser @@ -213,7 +215,7 @@ public class ForkParser extends AbstractParser { // Create a new process if there's room in the pool if (client == null && currentlyInUse < poolSize) { - client = new ForkClient(loader, parser, java); + client = new ForkClient(loader, parser, java, serverPulseMillis); } // Ping the process, and get rid of it if it's inactive @@ -246,4 +248,16 @@ public class ForkParser extends AbstractParser { } } + /** + * The amount of time in milliseconds that the server + * should wait for any input or output. If it receives no + * input or output in this amount of time, it will shutdown. + * The default is 5 seconds. + * + * @param serverPulseMillis milliseconds to sleep before checking if there has been any activity + */ + public void setServerPulseMillis(long serverPulseMillis) { + this.serverPulseMillis = serverPulseMillis; + } + } http://git-wip-us.apache.org/repos/asf/tika/blob/7adfe1cb/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java ---------------------------------------------------------------------- diff --git a/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java b/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java index 5b60644..68d203d 100644 --- a/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java +++ b/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java @@ -46,6 +46,9 @@ class ForkServer implements Runnable, Checksum { public static final byte READY = 4; + //milliseconds to sleep before checking to see if there has been any reading/writing + //If no reading or writing in this time, shutdown the server. + private long serverPulseMillis = 5000; /** * Starts a forked server process using the standard input and output * streams for communication with the parent process. Any attempts by @@ -56,9 +59,13 @@ class ForkServer implements Runnable, Checksum { * @throws Exception if the server could not be started */ public static void main(String[] args) throws Exception { + long serverPulseMillis = -1; + if (args.length > 0) { + serverPulseMillis = Long.parseLong(args[0]); + } URL.setURLStreamHandlerFactory(new MemoryURLStreamHandlerFactory()); - ForkServer server = new ForkServer(System.in, System.out); + ForkServer server = new ForkServer(System.in, System.out, serverPulseMillis); System.setIn(new ByteArrayInputStream(new byte[0])); System.setOut(System.err); @@ -85,19 +92,20 @@ class ForkServer implements Runnable, Checksum { * @param output output stream for writing to the parent process * @throws IOException if the server instance could not be created */ - public ForkServer(InputStream input, OutputStream output) + public ForkServer(InputStream input, OutputStream output, long serverPulseMillis) throws IOException { this.input = new DataInputStream(new CheckedInputStream(input, this)); this.output = new DataOutputStream(new CheckedOutputStream(output, this)); + this.serverPulseMillis = serverPulseMillis; } public void run() { try { while (active) { active = false; - Thread.sleep(5000); + Thread.sleep(serverPulseMillis); } System.exit(0); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/tika/blob/7adfe1cb/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java ---------------------------------------------------------------------- diff --git a/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java b/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java index d38a6a6..590aa80 100644 --- a/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java +++ b/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java @@ -16,22 +16,27 @@ */ package org.apache.tika.fork; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; +import java.nio.charset.StandardCharsets; import java.util.concurrent.Semaphore; +import org.apache.tika.exception.TikaException; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; +import org.apache.tika.parser.mock.MockParser; import org.apache.tika.sax.BodyContentHandler; import org.junit.Test; import org.xml.sax.ContentHandler; import org.xml.sax.helpers.DefaultHandler; -import static org.junit.Assert.assertEquals; - public class ForkParserTest { @Test @@ -177,4 +182,40 @@ public class ForkParserTest { } } + @Test + public void testPulse() throws Exception { + //test default 5000 ms + ForkParser forkParser = new ForkParser(ForkParserTest.class.getClassLoader(), new MockParser()); + String sleepCommand = "<mock>\n" + + " <write element=\"p\">Hello, World!</write>\n" + + " <hang millis=\"11000\" heavy=\"false\" interruptible=\"false\" />\n" + + "</mock>"; + ContentHandler o = new BodyContentHandler(-1); + Metadata m = new Metadata(); + ParseContext c = new ParseContext(); + try { + forkParser.parse(new ByteArrayInputStream(sleepCommand.getBytes(StandardCharsets.UTF_8)), o, m, c); + fail("should have thrown IOException"); + } catch (TikaException e) { + assertTrue("failed to communicate with forked parser process", true); + } + + //test setting very short pulse (10 ms) and a parser that takes at least 1000 ms + forkParser = new ForkParser(ForkParserTest.class.getClassLoader(), new MockParser()); + forkParser.setServerPulseMillis(10); + sleepCommand = "<mock>\n" + + " <write element=\"p\">Hello, World!</write>\n" + + " <hang millis=\"1000\" heavy=\"false\" interruptible=\"false\" />\n" + + "</mock>"; + o = new BodyContentHandler(-1); + m = new Metadata(); + c = new ParseContext(); + try { + forkParser.parse(new ByteArrayInputStream(sleepCommand.getBytes(StandardCharsets.UTF_8)), o, m, c); + fail("Should have thrown exception"); + } catch (IOException e) { + assertTrue("should have thrown IOException lost connection", true); + } + } + }
