Repository: tika
Updated Branches:
  refs/heads/2.x 9a68f4ccc -> 7adfe1cb5


TIKA-2170 allow configuration of timeout for ForkServer


Project: http://git-wip-us.apache.org/repos/asf/tika/repo
Commit: http://git-wip-us.apache.org/repos/asf/tika/commit/7adfe1cb
Tree: http://git-wip-us.apache.org/repos/asf/tika/tree/7adfe1cb
Diff: http://git-wip-us.apache.org/repos/asf/tika/diff/7adfe1cb

Branch: refs/heads/2.x
Commit: 7adfe1cb5490bed1c912c21ccb29f56244485017
Parents: 9a68f4c
Author: tballison <[email protected]>
Authored: Thu Nov 10 09:29:24 2016 -0500
Committer: tballison <[email protected]>
Committed: Thu Nov 10 09:29:24 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../java/org/apache/tika/fork/ForkClient.java   |  3 +-
 .../java/org/apache/tika/fork/ForkParser.java   | 16 ++++++-
 .../java/org/apache/tika/fork/ForkServer.java   | 14 ++++--
 .../org/apache/tika/fork/ForkParserTest.java    | 45 +++++++++++++++++++-
 5 files changed, 73 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tika/blob/7adfe1cb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e062673..ce45ca5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,8 @@ Release 2.0 - ???
 
 Release 1.15 -???
 
+  * Allow configuration of timeout for ForkParser (TIKA-2170).
+
   * Add extraction of .jpx inline images from PDFs (TIKA-2175).
 
   * Add .jpx, .jp2, .ppm to formats handled by Tesseract (TIKA-2174).

