This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new e687ac9 TIKA-3306 -- add timeout in PipesClient
e687ac9 is described below
commit e687ac93073e6a4897486b82f980f9dd144d2c6f
Author: tallison <[email protected]>
AuthorDate: Fri May 14 17:35:52 2021 -0400
TIKA-3306 -- add timeout in PipesClient
---
tika-app/src/main/resources/log4j2.properties | 3 +-
.../main/resources/log4j2_batch_process.properties | 3 +-
.../java/org/apache/tika/pipes/PipesClient.java | 125 ++++++++++++++++-----
.../org/apache/tika/pipes/PipesConfigBase.java | 7 +-
.../java/org/apache/tika/pipes/PipesResult.java | 10 +-
.../java/org/apache/tika/pipes/PipesServer.java | 50 +++++----
.../org/apache/tika/pipes/async/AsyncConfig.java | 6 +-
.../org/apache/tika/pipes/async/AsyncEmitter.java | 22 ++--
.../apache/tika/pipes/async/AsyncProcessor.java | 3 +-
.../tika/pipes/async/AsyncProcessorTest.java | 43 ++++++-
.../resources/tika-config-simple-fs-emitter.xml | 1 +
.../src/main/resources/log4j2.properties | 2 +-
12 files changed, 203 insertions(+), 72 deletions(-)
diff --git a/tika-app/src/main/resources/log4j2.properties b/tika-app/src/main/resources/log4j2.properties
index af1382f..d17a4a1 100644
--- a/tika-app/src/main/resources/log4j2.properties
+++ b/tika-app/src/main/resources/log4j2.properties
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -21,7 +22,7 @@ appenders=console
appender.console.type=Console
appender.console.name=STDERR
appender.console.layout.type=PatternLayout
-appender.console.layout.pattern=%-5p %m%n
+appender.console.layout.pattern=%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n
rootLogger.level=info
rootLogger.appenderRefs=stderr
rootLogger.appenderRef.stderr.ref=STDERR
diff --git a/tika-app/src/main/resources/log4j2_batch_process.properties b/tika-app/src/main/resources/log4j2_batch_process.properties
index 11461aa..8715133 100644
--- a/tika-app/src/main/resources/log4j2_batch_process.properties
+++ b/tika-app/src/main/resources/log4j2_batch_process.properties
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -22,7 +23,7 @@ appenders=console
appender.console.type=Console
appender.console.name=STDOUT
appender.console.layout.type=PatternLayout
-appender.console.layout.pattern=%m%n
+appender.console.layout.pattern=%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n
rootLogger.level=info
rootLogger.appenderRefs=stdout
rootLogger.appenderRef.stdout.ref=STDOUT
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 52d921c..ac46270 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -30,7 +30,12 @@ import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +54,7 @@ public class PipesClient implements Closeable {
private final PipesConfigBase pipesConfig;
private DataOutputStream output;
private DataInputStream input;
+ private final ExecutorService executorService =
Executors.newFixedThreadPool(1);
public PipesClient(PipesConfigBase pipesConfig) {
this.pipesConfig = pipesConfig;
@@ -82,74 +88,108 @@ public class PipesClient implements Closeable {
if (process != null) {
process.destroyForcibly();
}
+ executorService.shutdownNow();
}
public PipesResult process(FetchEmitTuple t) throws IOException {
if (! ping()) {
restart();
}
-
if (pipesConfig.getMaxFilesProcessed() > 0 &&
filesProcessed >= pipesConfig.getMaxFilesProcessed()) {
LOG.info("restarting server after hitting max files: " +
filesProcessed);
restart();
}
- //TODO consider adding a timer here too
- // this could block forever if the watchdog thread in the server fails
- // or is starved
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- try (ObjectOutputStream objectOutputStream = new
ObjectOutputStream(bos)) {
- objectOutputStream.writeObject(t);
- }
- byte[] bytes = bos.toByteArray();
- output.write(PipesServer.CALL);
- output.writeInt(bytes.length);
- output.write(bytes);
- output.flush();
+ return actuallyProcess(t);
+ }
+ private PipesResult actuallyProcess(FetchEmitTuple t) {
long start = System.currentTimeMillis();
- try {
- return readResults(t);
- } catch (IOException e) {
+ FutureTask<PipesResult> futureTask = new FutureTask<>(() -> {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try (ObjectOutputStream objectOutputStream = new
ObjectOutputStream(bos)) {
+ objectOutputStream.writeObject(t);
+ }
+ byte[] bytes = bos.toByteArray();
+ output.write(PipesServer.CALL);
+ output.writeInt(bytes.length);
+ output.write(bytes);
+ output.flush();
+
+ return readResults(t, start);
+ });
+ try {
+ executorService.execute(futureTask);
+ return futureTask.get(pipesConfig.getTimeoutMillis(),
TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ process.destroyForcibly();
+ return PipesResult.INTERRUPTED_EXCEPTION;
+ } catch (ExecutionException e) {
+ long elapsed = System.currentTimeMillis() - start;
+ destroyWithPause();
+ if (!process.isAlive() && PipesServer.TIMEOUT_EXIT_CODE ==
process.exitValue()) {
+ LOG.warn("server timeout: {} in {} ms", t.getId(), elapsed);
+ return PipesResult.TIMEOUT;
+ }
try {
- process.waitFor(200, TimeUnit.MILLISECONDS);
+ process.waitFor(500, TimeUnit.MILLISECONDS);
+ if (process.isAlive()) {
+ LOG.warn("crash: {} in {} ms with no exit code available",
t.getId(), elapsed);
+ } else {
+ LOG.warn("crash: {} in {} ms with exit code {}",
t.getId(), elapsed, process.exitValue());
+ }
} catch (InterruptedException interruptedException) {
- //wait just a little bit to let process end to get exit value
- } finally {
- process.destroyForcibly();
- }
- if (! process.isAlive() && PipesServer.TIMEOUT_EXIT_CODE ==
process.exitValue()) {
- LOG.warn("{} timed out", t.getId());
- return PipesResult.TIMEOUT;
+ //swallow
}
return PipesResult.UNSPECIFIED_CRASH;
+ } catch (TimeoutException e) {
+ long elapsed = System.currentTimeMillis() - start;
+ process.destroyForcibly();
+ LOG.warn("client timeout: {} in {} ms", t.getId(), elapsed);
+ return PipesResult.TIMEOUT;
+ } finally {
+ futureTask.cancel(true);
+ }
+ }
+
+ private void destroyWithPause() {
+ //wait just a little bit to let process end to get exit value
+ //if there's a timeout on the server side
+ try {
+ process.waitFor(200, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException interruptedException) {
+ //swallow
+ } finally {
+ process.destroyForcibly();
}
+
}
- private PipesResult readResults(FetchEmitTuple t) throws IOException {
+ private PipesResult readResults(FetchEmitTuple t, long start) throws
IOException {
int status = input.read();
+ long millis = System.currentTimeMillis() - start;
switch (status) {
case PipesServer.OOM:
- LOG.warn("oom: " + t.getId());
+ LOG.warn("oom: {} in {} ms", t.getId(), millis);
return PipesResult.OOM;
case PipesServer.TIMEOUT:
- LOG.warn("timeout: " + t.getId());
+ LOG.warn("server response timeout: {} in {} ms", t.getId(),
millis);
return PipesResult.TIMEOUT;
case PipesServer.EMIT_EXCEPTION:
- LOG.warn("emit exception: " + t.getId());
+ LOG.warn("emit exception: {} in {} ms", t.getId(), millis);
return readMessage(PipesResult.STATUS.EMIT_EXCEPTION);
case PipesServer.NO_EMITTER_FOUND:
LOG.warn("no emitter found: " + t.getId());
return PipesResult.NO_EMITTER_FOUND;
case PipesServer.PARSE_SUCCESS:
case PipesServer.PARSE_EXCEPTION_EMIT:
- LOG.info("parse success: " + t.getId());
+ LOG.info("parse success: {} in {} ms", t.getId(), millis);
return deserializeEmitData();
case PipesServer.PARSE_EXCEPTION_NO_EMIT:
return readMessage(PipesResult.STATUS.PARSE_EXCEPTION_NO_EMIT);
case PipesServer.EMIT_SUCCESS:
- LOG.info("emit success: " + t.getId());
+ LOG.info("emit success: {} in {} ms", t.getId(), millis);
return PipesResult.EMIT_SUCCESS;
case PipesServer.EMIT_SUCCESS_PARSE_EXCEPTION:
return
readMessage(PipesResult.STATUS.EMIT_SUCCESS_PARSE_EXCEPTION);
@@ -200,6 +240,31 @@ public class PipesClient implements Closeable {
logGobblerThread.start();
input = new DataInputStream(process.getInputStream());
output = new DataOutputStream(process.getOutputStream());
+ //wait for ready signal
+ FutureTask<Integer> futureTask = new FutureTask<>(() -> {
+ int b = input.read();
+ if (b != PipesServer.READY) {
+ throw new RuntimeException("Couldn't start server: " + b);
+ }
+ return 1;
+ });
+ executorService.submit(futureTask);
+ try {
+ futureTask.get(pipesConfig.getStartupTimeoutMillis(),
TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ process.destroyForcibly();
+ return;
+ } catch (ExecutionException e) {
+ LOG.error("couldn't start server", e);
+ process.destroyForcibly();
+ throw new RuntimeException(e);
+ } catch (TimeoutException e) {
+ LOG.error("couldn't start server in time", e);
+ process.destroyForcibly();
+ throw new RuntimeException(e);
+ } finally {
+ futureTask.cancel(true);
+ }
}
private String[] getCommandline() {
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
index 8da4f9d..e0ad8d9 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
@@ -26,7 +26,8 @@ import org.apache.tika.config.ConfigBase;
public class PipesConfigBase extends ConfigBase {
private long timeoutMillis = 30000;
- private long shutdownClientAfterMillis = 1200000;
+ private long startupTimeoutMillis = 240000;
+ private long shutdownClientAfterMillis = 300000;
private int numClients = 10;
private List<String> forkedJvmArgs = new ArrayList<>();
private int maxFilesProcessed = 10000;
@@ -92,4 +93,8 @@ public class PipesConfigBase extends ConfigBase {
public void setJavaPath(String javaPath) {
this.javaPath = javaPath;
}
+
+ public long getStartupTimeoutMillis() {
+ return startupTimeoutMillis;
+ }
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
index 0805b03..8023d75 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
@@ -26,7 +26,8 @@ public class PipesResult {
PARSE_EXCEPTION_EMIT, PARSE_SUCCESS,
OOM, TIMEOUT, UNSPECIFIED_CRASH,
NO_EMITTER_FOUND,
- EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION
+ EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION,
+ INTERRUPTED_EXCEPTION
}
public static PipesResult CLIENT_UNAVAILABLE_WITHIN_MS =
@@ -36,6 +37,7 @@ public class PipesResult {
public static PipesResult UNSPECIFIED_CRASH = new
PipesResult(STATUS.UNSPECIFIED_CRASH);
public static PipesResult EMIT_SUCCESS = new
PipesResult(STATUS.EMIT_SUCCESS);
public static PipesResult NO_EMITTER_FOUND = new
PipesResult(STATUS.NO_EMITTER_FOUND);
+ public static PipesResult INTERRUPTED_EXCEPTION = new
PipesResult(STATUS.INTERRUPTED_EXCEPTION);
private final STATUS status;
private final EmitData emitData;
private final String message;
@@ -69,4 +71,10 @@ public class PipesResult {
public String getMessage() {
return message;
}
+
+ @Override
+ public String toString() {
+ return "PipesResult{" + "status=" + status + ", emitData=" + emitData + ", message='" +
+ message + '\'' + '}';
+ }
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index 1749837..e251dc8 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -56,39 +56,43 @@ import org.apache.tika.utils.StringUtils;
public class PipesServer implements Runnable {
- public static final int TIMEOUT_EXIT_CODE = 3;
+ //this has to be some number not close to 0-3
+ //it looks like the server crashes with exit value 3 on OOM, for example
+ public static final int TIMEOUT_EXIT_CODE = 17;
- public static final byte CALL = 1;
+ public static final byte READY = 1;
- public static final byte PING = 2;
+ public static final byte CALL = 2;
- public static final byte FAILED_TO_START = 3;
+ public static final byte PING = 3;
- public static final byte PARSE_SUCCESS = 4;
+ public static final byte FAILED_TO_START = 4;
+
+ public static final byte PARSE_SUCCESS = 5;
/**
* This will return the parse exception stack trace
*/
- public static final byte PARSE_EXCEPTION_NO_EMIT = 5;
+ public static final byte PARSE_EXCEPTION_NO_EMIT = 6;
/**
* This will return the metadata list
*/
- public static final byte PARSE_EXCEPTION_EMIT = 6;
+ public static final byte PARSE_EXCEPTION_EMIT = 7;
- public static final byte EMIT_SUCCESS = 7;
+ public static final byte EMIT_SUCCESS = 8;
- public static final byte EMIT_SUCCESS_PARSE_EXCEPTION = 8;
+ public static final byte EMIT_SUCCESS_PARSE_EXCEPTION = 9;
- public static final byte EMIT_EXCEPTION = 9;
+ public static final byte EMIT_EXCEPTION = 10;
- public static final byte NO_EMITTER_FOUND = 10;
+ public static final byte NO_EMITTER_FOUND = 11;
- public static final byte OOM = 11;
+ public static final byte OOM = 12;
- public static final byte TIMEOUT = 12;
+ public static final byte TIMEOUT = 13;
- public static final byte EMPTY_OUTPUT = 13;
+ public static final byte EMPTY_OUTPUT = 14;
private final Object[] lock = new Object[0];
@@ -149,11 +153,12 @@ public class PipesServer implements Runnable {
synchronized (lock) {
long elapsed = System.currentTimeMillis() - since;
if (parsing && elapsed > serverParseTimeoutMillis) {
- System.exit(TIMEOUT_EXIT_CODE);
+ warn("timeout server; elapsed " + elapsed + " with " +
serverParseTimeoutMillis);
+ exit(TIMEOUT_EXIT_CODE);
} else if (!parsing && serverWaitTimeoutMillis > 0 &&
elapsed > serverWaitTimeoutMillis) {
debug("closing down from inactivity");
- System.exit(0);
+ exit(0);
}
}
Thread.sleep(100);
@@ -168,6 +173,11 @@ public class PipesServer implements Runnable {
System.err.flush();
}
+ private void warn(String msg) {
+ System.err.println("warn " + msg);
+ System.err.flush();
+ }
+
private void warn(Throwable t) {
System.err.println("warn " +
ExceptionUtils.getStackTrace(t).replaceAll("[\r\n]", " "));
System.err.flush();
@@ -195,10 +205,12 @@ public class PipesServer implements Runnable {
}
//main loop
try {
+ output.write(READY);
+ output.flush();
while (true) {
int request = input.read();
if (request == -1) {
- break;
+ exit(1);
} else if (request == PING) {
output.writeByte(PING);
output.flush();
@@ -402,11 +414,11 @@ public class PipesServer implements Runnable {
} catch (IOException e) {
err(e);
//LOG.error("problem reading tuple", e);
- System.exit(1);
+ exit(1);
} catch (ClassNotFoundException e) {
err(e);
//LOG.error("can't find class?!", e);
- System.exit(1);
+ exit(1);
}
//unreachable, no?!
return null;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index 69699d8..fa4a1b0 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -31,7 +31,7 @@ public class AsyncConfig extends PipesConfigBase {
private long maxForEmitBatchBytes = 0;
private int queueSize = 10000;
- private final int numEmitters = 1;
+ private int numEmitters = 1;
public static AsyncConfig load(Path p) throws IOException,
TikaConfigException {
AsyncConfig asyncConfig = new AsyncConfig();
@@ -89,6 +89,10 @@ public class AsyncConfig extends PipesConfigBase {
this.maxForEmitBatchBytes = maxForEmitBatchBytes;
}
+ public void setNumEmitters(int numEmitters) {
+ this.numEmitters = numEmitters;
+ }
+
/**
* FetchEmitTuple queue size
* @return
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
index 15989ac..67a4de7 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
@@ -47,27 +47,25 @@ public class AsyncEmitter implements Callable<Integer> {
private static final Logger LOG =
LoggerFactory.getLogger(AsyncEmitter.class);
- //TODO -- need to configure these
- private final long emitWithinMs = 1000;
-
- private long maxEstimatedBytes = 10_000_000;
-
+ private final AsyncConfig asyncConfig;
private final EmitterManager emitterManager;
private final ArrayBlockingQueue<EmitData> emitDataQueue;
Instant lastEmitted = Instant.now();
- public AsyncEmitter(ArrayBlockingQueue<EmitData> emitData, EmitterManager
emitterManager) {
+ public AsyncEmitter(AsyncConfig asyncConfig, ArrayBlockingQueue<EmitData>
emitData,
+ EmitterManager emitterManager) {
+ this.asyncConfig = asyncConfig;
this.emitDataQueue = emitData;
this.emitterManager = emitterManager;
}
@Override
public Integer call() throws Exception {
- EmitDataCache cache = new EmitDataCache(maxEstimatedBytes);
+ EmitDataCache cache = new
EmitDataCache(asyncConfig.getEmitMaxEstimatedBytes());
while (true) {
- EmitData emitData = emitDataQueue.poll(100, TimeUnit.MILLISECONDS);
+ EmitData emitData = emitDataQueue.poll(500, TimeUnit.MILLISECONDS);
if (emitData == EMIT_DATA_STOP_SEMAPHORE) {
cache.emitAll();
return EMITTER_FUTURE_CODE;
@@ -80,8 +78,8 @@ public class AsyncEmitter implements Callable<Integer> {
}
LOG.debug("cache size: ({}) bytes and count: {}",
cache.estimatedSize, cache.size);
long elapsed = ChronoUnit.MILLIS.between(lastEmitted,
Instant.now());
- if (elapsed > emitWithinMs) {
- LOG.debug("{} elapsed > {}, going to emitAll", elapsed,
emitWithinMs);
+ if (elapsed > asyncConfig.getEmitWithinMillis()) {
+ LOG.debug("{} elapsed > {}, going to emitAll", elapsed,
asyncConfig.getEmitWithinMillis());
//this can block
cache.emitAll();
}
@@ -118,14 +116,14 @@ public class AsyncEmitter implements Callable<Integer> {
private void emitAll() {
int emitted = 0;
- LOG.debug("about to emit {}", size);
+ LOG.debug("about to emit {} files, {} estimated bytes", size,
estimatedSize);
for (Map.Entry<String, List<EmitData>> e : map.entrySet()) {
Emitter emitter = emitterManager.getEmitter(e.getKey());
tryToEmit(emitter, e.getValue());
emitted += e.getValue().size();
}
- LOG.debug("emitted: {}", emitted);
+ LOG.debug("emitted: {} files", emitted);
estimatedSize = 0;
size = 0;
map.clear();
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 467666b..0bdb86f 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -75,7 +75,8 @@ public class AsyncProcessor implements Closeable {
EmitterManager emitterManager = EmitterManager.load(tikaConfigPath);
for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
- executorCompletionService.submit(new AsyncEmitter(emitData,
emitterManager));
+ executorCompletionService.submit(new AsyncEmitter(asyncConfig,
emitData,
+ emitterManager));
}
}
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 4dd8a79..24ed318 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -46,12 +46,27 @@ public class AsyncProcessorTest {
"<throw class=\"java.lang.OutOfMemoryError\">oom
message</throw>\n</mock>";
private final String OK = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" +
"<mock>" +
"<metadata action=\"add\" name=\"dc:creator\">Nikolai
Lobachevsky</metadata>" +
- "<write element=\"p\">main_content</write>" + "</mock>";
+ "<write element=\"p\">main_content</write>" +
+ "</mock>";
+
+ private final String TIMEOUT = "<?xml version=\"1.0\" encoding=\"UTF-8\"
?>" + "<mock>" +
+ "<metadata action=\"add\" name=\"dc:creator\">Nikolai
Lobachevsky</metadata>" +
+ "<write element=\"p\">main_content</write>" +
+ "<fakeload millis=\"60000\" cpu=\"1\" mb=\"10\"/>" + "</mock>";
+
+ private final String SYSTEM_EXIT = "<?xml version=\"1.0\"
encoding=\"UTF-8\" ?>" + "<mock>" +
+ "<metadata action=\"add\" name=\"dc:creator\">Nikolai
Lobachevsky</metadata>" +
+ "<write element=\"p\">main_content</write>" +
+ "<system_exit/>" + "</mock>";
+
private final int totalFiles = 100;
private Path tikaConfigPath;
private Path inputDir;
private int ok = 0;
private int oom = 0;
+ private int timeouts = 0;
+ private int crash = 0;
+
@Before
public void setUp() throws SQLException, IOException {
@@ -68,16 +83,24 @@ public class AsyncProcessorTest {
"</basePath></params>\n" + " </fetcher>" + " </fetchers>" +
"<async><params><tikaConfig>" +
ProcessUtils.escapeCommandLine(tikaConfigPath.toAbsolutePath().toString()) +
- "</tikaConfig><forkedJvmArgs><arg>-Xmx256m</arg" +
+ "</tikaConfig><forkedJvmArgs><arg>-Xmx512m</arg" +
"></forkedJvmArgs><maxForEmitBatchBytes>1000000</maxForEmitBatchBytes>" +
- "</params></async>" +
+ "<timeoutMillis>5000</timeoutMillis>" +
+ "<numClients>4</numClients></params></async>" +
"</properties>";
Files.write(tikaConfigPath, xml.getBytes(StandardCharsets.UTF_8));
Random r = new Random();
for (int i = 0; i < totalFiles; i++) {
- if (r.nextFloat() < 0.1) {
+ float f = r.nextFloat();
+ if (f < 0.05) {
Files.write(inputDir.resolve(i + ".xml"),
OOM.getBytes(StandardCharsets.UTF_8));
oom++;
+ } else if (f < 0.10) {
+ Files.write(inputDir.resolve(i + ".xml"),
TIMEOUT.getBytes(StandardCharsets.UTF_8));
+ timeouts++;
+ } else if (f < 0.15) {
+ Files.write(inputDir.resolve(i + ".xml"),
SYSTEM_EXIT.getBytes(StandardCharsets.UTF_8));
+ crash++;
} else {
Files.write(inputDir.resolve(i + ".xml"),
OK.getBytes(StandardCharsets.UTF_8));
ok++;
@@ -85,6 +108,18 @@ public class AsyncProcessorTest {
}
}
+/*
+ private void writeLarge(Path resolve) throws IOException {
+ try (BufferedWriter writer = Files.newBufferedWriter(resolve,
StandardCharsets.UTF_8)) {
+ writer.write("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>");
+ writer.write("<mock>");
+ for (int i = 0; i < 10000000; i++) {
+ writer.write("<write element=\"p\">hello hello hello hello
hello</write>");
+ }
+ writer.write("</mock>");
+ }
+ }
+*/
@After
public void tearDown() throws SQLException, IOException {
Files.delete(tikaConfigPath);
diff --git a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
index 012d0c6..878db4e 100644
--- a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
+++ b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
@@ -38,6 +38,7 @@
<metadataFilters>
<metadataFilter
class="org.apache.tika.metadata.filter.FieldNameMappingFilter">
<params>
+ <excludeUnmapped>true</excludeUnmapped>
<mappings>
<mapping from="X-TIKA:content" to="content"/>
<mapping from="X-TIKA:embedded_resource_path" to="embedded_path"/>
diff --git a/tika-server/tika-server-standard/src/main/resources/log4j2.properties b/tika-server/tika-server-standard/src/main/resources/log4j2.properties
index f0b7a61..36c879a 100644
--- a/tika-server/tika-server-standard/src/main/resources/log4j2.properties
+++ b/tika-server/tika-server-standard/src/main/resources/log4j2.properties
@@ -23,4 +23,4 @@ appenders=console
appender.console.type=Console
appender.console.name=STDERR
appender.console.layout.type=PatternLayout
-appender.console.layout.pattern=%d{ABSOLUTE} %-5p %m%n
+appender.console.layout.pattern=%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n