This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new e687ac9  TIKA-3306 -- add timeout in PipesClient
e687ac9 is described below

commit e687ac93073e6a4897486b82f980f9dd144d2c6f
Author: tallison <[email protected]>
AuthorDate: Fri May 14 17:35:52 2021 -0400

    TIKA-3306 -- add timeout in PipesClient
---
 tika-app/src/main/resources/log4j2.properties      |   3 +-
 .../main/resources/log4j2_batch_process.properties |   3 +-
 .../java/org/apache/tika/pipes/PipesClient.java    | 125 ++++++++++++++++-----
 .../org/apache/tika/pipes/PipesConfigBase.java     |   7 +-
 .../java/org/apache/tika/pipes/PipesResult.java    |  10 +-
 .../java/org/apache/tika/pipes/PipesServer.java    |  50 +++++----
 .../org/apache/tika/pipes/async/AsyncConfig.java   |   6 +-
 .../org/apache/tika/pipes/async/AsyncEmitter.java  |  22 ++--
 .../apache/tika/pipes/async/AsyncProcessor.java    |   3 +-
 .../tika/pipes/async/AsyncProcessorTest.java       |  43 ++++++-
 .../resources/tika-config-simple-fs-emitter.xml    |   1 +
 .../src/main/resources/log4j2.properties           |   2 +-
 12 files changed, 203 insertions(+), 72 deletions(-)

diff --git a/tika-app/src/main/resources/log4j2.properties b/tika-app/src/main/resources/log4j2.properties
index af1382f..d17a4a1 100644
--- a/tika-app/src/main/resources/log4j2.properties
+++ b/tika-app/src/main/resources/log4j2.properties
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -21,7 +22,7 @@ appenders=console
 appender.console.type=Console
 appender.console.name=STDERR
 appender.console.layout.type=PatternLayout
-appender.console.layout.pattern=%-5p %m%n
+appender.console.layout.pattern=%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n
 rootLogger.level=info
 rootLogger.appenderRefs=stderr
 rootLogger.appenderRef.stderr.ref=STDERR
diff --git a/tika-app/src/main/resources/log4j2_batch_process.properties b/tika-app/src/main/resources/log4j2_batch_process.properties
index 11461aa..8715133 100644
--- a/tika-app/src/main/resources/log4j2_batch_process.properties
+++ b/tika-app/src/main/resources/log4j2_batch_process.properties
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -22,7 +23,7 @@ appenders=console
 appender.console.type=Console
 appender.console.name=STDOUT
 appender.console.layout.type=PatternLayout