http://git-wip-us.apache.org/repos/asf/tika/blob/7adfe1cb/tika-core/src/main/java/org/apache/tika/fork/ForkClient.java
----------------------------------------------------------------------
diff --git a/tika-core/src/main/java/org/apache/tika/fork/ForkClient.java b/tika-core/src/main/java/org/apache/tika/fork/ForkClient.java
index 1972e91..da14151 100644
--- a/tika-core/src/main/java/org/apache/tika/fork/ForkClient.java
+++ b/tika-core/src/main/java/org/apache/tika/fork/ForkClient.java
@@ -51,7 +51,7 @@ class ForkClient {
 
     private final InputStream error;
 
-    public ForkClient(ClassLoader loader, Object object, List<String> java)
+    public ForkClient(ClassLoader loader, Object object, List<String> java, 
long serverPulseMillis)
             throws IOException, TikaException {
         boolean ok = false;
         try {
@@ -63,6 +63,7 @@ class ForkClient {
             command.addAll(java);
             command.add("-jar");
             command.add(jar.getPath());
+            command.add(Long.toString(serverPulseMillis));
             builder.command(command);
             this.process = builder.start();
 

http://git-wip-us.apache.org/repos/asf/tika/blob/7adfe1cb/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java
----------------------------------------------------------------------
diff --git a/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java b/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java
index 7727fe8..c4fcd39 100644
--- a/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java
+++ b/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java
@@ -57,6 +57,8 @@ public class ForkParser extends AbstractParser {
     private final Queue<ForkClient> pool =
         new LinkedList<ForkClient>();
 
+    private long serverPulseMillis = 5000;
+
     /**
      * @param loader The ClassLoader to use 
      * @param parser the parser to delegate to. This one cannot be another ForkParser
@@ -213,7 +215,7 @@ public class ForkParser extends AbstractParser {
 
             // Create a new process if there's room in the pool
             if (client == null && currentlyInUse < poolSize) {
-                client = new ForkClient(loader, parser, java);
+                client = new ForkClient(loader, parser, java, 
serverPulseMillis);
             }
 
             // Ping the process, and get rid of it if it's inactive
@@ -246,4 +248,16 @@ public class ForkParser extends AbstractParser {
         }
     }
 
+    /**
+     * The amount of time in milliseconds that the server
+     * should wait for any input or output.  If it receives no
+     * input or output in this amount of time, it will shutdown.
+     * The default is 5 seconds.
+     *
+     * @param serverPulseMillis milliseconds to sleep before checking if there has been any activity
+     */
+    public void setServerPulseMillis(long serverPulseMillis) {
+        this.serverPulseMillis = serverPulseMillis;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/tika/blob/7adfe1cb/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java
----------------------------------------------------------------------
diff --git a/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java b/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java
index 5b60644..68d203d 100644
--- a/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java
+++ b/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java
@@ -46,6 +46,9 @@ class ForkServer implements Runnable, Checksum {
 
     public static final byte READY = 4;
 
+    //milliseconds to sleep before checking to see if there has been any reading/writing
+    //If no reading or writing in this time, shutdown the server.
+    private long serverPulseMillis = 5000;
     /**
      * Starts a forked server process using the standard input and output
      * streams for communication with the parent process. Any attempts by
@@ -56,9 +59,13 @@ class ForkServer implements Runnable, Checksum {
      * @throws Exception if the server could not be started
      */
     public static void main(String[] args) throws Exception {
+        long serverPulseMillis = -1;
+        if (args.length > 0) {
+            serverPulseMillis = Long.parseLong(args[0]);
+        }
         URL.setURLStreamHandlerFactory(new MemoryURLStreamHandlerFactory());
 
-        ForkServer server = new ForkServer(System.in, System.out);
+        ForkServer server = new ForkServer(System.in, System.out, 
serverPulseMillis);
         System.setIn(new ByteArrayInputStream(new byte[0]));
         System.setOut(System.err);
 
@@ -85,19 +92,20 @@ class ForkServer implements Runnable, Checksum {
      * @param output output stream for writing to the parent process
      * @throws IOException if the server instance could not be created
      */
-    public ForkServer(InputStream input, OutputStream output)
+    public ForkServer(InputStream input, OutputStream output, long 
serverPulseMillis)
             throws IOException {
         this.input =
             new DataInputStream(new CheckedInputStream(input, this));
         this.output =
             new DataOutputStream(new CheckedOutputStream(output, this));
+        this.serverPulseMillis = serverPulseMillis;
     }
 
     public void run() {
         try {
             while (active) {
                 active = false;
-                Thread.sleep(5000);
+                Thread.sleep(serverPulseMillis);
             }
             System.exit(0);
         } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/tika/blob/7adfe1cb/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java
----------------------------------------------------------------------
diff --git a/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java b/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java
index d38a6a6..590aa80 100644
--- a/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java
+++ b/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java
@@ -16,22 +16,27 @@
  */
 package org.apache.tika.fork;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.Semaphore;
 
+import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.mock.MockParser;
 import org.apache.tika.sax.BodyContentHandler;
 import org.junit.Test;
 import org.xml.sax.ContentHandler;
 import org.xml.sax.helpers.DefaultHandler;
 
-import static org.junit.Assert.assertEquals;
-
 public class ForkParserTest {
 
     @Test
@@ -177,4 +182,40 @@ public class ForkParserTest {
         }
     }
 
+    @Test
+    public void testPulse() throws Exception {
+        //test default 5000 ms
+        ForkParser forkParser = new 
ForkParser(ForkParserTest.class.getClassLoader(), new MockParser());
+        String sleepCommand = "<mock>\n" +
+                "    <write element=\"p\">Hello, World!</write>\n" +
+                "    <hang millis=\"11000\" heavy=\"false\" 
interruptible=\"false\" />\n" +
+                "</mock>";
+        ContentHandler o = new BodyContentHandler(-1);
+        Metadata m = new Metadata();
+        ParseContext c = new ParseContext();
+        try {
+            forkParser.parse(new 
ByteArrayInputStream(sleepCommand.getBytes(StandardCharsets.UTF_8)), o, m, c);
+            fail("should have thrown IOException");
+        } catch (TikaException e) {
+            assertTrue("failed to communicate with forked parser process", 
true);
+        }
+
+        //test setting very short pulse (10 ms) and a parser that takes at least 1000 ms
+        forkParser = new ForkParser(ForkParserTest.class.getClassLoader(), new 
MockParser());
+        forkParser.setServerPulseMillis(10);
+        sleepCommand = "<mock>\n" +
+                "    <write element=\"p\">Hello, World!</write>\n" +
+                "    <hang millis=\"1000\" heavy=\"false\" 
interruptible=\"false\" />\n" +
+                "</mock>";
+        o = new BodyContentHandler(-1);
+        m = new Metadata();
+        c = new ParseContext();
+        try {
+            forkParser.parse(new 
ByteArrayInputStream(sleepCommand.getBytes(StandardCharsets.UTF_8)), o, m, c);
+            fail("Should have thrown exception");
+        } catch (IOException e) {
+            assertTrue("should have thrown IOException lost connection", true);
+        }
+    }
+
 }

Reply via email to