-appender.console.layout.pattern=%m%n
+appender.console.layout.pattern=%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n
 rootLogger.level=info
 rootLogger.appenderRefs=stdout
 rootLogger.appenderRef.stdout.ref=STDOUT
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 52d921c..ac46270 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -30,7 +30,12 @@ import java.io.ObjectOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +54,7 @@ public class PipesClient implements Closeable {
     private final PipesConfigBase pipesConfig;
     private DataOutputStream output;
     private DataInputStream input;
+    private final ExecutorService executorService = 
Executors.newFixedThreadPool(1);
 
     public PipesClient(PipesConfigBase pipesConfig) {
         this.pipesConfig = pipesConfig;
@@ -82,74 +88,108 @@ public class PipesClient implements Closeable {
         if (process != null) {
             process.destroyForcibly();
         }
+        executorService.shutdownNow();
     }
 
     public PipesResult process(FetchEmitTuple t) throws IOException {
         if (! ping()) {
             restart();
         }
-
         if (pipesConfig.getMaxFilesProcessed() > 0 &&
                 filesProcessed >= pipesConfig.getMaxFilesProcessed()) {
             LOG.info("restarting server after hitting max files: " + 
filesProcessed);
             restart();
         }
-        //TODO consider adding a timer here too
-        // this could block forever if the watchdog thread in the server fails
-        // or is starved
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        try (ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(bos)) {
-            objectOutputStream.writeObject(t);
-        }
-        byte[] bytes = bos.toByteArray();
-        output.write(PipesServer.CALL);
-        output.writeInt(bytes.length);
-        output.write(bytes);
-        output.flush();
+        return actuallyProcess(t);
+    }
 
+    private PipesResult actuallyProcess(FetchEmitTuple t) {
         long start = System.currentTimeMillis();
-        try {
-            return readResults(t);
-        } catch (IOException e) {
+        FutureTask<PipesResult> futureTask = new FutureTask<>(() -> {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            try (ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(bos)) {
+                objectOutputStream.writeObject(t);
+            }
+            byte[] bytes = bos.toByteArray();
+            output.write(PipesServer.CALL);
+            output.writeInt(bytes.length);
+            output.write(bytes);
+            output.flush();
+
+            return readResults(t, start);
+        });
 
+        try {
+            executorService.execute(futureTask);
+            return futureTask.get(pipesConfig.getTimeoutMillis(), 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            process.destroyForcibly();
+            return PipesResult.INTERRUPTED_EXCEPTION;
+        } catch (ExecutionException e) {
+            long elapsed = System.currentTimeMillis() - start;
+            destroyWithPause();
+            if (!process.isAlive() && PipesServer.TIMEOUT_EXIT_CODE == 
process.exitValue()) {
+                LOG.warn("server timeout: {} in {} ms", t.getId(), elapsed);
+                return PipesResult.TIMEOUT;
+            }
             try {
-                process.waitFor(200, TimeUnit.MILLISECONDS);
+                process.waitFor(500, TimeUnit.MILLISECONDS);
+                if (process.isAlive()) {
+                    LOG.warn("crash: {} in {} ms with no exit code available", 
t.getId(), elapsed);
+                } else {
+                    LOG.warn("crash: {} in {} ms with exit code {}", 
t.getId(), elapsed, process.exitValue());
+                }
             } catch (InterruptedException interruptedException) {
-                //wait just a little bit to let process end to get exit value
-            } finally {
-                process.destroyForcibly();
-            }
-            if (! process.isAlive() && PipesServer.TIMEOUT_EXIT_CODE == 
process.exitValue()) {
-                LOG.warn("{} timed out", t.getId());
-                return PipesResult.TIMEOUT;
+                //swallow
             }
             return PipesResult.UNSPECIFIED_CRASH;
+        } catch (TimeoutException e) {
+            long elapsed = System.currentTimeMillis() - start;
+            process.destroyForcibly();
+            LOG.warn("client timeout: {} in {} ms", t.getId(), elapsed);
+            return PipesResult.TIMEOUT;
+        } finally {
+            futureTask.cancel(true);
+        }
+    }
+
+    private void destroyWithPause() {
+        //wait just a little bit to let process end to get exit value
+        //if there's a timeout on the server side
+        try {
+            process.waitFor(200, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException interruptedException) {
+            //swallow
+        } finally {
+            process.destroyForcibly();
         }
+
     }
 
-    private PipesResult readResults(FetchEmitTuple t) throws IOException {
+    private PipesResult readResults(FetchEmitTuple t, long start) throws 
IOException {
         int status = input.read();
+        long millis = System.currentTimeMillis() - start;
         switch (status) {
             case PipesServer.OOM:
-                LOG.warn("oom: " + t.getId());
+                LOG.warn("oom: {} in {} ms", t.getId(), millis);
                 return PipesResult.OOM;
             case PipesServer.TIMEOUT:
-                LOG.warn("timeout: " + t.getId());
+                LOG.warn("server response timeout: {} in {} ms", t.getId(), 
millis);
                 return PipesResult.TIMEOUT;
             case PipesServer.EMIT_EXCEPTION:
-                LOG.warn("emit exception: " + t.getId());
+                LOG.warn("emit exception: {} in {} ms", t.getId(), millis);
                 return readMessage(PipesResult.STATUS.EMIT_EXCEPTION);
             case PipesServer.NO_EMITTER_FOUND:
                 LOG.warn("no emitter found: " + t.getId());
                 return PipesResult.NO_EMITTER_FOUND;
             case PipesServer.PARSE_SUCCESS:
             case PipesServer.PARSE_EXCEPTION_EMIT:
-                LOG.info("parse success: " + t.getId());
+                LOG.info("parse success: {} in {} ms", t.getId(), millis);
                 return deserializeEmitData();
             case PipesServer.PARSE_EXCEPTION_NO_EMIT:
                 return readMessage(PipesResult.STATUS.PARSE_EXCEPTION_NO_EMIT);
             case PipesServer.EMIT_SUCCESS:
-                LOG.info("emit success: " + t.getId());
+                LOG.info("emit success: {} in {} ms", t.getId(), millis);
                 return PipesResult.EMIT_SUCCESS;
             case PipesServer.EMIT_SUCCESS_PARSE_EXCEPTION:
                 return 
readMessage(PipesResult.STATUS.EMIT_SUCCESS_PARSE_EXCEPTION);
@@ -200,6 +240,31 @@ public class PipesClient implements Closeable {
         logGobblerThread.start();
         input = new DataInputStream(process.getInputStream());
         output = new DataOutputStream(process.getOutputStream());
+        //wait for ready signal
+        FutureTask<Integer> futureTask = new FutureTask<>(() -> {
+            int b = input.read();
+            if (b != PipesServer.READY) {
+                throw new RuntimeException("Couldn't start server: " + b);
+            }
+            return 1;
+        });
+        executorService.submit(futureTask);
+        try {
+            futureTask.get(pipesConfig.getStartupTimeoutMillis(), 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            process.destroyForcibly();
+            return;
+        } catch (ExecutionException e) {
+            LOG.error("couldn't start server", e);
+            process.destroyForcibly();
+            throw new RuntimeException(e);
+        } catch (TimeoutException e) {
+            LOG.error("couldn't start server in time", e);
+            process.destroyForcibly();
+            throw new RuntimeException(e);
+        } finally {
+            futureTask.cancel(true);
+        }
     }
 
     private String[] getCommandline() {
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
index 8da4f9d..e0ad8d9 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
@@ -26,7 +26,8 @@ import org.apache.tika.config.ConfigBase;
 public class PipesConfigBase extends ConfigBase {
 
     private long timeoutMillis = 30000;
-    private long shutdownClientAfterMillis = 1200000;
+    private long startupTimeoutMillis = 240000;
+    private long shutdownClientAfterMillis = 300000;
     private int numClients = 10;
     private List<String> forkedJvmArgs = new ArrayList<>();
     private int maxFilesProcessed = 10000;
@@ -92,4 +93,8 @@ public class PipesConfigBase extends ConfigBase {
     public void setJavaPath(String javaPath) {
         this.javaPath = javaPath;
     }
+
+    public long getStartupTimeoutMillis() {
+        return startupTimeoutMillis;
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
index 0805b03..8023d75 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
@@ -26,7 +26,8 @@ public class PipesResult {
         PARSE_EXCEPTION_EMIT, PARSE_SUCCESS,
         OOM, TIMEOUT, UNSPECIFIED_CRASH,
         NO_EMITTER_FOUND,
-        EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION
+        EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION,
+        INTERRUPTED_EXCEPTION
     }
 
     public static PipesResult CLIENT_UNAVAILABLE_WITHIN_MS =
@@ -36,6 +37,7 @@ public class PipesResult {
     public static PipesResult UNSPECIFIED_CRASH = new 
PipesResult(STATUS.UNSPECIFIED_CRASH);
     public static PipesResult EMIT_SUCCESS = new 
PipesResult(STATUS.EMIT_SUCCESS);
     public static PipesResult NO_EMITTER_FOUND = new 
PipesResult(STATUS.NO_EMITTER_FOUND);
+    public static PipesResult INTERRUPTED_EXCEPTION = new 
PipesResult(STATUS.INTERRUPTED_EXCEPTION);
     private final STATUS status;
     private final EmitData emitData;
     private final String message;
@@ -69,4 +71,10 @@ public class PipesResult {
     public String getMessage() {
         return message;
     }
+
+    @Override
+    public String toString() {
+        return "PipesResult{" + "status=" + status + ", emitData=" + emitData 
+ ", message='" +
+                message + '\'' + '}';
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index 1749837..e251dc8 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -56,39 +56,43 @@ import org.apache.tika.utils.StringUtils;
 
 public class PipesServer implements Runnable {
 
-    public static final int TIMEOUT_EXIT_CODE = 3;
+    //this has to be some number not close to 0-3
+    //it looks like the server crashes with exit value 3 on OOM, for example
+    public static final int TIMEOUT_EXIT_CODE = 17;
 
-    public static final byte CALL = 1;
+    public static final byte READY = 1;
 
-    public static final byte PING = 2;
+    public static final byte CALL = 2;
 
-    public static final byte FAILED_TO_START = 3;
+    public static final byte PING = 3;
 
-    public static final byte PARSE_SUCCESS = 4;
+    public static final byte FAILED_TO_START = 4;
+
+    public static final byte PARSE_SUCCESS = 5;
 
     /**
      * This will return the parse exception stack trace
      */
-    public static final byte PARSE_EXCEPTION_NO_EMIT = 5;
+    public static final byte PARSE_EXCEPTION_NO_EMIT = 6;
 
     /**
      * This will return the metadata list
      */
-    public static final byte PARSE_EXCEPTION_EMIT = 6;
+    public static final byte PARSE_EXCEPTION_EMIT = 7;
 
-    public static final byte EMIT_SUCCESS = 7;
+    public static final byte EMIT_SUCCESS = 8;
 
-    public static final byte EMIT_SUCCESS_PARSE_EXCEPTION = 8;
+    public static final byte EMIT_SUCCESS_PARSE_EXCEPTION = 9;
 
-    public static final byte EMIT_EXCEPTION = 9;
+    public static final byte EMIT_EXCEPTION = 10;
 
-    public static final byte NO_EMITTER_FOUND = 10;
+    public static final byte NO_EMITTER_FOUND = 11;
 
-    public static final byte OOM = 11;
+    public static final byte OOM = 12;
 
-    public static final byte TIMEOUT = 12;
+    public static final byte TIMEOUT = 13;
 
-    public static final byte EMPTY_OUTPUT = 13;
+    public static final byte EMPTY_OUTPUT = 14;
 
 
     private final Object[] lock = new Object[0];
@@ -149,11 +153,12 @@ public class PipesServer implements Runnable {
                 synchronized (lock) {
                     long elapsed = System.currentTimeMillis() - since;
                     if (parsing && elapsed > serverParseTimeoutMillis) {
-                        System.exit(TIMEOUT_EXIT_CODE);
+                        warn("timeout server; elapsed " + elapsed + " with " + 
serverParseTimeoutMillis);
+                        exit(TIMEOUT_EXIT_CODE);
                     } else if (!parsing && serverWaitTimeoutMillis > 0 &&
                             elapsed > serverWaitTimeoutMillis) {
                         debug("closing down from inactivity");
-                        System.exit(0);
+                        exit(0);
                     }
                 }
                 Thread.sleep(100);
@@ -168,6 +173,11 @@ public class PipesServer implements Runnable {
         System.err.flush();
     }
 
+    private void warn(String msg) {
+        System.err.println("warn " + msg);
+        System.err.flush();
+    }
+
     private void warn(Throwable t) {
         System.err.println("warn " + 
ExceptionUtils.getStackTrace(t).replaceAll("[\r\n]", " "));
         System.err.flush();
@@ -195,10 +205,12 @@ public class PipesServer implements Runnable {
         }
         //main loop
         try {
+            output.write(READY);
+            output.flush();
             while (true) {
                 int request = input.read();
                 if (request == -1) {
-                    break;
+                    exit(1);
                 } else if (request == PING) {
                     output.writeByte(PING);
                     output.flush();
@@ -402,11 +414,11 @@ public class PipesServer implements Runnable {
         } catch (IOException e) {
             err(e);
             //LOG.error("problem reading tuple", e);
-            System.exit(1);
+            exit(1);
         } catch (ClassNotFoundException e) {
             err(e);
             //LOG.error("can't find class?!", e);
-            System.exit(1);
+            exit(1);
         }
         //unreachable, no?!
         return null;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index 69699d8..fa4a1b0 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -31,7 +31,7 @@ public class AsyncConfig extends PipesConfigBase {
 
     private long maxForEmitBatchBytes = 0;
     private int queueSize = 10000;
-    private final int numEmitters = 1;
+    private int numEmitters = 1;
 
     public static AsyncConfig load(Path p) throws IOException, 
TikaConfigException {
         AsyncConfig asyncConfig = new AsyncConfig();
@@ -89,6 +89,10 @@ public class AsyncConfig extends PipesConfigBase {
         this.maxForEmitBatchBytes = maxForEmitBatchBytes;
     }
 
+    public void setNumEmitters(int numEmitters) {
+        this.numEmitters = numEmitters;
+    }
+
     /**
      * FetchEmitTuple queue size
      * @return
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
index 15989ac..67a4de7 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
@@ -47,27 +47,25 @@ public class AsyncEmitter implements Callable<Integer> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AsyncEmitter.class);
 
-    //TODO -- need to configure these
-    private final long emitWithinMs = 1000;
-
-    private long maxEstimatedBytes = 10_000_000;
-
+    private final AsyncConfig asyncConfig;
     private final EmitterManager emitterManager;
     private final ArrayBlockingQueue<EmitData> emitDataQueue;
 
     Instant lastEmitted = Instant.now();
 
-    public AsyncEmitter(ArrayBlockingQueue<EmitData> emitData, EmitterManager 
emitterManager) {
+    public AsyncEmitter(AsyncConfig asyncConfig, ArrayBlockingQueue<EmitData> 
emitData,
+                        EmitterManager emitterManager) {
+        this.asyncConfig = asyncConfig;
         this.emitDataQueue = emitData;
         this.emitterManager = emitterManager;
     }
 
     @Override
     public Integer call() throws Exception {
-        EmitDataCache cache = new EmitDataCache(maxEstimatedBytes);
+        EmitDataCache cache = new 
EmitDataCache(asyncConfig.getEmitMaxEstimatedBytes());
 
         while (true) {
-            EmitData emitData = emitDataQueue.poll(100, TimeUnit.MILLISECONDS);
+            EmitData emitData = emitDataQueue.poll(500, TimeUnit.MILLISECONDS);
             if (emitData == EMIT_DATA_STOP_SEMAPHORE) {
                 cache.emitAll();
                 return EMITTER_FUTURE_CODE;
@@ -80,8 +78,8 @@ public class AsyncEmitter implements Callable<Integer> {
             }
             LOG.debug("cache size: ({}) bytes and count: {}", 
cache.estimatedSize, cache.size);
             long elapsed = ChronoUnit.MILLIS.between(lastEmitted, 
Instant.now());
-            if (elapsed > emitWithinMs) {
-                LOG.debug("{} elapsed > {}, going to emitAll", elapsed, 
emitWithinMs);
+            if (elapsed > asyncConfig.getEmitWithinMillis()) {
+                LOG.debug("{} elapsed > {}, going to emitAll", elapsed, 
asyncConfig.getEmitWithinMillis());
                 //this can block
                 cache.emitAll();
             }
@@ -118,14 +116,14 @@ public class AsyncEmitter implements Callable<Integer> {
 
         private void emitAll() {
             int emitted = 0;
-            LOG.debug("about to emit {}", size);
+            LOG.debug("about to emit {} files, {} estimated bytes", size, 
estimatedSize);
             for (Map.Entry<String, List<EmitData>> e : map.entrySet()) {
                 Emitter emitter = emitterManager.getEmitter(e.getKey());
                 tryToEmit(emitter, e.getValue());
                 emitted += e.getValue().size();
             }
 
-            LOG.debug("emitted: {}", emitted);
+            LOG.debug("emitted: {} files", emitted);
             estimatedSize = 0;
             size = 0;
             map.clear();
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 467666b..0bdb86f 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -75,7 +75,8 @@ public class AsyncProcessor implements Closeable {
 
         EmitterManager emitterManager = EmitterManager.load(tikaConfigPath);
         for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
-            executorCompletionService.submit(new AsyncEmitter(emitData, 
emitterManager));
+            executorCompletionService.submit(new AsyncEmitter(asyncConfig, 
emitData,
+                    emitterManager));
         }
     }
 
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 4dd8a79..24ed318 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -46,12 +46,27 @@ public class AsyncProcessorTest {
             "<throw class=\"java.lang.OutOfMemoryError\">oom 
message</throw>\n</mock>";
     private final String OK = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + 
"<mock>" +
             "<metadata action=\"add\" name=\"dc:creator\">Nikolai 
Lobachevsky</metadata>" +
-            "<write element=\"p\">main_content</write>" + "</mock>";
+            "<write element=\"p\">main_content</write>" +
+            "</mock>";
+
+    private final String TIMEOUT = "<?xml version=\"1.0\" encoding=\"UTF-8\" 
?>" + "<mock>" +
+            "<metadata action=\"add\" name=\"dc:creator\">Nikolai 
Lobachevsky</metadata>" +
+            "<write element=\"p\">main_content</write>" +
+            "<fakeload millis=\"60000\" cpu=\"1\" mb=\"10\"/>" + "</mock>";
+
+    private final String SYSTEM_EXIT = "<?xml version=\"1.0\" 
encoding=\"UTF-8\" ?>" + "<mock>" +
+            "<metadata action=\"add\" name=\"dc:creator\">Nikolai 
Lobachevsky</metadata>" +
+            "<write element=\"p\">main_content</write>" +
+            "<system_exit/>" + "</mock>";
+
     private final int totalFiles = 100;
     private Path tikaConfigPath;
     private Path inputDir;
     private int ok = 0;
     private int oom = 0;
+    private int timeouts = 0;
+    private int crash = 0;
+
 
     @Before
     public void setUp() throws SQLException, IOException {
@@ -68,16 +83,24 @@ public class AsyncProcessorTest {
                 "</basePath></params>\n" + "    </fetcher>" + "  </fetchers>" +
                         "<async><params><tikaConfig>" +
                         
ProcessUtils.escapeCommandLine(tikaConfigPath.toAbsolutePath().toString()) +
-                        "</tikaConfig><forkedJvmArgs><arg>-Xmx256m</arg" +
+                        "</tikaConfig><forkedJvmArgs><arg>-Xmx512m</arg" +
                         
"></forkedJvmArgs><maxForEmitBatchBytes>1000000</maxForEmitBatchBytes>" +
-                        "</params></async>" +
+                        "<timeoutMillis>5000</timeoutMillis>" +
+                        "<numClients>4</numClients></params></async>" +
                         "</properties>";
         Files.write(tikaConfigPath, xml.getBytes(StandardCharsets.UTF_8));
         Random r = new Random();
         for (int i = 0; i < totalFiles; i++) {
-            if (r.nextFloat() < 0.1) {
+            float f = r.nextFloat();
+            if (f < 0.05) {
                 Files.write(inputDir.resolve(i + ".xml"), 
OOM.getBytes(StandardCharsets.UTF_8));
                 oom++;
+            } else if (f < 0.10) {
+                Files.write(inputDir.resolve(i + ".xml"), 
TIMEOUT.getBytes(StandardCharsets.UTF_8));
+                timeouts++;
+            } else if (f < 0.15) {
+                Files.write(inputDir.resolve(i + ".xml"), 
SYSTEM_EXIT.getBytes(StandardCharsets.UTF_8));
+                crash++;
             } else {
                 Files.write(inputDir.resolve(i + ".xml"), 
OK.getBytes(StandardCharsets.UTF_8));
                 ok++;
@@ -85,6 +108,18 @@ public class AsyncProcessorTest {
         }
     }
 
+/*
+    private void writeLarge(Path resolve) throws IOException {
+        try (BufferedWriter writer = Files.newBufferedWriter(resolve, 
StandardCharsets.UTF_8)) {
+            writer.write("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>");
+            writer.write("<mock>");
+            for (int i = 0; i < 10000000; i++) {
+                writer.write("<write element=\"p\">hello hello hello hello 
hello</write>");
+            }
+            writer.write("</mock>");
+        }
+    }
+*/
     @After
     public void tearDown() throws SQLException, IOException {
         Files.delete(tikaConfigPath);
diff --git a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
index 012d0c6..878db4e 100644
--- a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
+++ b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
@@ -38,6 +38,7 @@
   <metadataFilters>
     <metadataFilter 
class="org.apache.tika.metadata.filter.FieldNameMappingFilter">
       <params>
+        <excludeUnmapped>true</excludeUnmapped>
         <mappings>
           <mapping from="X-TIKA:content" to="content"/>
           <mapping from="X-TIKA:embedded_resource_path" to="embedded_path"/>
diff --git a/tika-server/tika-server-standard/src/main/resources/log4j2.properties b/tika-server/tika-server-standard/src/main/resources/log4j2.properties
index f0b7a61..36c879a 100644
--- a/tika-server/tika-server-standard/src/main/resources/log4j2.properties
+++ b/tika-server/tika-server-standard/src/main/resources/log4j2.properties
@@ -23,4 +23,4 @@ appenders=console
 appender.console.type=Console
 appender.console.name=STDERR
 appender.console.layout.type=PatternLayout
-appender.console.layout.pattern=%d{ABSOLUTE} %-5p %m%n
+appender.console.layout.pattern=%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n

Reply via email to