This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch branch_1x in repository https://gitbox.apache.org/repos/asf/tika.git
commit e7cef357c662a5a52d84c5254c633b9679bf5ff0 Author: TALLISON <[email protected]> AuthorDate: Tue Sep 11 16:16:34 2018 -0400 TIKA-2725 -- first working draft...ready for commit and future cleanups # Conflicts: # tika-server/src/main/java/org/apache/tika/server/resource/DetectorResource.java --- .../tika/server/FileCountExceededException.java | 9 - .../java/org/apache/tika/server/ServerStatus.java | 80 +++--- .../apache/tika/server/ServerStatusWatcher.java | 134 ++++++++-- .../org/apache/tika/server/ServerTimeouts.java | 106 ++++++++ .../java/org/apache/tika/server/TikaServerCli.java | 199 +++++++-------- .../org/apache/tika/server/TikaServerWatchDog.java | 233 +++++++++++++++++ .../tika/server/resource/DetectorResource.java | 13 + .../apache/tika/server/resource/TikaResource.java | 23 +- .../apache/tika/server/resource/TikaVersion.java | 1 + .../apache/tika/server/resource/TikaWelcome.java | 5 + .../tika/server/resource/TranslateResource.java | 19 +- .../java/org/apache/tika/server/CXFTestBase.java | 2 +- .../apache/tika/server/DetectorResourceTest.java | 2 +- .../apache/tika/server/ServerIntegrationTest.java | 73 ------ .../org/apache/tika/server/ServerStatusTest.java | 16 +- .../org/apache/tika/server/StackTraceOffTest.java | 2 +- .../org/apache/tika/server/StackTraceTest.java | 2 +- .../tika/server/TikaServerIntegrationTest.java | 276 +++++++++++++++++++++ .../org/apache/tika/server/TikaWelcomeTest.java | 2 +- .../apache/tika/server/TranslateResourceTest.java | 2 +- .../src/test/resources/mock/heavy_hand_100.xml | 25 ++ .../src/test/resources/mock/heavy_hang_30000.xml | 25 ++ tika-server/src/test/resources/mock/real_oom.xml | 24 ++ .../src/test/resources/mock/system_exit.xml | 25 ++ .../src/test/resources/mock/thread_interrupt.xml | 25 ++ 25 files changed, 1065 insertions(+), 258 deletions(-) diff --git a/tika-server/src/main/java/org/apache/tika/server/FileCountExceededException.java 
b/tika-server/src/main/java/org/apache/tika/server/FileCountExceededException.java deleted file mode 100644 index 9920556..0000000 --- a/tika-server/src/main/java/org/apache/tika/server/FileCountExceededException.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.apache.tika.server; - -/** - * Exception thrown by ServerStatusWatcher if tika-server exceeds - * the maximum number of files to process. - */ -public class FileCountExceededException extends Exception { - -} diff --git a/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java b/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java index 861007d..ac5fed4 100644 --- a/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java +++ b/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java @@ -16,53 +16,72 @@ */ package org.apache.tika.server; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.time.Instant; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public class ServerStatus { - enum STATUS { - OPEN(0), + enum DIRECTIVES { + PING((byte)0), + PING_ACTIVE_SERVER_TASKS((byte)1), + SHUTDOWN((byte)2); + + private final byte b; + DIRECTIVES(byte b) { + this.b = b; + } + byte getByte() { return b;} + } + + public enum STATUS { + OPERATING(0), HIT_MAX(1), TIMEOUT(2), ERROR(3), - PARENT_REQUESTED_SHUTDOWN(4); + PARENT_REQUESTED_SHUTDOWN(4), + PARENT_EXCEPTION(5); private final int shutdownCode; + + static STATUS lookup(int i) { + STATUS[] values = STATUS.values(); + if (i < 0 || i >= values.length) { + throw new ArrayIndexOutOfBoundsException(i + + " is not acceptable for an array of length "+values.length); + } + return STATUS.values()[i]; + } + STATUS(int shutdownCode) { this.shutdownCode = shutdownCode; } int getShutdownCode() { return shutdownCode; } + byte getByte() { return (byte) shutdownCode;} + } - 
enum TASK { + public enum TASK { PARSE, - UNZIP, DETECT, - METADATA + TRANSLATE }; + private static final Logger LOG = LoggerFactory.getLogger(ServerStatus.class); - private final int maxFilesToProcess; - private AtomicInteger counter = new AtomicInteger(0); - private Map<Integer, TaskStatus> tasks = new HashMap<>(); + private AtomicLong counter = new AtomicLong(0); + private Map<Long, TaskStatus> tasks = new HashMap<>(); - private STATUS status = STATUS.OPEN; - public ServerStatus(int maxFilesToProcess) { - this.maxFilesToProcess = maxFilesToProcess; - } - public synchronized int start(TASK task, String fileName) throws FileCountExceededException { - int i = counter.incrementAndGet(); - if (i == Integer.MAX_VALUE || - (maxFilesToProcess > 0 && i >= maxFilesToProcess)) { - setStatus(STATUS.HIT_MAX); - throw new FileCountExceededException(); - } - tasks.put(i, new TaskStatus(task, Instant.now(), fileName)); - return i; + private STATUS status = STATUS.OPERATING; + + public synchronized long start(TASK task, String fileName) { + long taskId = counter.incrementAndGet(); + tasks.put(taskId, new TaskStatus(task, Instant.now(), fileName)); + return taskId; } /** @@ -71,7 +90,7 @@ public class ServerStatus { * @param taskId * @throws IllegalArgumentException if there is no task by that taskId in the collection */ - public synchronized void complete(int taskId) throws IllegalArgumentException { + public synchronized void complete(long taskId) throws IllegalArgumentException { TaskStatus status = tasks.remove(taskId); if (status == null) { throw new IllegalArgumentException("TaskId is not in map:"+taskId); @@ -86,13 +105,18 @@ public class ServerStatus { return status; } - public synchronized Map<Integer, TaskStatus> getTasks() { - Map<Integer, TaskStatus> ret = new HashMap<>(); + public synchronized Map<Long, TaskStatus> getTasks() { + Map<Long, TaskStatus> ret = new HashMap<>(); ret.putAll(tasks); return ret; } - public synchronized int getFilesProcessed() { + public 
synchronized long getFilesProcessed() { return counter.get(); } + + public synchronized boolean isOperating() { + return status == STATUS.OPERATING; + } + } diff --git a/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java b/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java index 24b1ddb..8023e94 100644 --- a/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java +++ b/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java @@ -17,51 +17,110 @@ package org.apache.tika.server; -import org.apache.tika.server.resource.TranslateResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.InputStream; +import java.io.OutputStream; import java.time.Duration; import java.time.Instant; -import java.util.concurrent.Callable; public class ServerStatusWatcher implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ServerStatusWatcher.class); - private final ServerStatus serverStatus; - private final long timeoutMillis; - private final long pulseMillis; + private final DataInputStream fromParent; + private final DataOutputStream toParent; + private final long maxFiles; + private final ServerTimeouts serverTimeouts; + - public ServerStatusWatcher(ServerStatus serverStatus, long timeoutMillis, long pulseMillis) { + private volatile Instant lastPing = null; + + public ServerStatusWatcher(ServerStatus serverStatus, + InputStream inputStream, OutputStream outputStream, + long maxFiles, + ServerTimeouts serverTimeouts) { this.serverStatus = serverStatus; - this.timeoutMillis = timeoutMillis; - this.pulseMillis = pulseMillis; + this.maxFiles = maxFiles; + this.serverTimeouts = serverTimeouts; + + this.fromParent = new DataInputStream(inputStream); + this.toParent = new DataOutputStream(outputStream); + Thread statusWatcher = new Thread(new StatusWatcher()); + statusWatcher.setDaemon(true); + 
statusWatcher.start(); } @Override public void run() { - ServerStatus.STATUS status = serverStatus.getStatus(); - while (status.equals(ServerStatus.STATUS.OPEN)) { + //let parent know child is alive + try { + toParent.writeByte(ServerStatus.STATUS.OPERATING.getByte()); + toParent.flush(); + } catch (Exception e) { + LOG.warn("Exception writing startup ping to parent", e); + serverStatus.setStatus(ServerStatus.STATUS.PARENT_EXCEPTION); + shutdown(ServerStatus.STATUS.PARENT_EXCEPTION); + } + + byte directive = (byte)-1; + while (true) { try { - Thread.sleep(pulseMillis); - } catch (InterruptedException e) { + directive = fromParent.readByte(); + lastPing = Instant.now(); + } catch (Exception e) { + LOG.warn("Exception reading from parent", e); + serverStatus.setStatus(ServerStatus.STATUS.PARENT_EXCEPTION); + shutdown(ServerStatus.STATUS.PARENT_EXCEPTION); } - checkForTimeouts(); - status = serverStatus.getStatus(); + if (directive == ServerStatus.DIRECTIVES.PING.getByte()) { + if (serverStatus.getStatus().equals(ServerStatus.STATUS.OPERATING)) { + checkForHitMaxFiles(); + checkForTaskTimeouts(); + } + try { + toParent.writeByte(serverStatus.getStatus().getByte()); + toParent.flush(); + } catch (Exception e) { + LOG.warn("Exception writing to parent", e); + serverStatus.setStatus(ServerStatus.STATUS.PARENT_EXCEPTION); + shutdown(ServerStatus.STATUS.PARENT_EXCEPTION); + } + } else if (directive == ServerStatus.DIRECTIVES.SHUTDOWN.getByte()) { + LOG.info("Parent requested shutdown"); + serverStatus.setStatus(ServerStatus.STATUS.PARENT_REQUESTED_SHUTDOWN); + shutdown(ServerStatus.STATUS.PARENT_REQUESTED_SHUTDOWN); + } else if (directive == ServerStatus.DIRECTIVES.PING_ACTIVE_SERVER_TASKS.getByte()) { try { + toParent.writeInt(serverStatus.getTasks().size()); + toParent.flush(); + } catch (Exception e) { + LOG.warn("Exception writing to parent", e); + serverStatus.setStatus(ServerStatus.STATUS.PARENT_EXCEPTION); + shutdown(ServerStatus.STATUS.PARENT_EXCEPTION); + } + } + 
} + } + + private void checkForHitMaxFiles() { + if (maxFiles < 0) { + return; } - if (! status.equals(ServerStatus.STATUS.OPEN)) { - LOG.warn("child process shutting down with status: {}", status); - System.exit(status.getShutdownCode()); + long filesProcessed = serverStatus.getFilesProcessed(); + if (filesProcessed >= maxFiles) { + serverStatus.setStatus(ServerStatus.STATUS.HIT_MAX); } } - private void checkForTimeouts() { + private void checkForTaskTimeouts() { Instant now = Instant.now(); for (TaskStatus status : serverStatus.getTasks().values()) { - long millisElapsed = Duration.between(now, status.started).toMillis(); - if (millisElapsed > timeoutMillis) { + long millisElapsed = Duration.between(status.started, now).toMillis(); + if (millisElapsed > serverTimeouts.getTaskTimeoutMillis()) { serverStatus.setStatus(ServerStatus.STATUS.TIMEOUT); if (status.fileName.isPresent()) { LOG.error("Timeout task {}, millis elapsed {}, file {}", @@ -73,4 +132,39 @@ public class ServerStatusWatcher implements Runnable { } } } + + private void shutdown(ServerStatus.STATUS status) { + LOG.info("Shutting down child process with status: " +status.name()); + System.exit(status.getShutdownCode()); + } + + //This is an internal thread that pulses every 100MS + //within the child to see if the child should die. 
+ private class StatusWatcher implements Runnable { + + @Override + public void run() { + while (true) { + ServerStatus.STATUS currStatus = serverStatus.getStatus(); + + if (currStatus != ServerStatus.STATUS.OPERATING) { + LOG.warn("child process observed "+currStatus.name()+ " and is shutting down."); + shutdown(currStatus); + } + + if (lastPing != null) { + long elapsed = Duration.between(lastPing, Instant.now()).toMillis(); + if (elapsed > serverTimeouts.getPingTimeoutMillis()) { + serverStatus.setStatus(ServerStatus.STATUS.PARENT_EXCEPTION); + shutdown(ServerStatus.STATUS.PARENT_EXCEPTION); + } + } + try { + Thread.sleep(serverTimeouts.getPingPulseMillis()); + } catch (InterruptedException e) { + return; + } + } + } + } } \ No newline at end of file diff --git a/tika-server/src/main/java/org/apache/tika/server/ServerTimeouts.java b/tika-server/src/main/java/org/apache/tika/server/ServerTimeouts.java new file mode 100644 index 0000000..b85d89c --- /dev/null +++ b/tika-server/src/main/java/org/apache/tika/server/ServerTimeouts.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tika.server; + +public class ServerTimeouts { + + /* + TODO: integrate these settings: + * Number of milliseconds to wait to start child process. + public static final long DEFAULT_CHILD_PROCESS_STARTUP_MILLIS = 60000; + + * Maximum number of milliseconds to wait to shutdown child process to allow + * for current parses to complete. + public static final long DEFAULT_CHILD_PROCESS_SHUTDOWN_MILLIS = 30000; + + private long childProcessStartupMillis = DEFAULT_CHILD_PROCESS_STARTUP_MILLIS; + + private long childProcessShutdownMillis = DEFAULT_CHILD_PROCESS_SHUTDOWN_MILLIS; + + */ + + + + /** + * If the child doesn't receive a ping or the parent doesn't + * hear back from a ping in this amount of time, kill and restart the child. + */ + public static final long DEFAULT_PING_TIMEOUT_MILLIS = 30000; + + /** + * How often should the parent try to ping the child to check status + */ + public static final long DEFAULT_PING_PULSE_MILLIS = 500; + + /** + * Number of milliseconds to wait per server task (parse, detect, unpack, translate, + * etc.) before timing out and shutting down the child process. + */ + public static final long DEFAULT_TASK_TIMEOUT_MILLIS = 120000; + + private long taskTimeoutMillis = DEFAULT_TASK_TIMEOUT_MILLIS; + + private long pingTimeoutMillis = DEFAULT_PING_TIMEOUT_MILLIS; + + private long pingPulseMillis = DEFAULT_PING_PULSE_MILLIS; + + + /** + * How long to wait for a task before shutting down the child server process + * and restarting it. + * @return + */ + public long getTaskTimeoutMillis() { + return taskTimeoutMillis; + } + + /** + * + * @param taskTimeoutMillis number of milliseconds to allow per task + * (parse, detection, unzipping, etc.) 
+ */ + public void setTaskTimeoutMillis(long taskTimeoutMillis) { + this.taskTimeoutMillis = taskTimeoutMillis; + } + + public long getPingTimeoutMillis() { + return pingTimeoutMillis; + } + + /** + * + * @param pingTimeoutMillis if the parent doesn't receive a response + * in this amount of time, or + * if the child doesn't receive a ping + * in this amount of time, restart the child process + */ + public void setPingTimeoutMillis(long pingTimeoutMillis) { + this.pingTimeoutMillis = pingTimeoutMillis; + } + + public long getPingPulseMillis() { + return pingPulseMillis; + } + + /** + * + * @param pingPulseMillis how often to test that the parent and/or child is alive + */ + public void setPingPulseMillis(long pingPulseMillis) { + this.pingPulseMillis = pingPulseMillis; + } +} diff --git a/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java b/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java index af8fd8f..e3242d1 100644 --- a/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java +++ b/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java @@ -17,7 +17,7 @@ package org.apache.tika.server; -import java.io.IOException; +import java.io.ByteArrayInputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -66,11 +66,7 @@ public class TikaServerCli { //used in spawn-child mode - private static final long PULSE_MILLIS = 100; - private static final int DEFAULT_MAX_FILES = -1; - private static final long DEFAULT_TIME_OUT_MS = 60000; - private static final long DEFAULT_PULSE_MS = 500; - private static Thread SHUTDOWN_HOOK = null; + private static final long DEFAULT_MAX_FILES = 100000; public static final int DEFAULT_PORT = 9998; @@ -86,6 +82,10 @@ public class TikaServerCli { "drive or a webpage from your intranet. 
See CVE-2015-3271.\n"+ "Please make sure you know what you are doing."; + private static final List<String> ONLY_IN_SPAWN_CHILD_MODE = + Arrays.asList(new String[] { "taskTimeoutMillis", "taskPulseMillis", + "pingTimeoutMillis", "pingPulseMillis", "maxFiles"}); + private static Options getOptions() { Options options = new Options(); options.addOption("C", "cors", true, "origin allowed to make CORS requests (default=NONE)\nall allowed if \"all\""); @@ -100,7 +100,14 @@ public class TikaServerCli { options.addOption("enableUnsecureFeatures", false, "this is required to enable fileUrl."); options.addOption("enableFileUrl", false, "allows user to pass in fileUrl instead of InputStream."); options.addOption("spawnChild", false, "whether or not to spawn a child process for robustness"); - options.addOption("maxFiles", false, "shutdown server after this many files -- use only in 'spawnChild' mode"); + options.addOption("taskTimeoutMillis", true, "Only in spawn child mode: how long to wait for a task (e.g. 
parse) to finish"); + options.addOption("taskPulseMillis", true, "Only in spawn child mode: how often to check if a task has timed out."); + options.addOption("pingTimeoutMillis", true, "Only in spawn child mode: how long to wait to wait for a ping and/or ping response."); + options.addOption("pingPulseMillis", true, "Only in spawn child mode: how often to check if a ping has timed out."); + + options.addOption("maxFiles", false, "Only in spawn child mode: shutdown server after this many files -- use only in 'spawnChild' mode"); + options.addOption("child", false, "this process is a child process -- EXPERT -- " + + "should normally only be invoked by parent process"); return options; } @@ -116,106 +123,45 @@ public class TikaServerCli { } private static void execute(String[] args) throws Exception { - boolean spawnChild = false; - for (int i = 0; i < args.length; i++) { - if ("-spawnChild".equals(args[i]) || "--spawnChild".equals(args[i])) { - spawnChild = true; - break; - } - } - if (spawnChild) { - spawnChild(args); - } else { - executeLegacy(args); - } - } + Options options = getOptions(); - private static void spawnChild(String[] args) throws Exception { - Process child = start(args); - try { - while (true) { - Thread.sleep(PULSE_MILLIS); + CommandLineParser cliParser = new GnuParser(); - int exitValue = Integer.MAX_VALUE; - try { - exitValue = child.exitValue(); - } catch (IllegalThreadStateException e) { - //process is still running - } - if (exitValue != Integer.MAX_VALUE) { - if (exitValue != ServerStatus.STATUS.PARENT_REQUESTED_SHUTDOWN.getShutdownCode()) { - LOG.warn("child exited with code ({}) -- restarting, now", Integer.toString(exitValue)); - child.destroyForcibly(); - child = start(args); + //need to strip out -J (child jvm opts) from this parse + //they'll be processed correctly in args in the watch dog + //and they won't be needed in legacy. 
+ CommandLine line = cliParser.parse(options, stripChildArgs(args)); + if (line.hasOption("spawnChild")) { + TikaServerWatchDog watchDog = new TikaServerWatchDog(); + watchDog.execute(args, configureServerTimeouts(line)); + } else { + if (! line.hasOption("child")) { + //make sure the user didn't misunderstand the options + for (String childOnly : ONLY_IN_SPAWN_CHILD_MODE) { + if (line.hasOption(childOnly)) { + System.err.println("The option '" + childOnly + + "' can only be used with '-spawnChild'"); + usage(options); } } } - } catch (InterruptedException e) { - //interrupted...shutting down - } finally { - child.destroyForcibly(); - } - } - - private static Process start(String[] args) throws IOException { - ProcessBuilder builder = new ProcessBuilder(); - builder.inheritIO(); - List<String> argList = new ArrayList<>(); - List<String> jvmArgs = extractJVMArgs(args); - List<String> childArgs = extractArgs(args); - argList.add("java"); - if (! jvmArgs.contains("-cp") && ! jvmArgs.contains("--classpath")) { - String cp = System.getProperty("java.class.path"); - jvmArgs.add("-cp"); - jvmArgs.add(cp); - } - argList.addAll(jvmArgs); - argList.add("org.apache.tika.server.TikaServerCli"); - argList.addAll(childArgs); - - builder.command(argList); - - Process process = builder.start(); - - if (SHUTDOWN_HOOK != null) { - Runtime.getRuntime().removeShutdownHook(SHUTDOWN_HOOK); + executeLegacy(line, options); } - SHUTDOWN_HOOK = new Thread(() -> process.destroy()); - Runtime.getRuntime().addShutdownHook(SHUTDOWN_HOOK); - return process; } - private static List<String> extractArgs(String[] args) { - List<String> argList = new ArrayList<>(); + private static String[] stripChildArgs(String[] args) { + List<String> ret = new ArrayList<>(); for (int i = 0; i < args.length; i++) { - if (args[i].startsWith("-J") || args[i].equals("-spawnChild") || args[i].equals("--spawnChild")) { - continue; + if (! 
args[i].startsWith("-J")) { + ret.add(args[i]); } - argList.add(args[i]); } - return argList; + return ret.toArray(new String[ret.size()]); } - private static List<String> extractJVMArgs(String[] args) { - List<String> jvmArgs = new ArrayList<>(); - for (int i = 0; i < args.length; i++) { - if (args[i].startsWith("-J")) { - jvmArgs.add("-"+args[i].substring(2)); - } - } - return jvmArgs; - } - - private static void executeLegacy(String[] args) throws Exception { - Options options = getOptions(); - - CommandLineParser cliParser = new GnuParser(); - CommandLine line = cliParser.parse(options, args); - + private static void executeLegacy(CommandLine line, Options options) throws Exception { if (line.hasOption("help")) { - HelpFormatter helpFormatter = new HelpFormatter(); - helpFormatter.printHelp("tikaserver", options); - System.exit(-1); + usage(options); } String host = DEFAULT_HOST; @@ -307,30 +253,31 @@ public class TikaServerCli { inputStreamFactory = new DefaultInputStreamFactory(); } - int maxFiles = DEFAULT_MAX_FILES; - if (line.hasOption("maxFiles")) { - maxFiles = Integer.parseInt(line.getOptionValue("maxFiles")); - } + ServerStatus serverStatus = new ServerStatus(); + //if this is a child process + if (line.hasOption("child")) { + long maxFiles = DEFAULT_MAX_FILES; + if (line.hasOption("maxFiles")) { + maxFiles = Long.parseLong(line.getOptionValue("maxFiles")); + } - long timeoutMS = DEFAULT_TIME_OUT_MS; - if (line.hasOption("timeoutMS")) { - timeoutMS = Long.parseLong(line.getOptionValue("timeoutMS")); + ServerTimeouts serverTimeouts = configureServerTimeouts(line); + Thread serverThread = + new Thread(new ServerStatusWatcher(serverStatus, System.in, + System.out, maxFiles, serverTimeouts)); + serverThread.start(); + System.setIn(new ByteArrayInputStream(new byte[0])); + System.setOut(System.err); } - long pulseMS = DEFAULT_PULSE_MS; - if (line.hasOption("pulseMS")) { - pulseMS = Long.parseLong(line.getOptionValue("pulseMS")); - } - ServerStatus 
serverStatus = new ServerStatus(maxFiles); - new Thread(new ServerStatusWatcher(serverStatus, timeoutMS, pulseMS)).start(); - TikaResource.init(tika, digester, inputStreamFactory); + TikaResource.init(tika, digester, inputStreamFactory, serverStatus); JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); List<ResourceProvider> rCoreProviders = new ArrayList<>(); rCoreProviders.add(new SingletonResourceProvider(new MetadataResource())); rCoreProviders.add(new SingletonResourceProvider(new RecursiveMetadataResource())); - rCoreProviders.add(new SingletonResourceProvider(new DetectorResource())); + rCoreProviders.add(new SingletonResourceProvider(new DetectorResource(serverStatus))); rCoreProviders.add(new SingletonResourceProvider(new LanguageResource())); - rCoreProviders.add(new SingletonResourceProvider(new TranslateResource())); + rCoreProviders.add(new SingletonResourceProvider(new TranslateResource(serverStatus))); rCoreProviders.add(new SingletonResourceProvider(new TikaResource())); rCoreProviders.add(new SingletonResourceProvider(new UnpackerResource())); rCoreProviders.add(new SingletonResourceProvider(new TikaMimeTypes())); @@ -368,4 +315,38 @@ public class TikaServerCli { sf.create(); LOG.info("Started Apache Tika server at {}", url); } + + private static void usage(Options options) { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.printHelp("tikaserver", options); + System.exit(-1); + } + + private static ServerTimeouts configureServerTimeouts(CommandLine line) { + ServerTimeouts serverTimeouts = new ServerTimeouts(); + /*TODO -- add these in + if (line.hasOption("childProcessStartupMillis")) { + serverTimeouts.setChildProcessStartupMillis( + Long.parseLong(line.getOptionValue("childProcessStartupMillis"))); + } + if (line.hasOption("childProcessShutdownMillis")) { + serverTimeouts.setChildProcessShutdownMillis( + Long.parseLong(line.getOptionValue("childProcesShutdownMillis"))); + }*/ + if (line.hasOption("taskTimeoutMillis")) { 
+ serverTimeouts.setTaskTimeoutMillis( + Long.parseLong(line.getOptionValue("taskTimeoutMillis"))); + } + if (line.hasOption("pingTimeoutMillis")) { + serverTimeouts.setPingTimeoutMillis( + Long.parseLong(line.getOptionValue("pingTimeoutMillis"))); + } + if (line.hasOption("pingPulseMillis")) { + serverTimeouts.setPingPulseMillis( + Long.parseLong(line.getOptionValue("pingPulseMillis"))); + } + + return serverTimeouts; + } + } diff --git a/tika-server/src/main/java/org/apache/tika/server/TikaServerWatchDog.java b/tika-server/src/main/java/org/apache/tika/server/TikaServerWatchDog.java new file mode 100644 index 0000000..67007f2 --- /dev/null +++ b/tika-server/src/main/java/org/apache/tika/server/TikaServerWatchDog.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tika.server; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class TikaServerWatchDog { + + private static final Logger LOG = LoggerFactory.getLogger(TikaServerWatchDog.class); + private volatile Instant lastPing = null; + private ChildProcess childProcess = null; + int restarts = 0; + + public void execute(String[] args, ServerTimeouts serverTimeouts) throws Exception { + //if the child thread is in stop-the-world mode, and isn't + //responding to the ping, this thread checks to make sure + //that the parent ping is sent and received often enough + //If it isn't, this force destroys the child process. + Thread pingTimer = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + long tmpLastPing = -1L; + try { + //TODO: clean this up with synchronization/locking + //to avoid potential NPE + tmpLastPing = lastPing.toEpochMilli(); + } catch (NullPointerException e) { + + } + if (tmpLastPing > 0) { + long elapsed = Duration.between(Instant.ofEpochMilli(tmpLastPing), Instant.now()).toMillis(); + if (elapsed > serverTimeouts.getPingTimeoutMillis()) { + Process processToDestroy = null; + try { + processToDestroy = childProcess.process; + } catch (NullPointerException e) { + //ignore + } + destroyChildForcibly(processToDestroy); + } + } + try { + Thread.sleep(serverTimeouts.getPingPulseMillis()); + } catch (InterruptedException e) { + //swallow + } + } + } + } + ); + pingTimer.setDaemon(true); + pingTimer.start(); + try { + childProcess = new ChildProcess(args); + + while (true) { + + if (!childProcess.ping()) { + lastPing = null; + childProcess.close(); + LOG.info("About to restart the child process"); + childProcess = new ChildProcess(args); + 
LOG.info("Successfully restarted child process -- {} restarts so far)", ++restarts); + } + Thread.sleep(serverTimeouts.getPingPulseMillis()); + } + } catch (InterruptedException e) { + //interrupted...shutting down + } finally { + if (childProcess != null) { + childProcess.close(); + } + } + } + + private static List<String> extractArgs(String[] args) { + List<String> argList = new ArrayList<>(); + for (int i = 0; i < args.length; i++) { + if (args[i].startsWith("-J") || args[i].equals("-spawnChild") || args[i].equals("--spawnChild")) { + continue; + } + argList.add(args[i]); + } + return argList; + } + + private static List<String> extractJVMArgs(String[] args) { + List<String> jvmArgs = new ArrayList<>(); + for (int i = 0; i < args.length; i++) { + if (args[i].startsWith("-J")) { + jvmArgs.add("-"+args[i].substring(2)); + } + } + return jvmArgs; + } + + private class ChildProcess { + private Thread SHUTDOWN_HOOK = null; + + Process process; + DataInputStream fromChild; + DataOutputStream toChild; + + + + private ChildProcess(String[] args) throws Exception { + this.process = startProcess(args); + + this.fromChild = new DataInputStream(process.getInputStream()); + this.toChild = new DataOutputStream(process.getOutputStream()); + byte status = fromChild.readByte(); + if (status != ServerStatus.STATUS.OPERATING.getByte()) { + throw new IOException("bad status from child process: "+ + ServerStatus.STATUS.lookup(status)); + } + } + + public boolean ping() { + lastPing = Instant.now(); + try { + toChild.writeByte(ServerStatus.DIRECTIVES.PING.getByte()); + toChild.flush(); + } catch (Exception e) { + LOG.warn("Exception pinging child process", e); + return false; + } + try { + byte status = fromChild.readByte(); + if (status != ServerStatus.STATUS.OPERATING.getByte()) { + LOG.warn("Received status from child: {}", + ServerStatus.STATUS.lookup(status)); + return false; + } + } catch (Exception e) { + LOG.warn("Exception receiving status from child", e); + return false; + 
} + return true; + } + /** Best-effort shutdown: sends the SHUTDOWN directive, closes both streams, then forcibly destroys the process; a failure in one step is logged and the next step still runs. */ + private void close() { + try { + toChild.writeByte(ServerStatus.DIRECTIVES.SHUTDOWN.getByte()); + toChild.flush(); + } catch (Exception e) { + LOG.warn("Exception asking child to shutdown", e); + } + //TODO: add a gracefully timed shutdown routine + try { + fromChild.close(); + } catch (Exception e) { + LOG.warn("Problem shutting down reader from child", e); + } + + try { + toChild.close(); + } catch (Exception e) { + LOG.warn("Problem shutting down writer to child", e); + } + destroyChildForcibly(process); + } + /** Starts java with the extracted JVM args, the TikaServerCli main class, the remaining args and a trailing -child flag; stderr is inherited, -cp with this JVM's java.class.path is added unless -cp or --classpath was supplied, and a shutdown hook that forcibly destroys the child is registered. */ + private Process startProcess(String[] args) throws IOException { + ProcessBuilder builder = new ProcessBuilder(); + builder.redirectError(ProcessBuilder.Redirect.INHERIT); + List<String> argList = new ArrayList<>(); + List<String> jvmArgs = extractJVMArgs(args); + List<String> childArgs = extractArgs(args); + argList.add("java"); + if (! jvmArgs.contains("-cp") && ! jvmArgs.contains("--classpath")) { + String cp = System.getProperty("java.class.path"); + jvmArgs.add("-cp"); + jvmArgs.add(cp); + } + argList.addAll(jvmArgs); + argList.add("org.apache.tika.server.TikaServerCli"); + argList.addAll(childArgs); + argList.add("-child"); + + builder.command(argList); + Process process = builder.start(); + if (SHUTDOWN_HOOK != null) { + Runtime.getRuntime().removeShutdownHook(SHUTDOWN_HOOK); + } + SHUTDOWN_HOOK = new Thread(() -> process.destroyForcibly()); + Runtime.getRuntime().addShutdownHook(SHUTDOWN_HOOK); + + return process; + } + } + /** Forcibly destroys the given process and waits up to 60 seconds for it to exit; if it is still alive after that, logs an error and exits this JVM with status 1. */ + private static synchronized void destroyChildForcibly(Process process) { + process = process.destroyForcibly(); + try { + boolean destroyed = process.waitFor(60, TimeUnit.SECONDS); + if (! destroyed) { + LOG.error("Child process still alive after 60 seconds. 
" + + "Shutting down the parent."); + System.exit(1); + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); /* re-assert the interrupt instead of swallowing it, so callers can still observe the shutdown request */ + } + } + +} diff --git a/tika-server/src/main/java/org/apache/tika/server/resource/DetectorResource.java b/tika-server/src/main/java/org/apache/tika/server/resource/DetectorResource.java index 8d32fa0..acbdfaa 100644 --- a/tika-server/src/main/java/org/apache/tika/server/resource/DetectorResource.java +++ b/tika-server/src/main/java/org/apache/tika/server/resource/DetectorResource.java @@ -29,14 +29,20 @@ import java.io.InputStream; import org.apache.tika.io.TikaInputStream; import org.apache.tika.metadata.Metadata; +import org.apache.tika.metadata.TikaCoreProperties; import org.apache.tika.mime.MediaType; +import org.apache.tika.server.ServerStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Path("/detect") public class DetectorResource { private static final Logger LOG = LoggerFactory.getLogger(DetectorResource.class); + private final ServerStatus serverStatus; + public DetectorResource(ServerStatus serverStatus) { + this.serverStatus = serverStatus; + } @PUT @Path("stream") @Consumes("*/*") @@ -49,11 +55,18 @@ public class DetectorResource { .getRequestHeaders()); LOG.info("Detecting media type for Filename: {}", filename); met.add(Metadata.RESOURCE_NAME_KEY, filename); + TikaResource.checkIsOperating(); + long taskId = serverStatus.start(ServerStatus.TASK.DETECT, filename); try { return TikaResource.getConfig().getDetector().detect(tis, met).toString(); } catch (IOException e) { LOG.warn("Unable to detect MIME type for file. 
Reason: {}", e.getMessage(), e); return MediaType.OCTET_STREAM.toString(); + } catch (OutOfMemoryError e) { + serverStatus.setStatus(ServerStatus.STATUS.ERROR); + throw e; + } finally { + serverStatus.complete(taskId); } } } diff --git a/tika-server/src/main/java/org/apache/tika/server/resource/TikaResource.java b/tika-server/src/main/java/org/apache/tika/server/resource/TikaResource.java index 4d8679c..450c888 100644 --- a/tika-server/src/main/java/org/apache/tika/server/resource/TikaResource.java +++ b/tika-server/src/main/java/org/apache/tika/server/resource/TikaResource.java @@ -43,6 +43,7 @@ import org.apache.tika.sax.BodyContentHandler; import org.apache.tika.sax.ExpandedTitleContentHandler; import org.apache.tika.sax.RichTextContentHandler; import org.apache.tika.server.InputStreamFactory; +import org.apache.tika.server.ServerStatus; import org.apache.tika.server.TikaServerParseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,12 +98,13 @@ public class TikaResource { private static TikaConfig tikaConfig; private static DigestingParser.Digester digester = null; private static InputStreamFactory inputStreamFactory = null; - + private static ServerStatus SERVER_STATUS = null; public static void init(TikaConfig config, DigestingParser.Digester digestr, - InputStreamFactory iSF) { + InputStreamFactory iSF, ServerStatus serverStatus) { tikaConfig = config; digester = digestr; inputStreamFactory = iSF; + SERVER_STATUS = serverStatus; } static { @@ -391,6 +393,11 @@ public class TikaResource { */ public static void parse(Parser parser, Logger logger, String path, InputStream inputStream, ContentHandler handler, Metadata metadata, ParseContext parseContext) throws IOException { + + checkIsOperating(); + + long taskId = SERVER_STATUS.start(ServerStatus.TASK.PARSE, + metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY)); try { parser.parse(inputStream, handler, metadata, parseContext); } catch (SAXException e) { @@ -401,11 +408,22 @@ public class 
TikaResource { } catch (Exception e) { logger.warn("{}: Text extraction failed", path, e); throw new TikaServerParseException(e); + } catch (OutOfMemoryError e) { + SERVER_STATUS.setStatus(ServerStatus.STATUS.ERROR); + throw e; } finally { + SERVER_STATUS.complete(taskId); inputStream.close(); } } + public static void checkIsOperating() { + //check that server is not in shutdown mode + if (! SERVER_STATUS.isOperating()) { + throw new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE); + } + } + public static void logRequest(Logger logger, UriInfo info, Metadata metadata) { if (metadata.get(org.apache.tika.metadata.HttpHeaders.CONTENT_TYPE) == null) { logger.info("{} (autodetecting type)", info.getPath()); @@ -417,6 +435,7 @@ public class TikaResource { @GET @Produces("text/plain") public String getMessage() { + checkIsOperating(); return GREETING; } diff --git a/tika-server/src/main/java/org/apache/tika/server/resource/TikaVersion.java b/tika-server/src/main/java/org/apache/tika/server/resource/TikaVersion.java index b695940..a892716 100644 --- a/tika-server/src/main/java/org/apache/tika/server/resource/TikaVersion.java +++ b/tika-server/src/main/java/org/apache/tika/server/resource/TikaVersion.java @@ -33,6 +33,7 @@ public class TikaVersion { @GET @Produces("text/plain") public String getVersion() { + TikaResource.checkIsOperating(); return tika.toString(); } } diff --git a/tika-server/src/main/java/org/apache/tika/server/resource/TikaWelcome.java b/tika-server/src/main/java/org/apache/tika/server/resource/TikaWelcome.java index f44ff96..3408027 100644 --- a/tika-server/src/main/java/org/apache/tika/server/resource/TikaWelcome.java +++ b/tika-server/src/main/java/org/apache/tika/server/resource/TikaWelcome.java @@ -24,6 +24,8 @@ import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; import java.lang.annotation.Annotation; import 
java.lang.reflect.Method; import java.util.ArrayList; @@ -135,6 +137,8 @@ public class TikaWelcome { @GET @Produces("text/html") public String getWelcomeHTML() { + TikaResource.checkIsOperating(); + StringBuffer h = new StringBuffer(); String tikaVersion = tika.toString(); @@ -190,6 +194,7 @@ public class TikaWelcome { @GET @Produces("text/plain") public String getWelcomePlain() { + TikaResource.checkIsOperating(); StringBuffer text = new StringBuffer(); text.append(tika.toString()); diff --git a/tika-server/src/main/java/org/apache/tika/server/resource/TranslateResource.java b/tika-server/src/main/java/org/apache/tika/server/resource/TranslateResource.java index 0aba6f9..0417077 100644 --- a/tika-server/src/main/java/org/apache/tika/server/resource/TranslateResource.java +++ b/tika-server/src/main/java/org/apache/tika/server/resource/TranslateResource.java @@ -29,6 +29,8 @@ import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; import org.apache.commons.io.IOUtils; import org.apache.tika.config.LoadErrorHandler; @@ -37,6 +39,7 @@ import org.apache.tika.exception.TikaException; import org.apache.tika.langdetect.OptimaizeLangDetector; import org.apache.tika.language.detect.LanguageResult; import org.apache.tika.language.translate.Translator; +import org.apache.tika.server.ServerStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,10 +51,12 @@ public class TranslateResource { private static final Logger LOG = LoggerFactory.getLogger(TranslateResource.class); - public TranslateResource() { + private final ServerStatus serverStatus; + public TranslateResource(ServerStatus serverStatus) { this.loader = new ServiceLoader(ServiceLoader.class.getClassLoader(), LoadErrorHandler.WARN); this.defaultTranslator = TikaResource.getConfig().getTranslator(); + this.serverStatus = serverStatus; } @PUT @@ -94,8 +99,16 @@ public class 
TranslateResource { translate = this.defaultTranslator; LOG.info("Using default translator"); } - - return translate.translate(content, sLang, dLang); + TikaResource.checkIsOperating(); + long taskId = serverStatus.start(ServerStatus.TASK.TRANSLATE, null); + try { + return translate.translate(content, sLang, dLang); + } catch (OutOfMemoryError e) { + serverStatus.setStatus(ServerStatus.STATUS.ERROR); + throw e; + } finally { + serverStatus.complete(taskId); + } } private Translator byClassName(String className) { diff --git a/tika-server/src/test/java/org/apache/tika/server/CXFTestBase.java b/tika-server/src/test/java/org/apache/tika/server/CXFTestBase.java index f851e97..57220d4 100644 --- a/tika-server/src/test/java/org/apache/tika/server/CXFTestBase.java +++ b/tika-server/src/test/java/org/apache/tika/server/CXFTestBase.java @@ -84,7 +84,7 @@ public abstract class CXFTestBase { this.tika = new TikaConfig(getClass().getResourceAsStream("tika-config-for-server-tests.xml")); TikaResource.init(tika, new CommonsDigester(DIGESTER_READ_LIMIT, "md5,sha1:32"), - new DefaultInputStreamFactory()); + new DefaultInputStreamFactory(), new ServerStatus()); JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); setUpResources(sf); setUpProviders(sf); diff --git a/tika-server/src/test/java/org/apache/tika/server/DetectorResourceTest.java b/tika-server/src/test/java/org/apache/tika/server/DetectorResourceTest.java index 3d4dc1f..5b1e7b7 100644 --- a/tika-server/src/test/java/org/apache/tika/server/DetectorResourceTest.java +++ b/tika-server/src/test/java/org/apache/tika/server/DetectorResourceTest.java @@ -45,7 +45,7 @@ public class DetectorResourceTest extends CXFTestBase { protected void setUpResources(JAXRSServerFactoryBean sf) { sf.setResourceClasses(DetectorResource.class); sf.setResourceProvider(DetectorResource.class, - new SingletonResourceProvider(new DetectorResource())); + new SingletonResourceProvider(new DetectorResource(new ServerStatus()))); } diff --git 
a/tika-server/src/test/java/org/apache/tika/server/ServerIntegrationTest.java b/tika-server/src/test/java/org/apache/tika/server/ServerIntegrationTest.java deleted file mode 100644 index 8568c6c..0000000 --- a/tika-server/src/test/java/org/apache/tika/server/ServerIntegrationTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.tika.server; - -import org.apache.cxf.jaxrs.client.WebClient; -import org.apache.tika.TikaTest; -import org.apache.tika.metadata.Metadata; -import org.apache.tika.metadata.OfficeOpenXMLExtended; -import org.apache.tika.metadata.serialization.JsonMetadataList; -import org.junit.Test; - -import javax.ws.rs.core.Response; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; -import java.util.List; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.junit.Assert.assertEquals; - -public class ServerIntegrationTest extends TikaTest { - private static final String TEST_RECURSIVE_DOC = "test_recursive_embedded.docx"; - private static final String META_PATH = "/rmeta"; - protected static final String endPoint = - "http://localhost:" + TikaServerCli.DEFAULT_PORT; - - @Test - public void testBasic() throws Exception { - - Thread serverThread = new Thread() { - @Override - public void run() { - TikaServerCli.main( - new String[]{ - "-spawnChild", "-p", Integer.toString(TikaServerCli.DEFAULT_PORT) - }); - } - }; - serverThread.start(); - //test for the server being available...rather than this sleep call - Thread.sleep(20000); - Response response = WebClient - .create(endPoint + META_PATH) - .accept("application/json") - .put(ClassLoader - .getSystemResourceAsStream(TEST_RECURSIVE_DOC)); - Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8); - List<Metadata> metadataList = JsonMetadataList.fromJson(reader); - assertEquals(12, metadataList.size()); - assertEquals("Microsoft Office Word", metadataList.get(0).get(OfficeOpenXMLExtended.APPLICATION)); - assertContains("plundered our seas", metadataList.get(6).get("X-TIKA:content")); - - //assertEquals("a38e6c7b38541af87148dee9634cb811", metadataList.get(10).get("X-TIKA:digest:MD5")); - - serverThread.interrupt(); - - - } -} diff --git a/tika-server/src/test/java/org/apache/tika/server/ServerStatusTest.java 
b/tika-server/src/test/java/org/apache/tika/server/ServerStatusTest.java index 23880ff..39d1583 100644 --- a/tika-server/src/test/java/org/apache/tika/server/ServerStatusTest.java +++ b/tika-server/src/test/java/org/apache/tika/server/ServerStatusTest.java @@ -33,18 +33,18 @@ public class ServerStatusTest { @Test(expected = IllegalArgumentException.class) public void testBadId() throws Exception { - ServerStatus status = new ServerStatus(-1); + ServerStatus status = new ServerStatus(); status.complete(2); } @Test(timeout = 60000) public void testBasicMultiThreading() throws Exception { //make sure that synchronization is basically working - int numThreads = 100; - int filesToProcess = 100; - ExecutorService service = Executors.newFixedThreadPool(100); + int numThreads = 10; + int filesToProcess = 20; + ExecutorService service = Executors.newFixedThreadPool(numThreads); ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(service); - ServerStatus serverStatus = new ServerStatus(-1); + ServerStatus serverStatus = new ServerStatus(); for (int i = 0; i < numThreads; i++) { completionService.submit(new MockTask(serverStatus, filesToProcess)); } @@ -78,15 +78,15 @@ public class ServerStatusTest { int processed = 0; for (int i = 0; i < filesToProcess; i++) { sleepRandom(200); - int taskId = serverStatus.start(ServerStatus.TASK.PARSE, null); + long taskId = serverStatus.start(ServerStatus.TASK.PARSE, null); sleepRandom(100); serverStatus.complete(taskId); processed++; serverStatus.getStatus(); sleepRandom(10); - serverStatus.setStatus(ServerStatus.STATUS.OPEN); + serverStatus.setStatus(ServerStatus.STATUS.OPERATING); sleepRandom(20); - Map<Integer, TaskStatus> tasks = serverStatus.getTasks(); + Map<Long, TaskStatus> tasks = serverStatus.getTasks(); assertNotNull(tasks); } return processed; diff --git a/tika-server/src/test/java/org/apache/tika/server/StackTraceOffTest.java 
b/tika-server/src/test/java/org/apache/tika/server/StackTraceOffTest.java index 6c86437..d385581 100644 --- a/tika-server/src/test/java/org/apache/tika/server/StackTraceOffTest.java +++ b/tika-server/src/test/java/org/apache/tika/server/StackTraceOffTest.java @@ -65,7 +65,7 @@ public class StackTraceOffTest extends CXFTestBase { List<ResourceProvider> rCoreProviders = new ArrayList<ResourceProvider>(); rCoreProviders.add(new SingletonResourceProvider(new MetadataResource())); rCoreProviders.add(new SingletonResourceProvider(new RecursiveMetadataResource())); - rCoreProviders.add(new SingletonResourceProvider(new DetectorResource())); + rCoreProviders.add(new SingletonResourceProvider(new DetectorResource(new ServerStatus()))); rCoreProviders.add(new SingletonResourceProvider(new TikaResource())); rCoreProviders.add(new SingletonResourceProvider(new UnpackerResource())); sf.setResourceProviders(rCoreProviders); diff --git a/tika-server/src/test/java/org/apache/tika/server/StackTraceTest.java b/tika-server/src/test/java/org/apache/tika/server/StackTraceTest.java index 2b76f33..24882f7 100644 --- a/tika-server/src/test/java/org/apache/tika/server/StackTraceTest.java +++ b/tika-server/src/test/java/org/apache/tika/server/StackTraceTest.java @@ -59,7 +59,7 @@ public class StackTraceTest extends CXFTestBase { List<ResourceProvider> rCoreProviders = new ArrayList<ResourceProvider>(); rCoreProviders.add(new SingletonResourceProvider(new MetadataResource())); rCoreProviders.add(new SingletonResourceProvider(new RecursiveMetadataResource())); - rCoreProviders.add(new SingletonResourceProvider(new DetectorResource())); + rCoreProviders.add(new SingletonResourceProvider(new DetectorResource(new ServerStatus()))); rCoreProviders.add(new SingletonResourceProvider(new TikaResource())); rCoreProviders.add(new SingletonResourceProvider(new UnpackerResource())); sf.setResourceProviders(rCoreProviders); diff --git 
a/tika-server/src/test/java/org/apache/tika/server/TikaServerIntegrationTest.java b/tika-server/src/test/java/org/apache/tika/server/TikaServerIntegrationTest.java new file mode 100644 index 0000000..d328711 --- /dev/null +++ b/tika-server/src/test/java/org/apache/tika/server/TikaServerIntegrationTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tika.server; + +import org.apache.cxf.jaxrs.client.WebClient; +import org.apache.tika.TikaTest; +import org.apache.tika.io.IOUtils; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.metadata.OfficeOpenXMLExtended; +import org.apache.tika.metadata.serialization.JsonMetadataList; +import org.junit.Test; + +import javax.ws.rs.core.Response; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.time.Duration; +import java.time.Instant; +import java.util.List; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; + +public class TikaServerIntegrationTest extends TikaTest { + + private static final String TEST_RECURSIVE_DOC = "test_recursive_embedded.docx"; + private static final String TEST_OOM = "mock/real_oom.xml"; + private static final String TEST_SYSTEM_EXIT = "mock/system_exit.xml"; + private static final String TEST_HEAVY_HANG = "mock/heavy_hang_30000.xml"; + private static final String TEST_HEAVY_HANG_SHORT = "mock/heavy_hang_100.xml"; + private static final String META_PATH = "/rmeta"; + + //running into conflicts on 9998 with the CXFTestBase tests + //TODO: figure out why?! 
+ private static final String INTEGRATION_TEST_PORT = "9999"; + + protected static final String endPoint = + "http://localhost:" + INTEGRATION_TEST_PORT; + + @Test + public void testBasic() throws Exception { + + Thread serverThread = new Thread() { + @Override + public void run() { + TikaServerCli.main( + new String[]{ + "-spawnChild", + "-p", INTEGRATION_TEST_PORT + }); + } + }; + serverThread.start(); + awaitServerStartup(); + + Response response = WebClient + .create(endPoint + META_PATH) + .accept("application/json") + .put(ClassLoader + .getSystemResourceAsStream(TEST_RECURSIVE_DOC)); + Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8); + List<Metadata> metadataList = JsonMetadataList.fromJson(reader); + assertEquals(12, metadataList.size()); + assertEquals("Microsoft Office Word", metadataList.get(0).get(OfficeOpenXMLExtended.APPLICATION)); + assertContains("plundered our seas", metadataList.get(6).get("X-TIKA:content")); + + //assertEquals("a38e6c7b38541af87148dee9634cb811", metadataList.get(10).get("X-TIKA:digest:MD5")); + + serverThread.interrupt(); + + + } + + @Test + public void testOOM() throws Exception { + + Thread serverThread = new Thread() { + @Override + public void run() { + TikaServerCli.main( + new String[]{ + "-spawnChild", "-JXmx512m", + "-p", INTEGRATION_TEST_PORT + }); + } + }; + serverThread.start(); + awaitServerStartup(); + Response response = WebClient + .create(endPoint + META_PATH) + .accept("application/json") + .put(ClassLoader + .getSystemResourceAsStream(TEST_OOM)); + //give some time for the server to crash/kill itself + Thread.sleep(2000); + awaitServerStartup(); + + response = WebClient + .create(endPoint + META_PATH) + .accept("application/json") + .put(ClassLoader + .getSystemResourceAsStream(TEST_RECURSIVE_DOC)); + Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8); + List<Metadata> metadataList = JsonMetadataList.fromJson(reader); + assertEquals(12, 
metadataList.size()); + assertEquals("Microsoft Office Word", metadataList.get(0).get(OfficeOpenXMLExtended.APPLICATION)); + assertContains("plundered our seas", metadataList.get(6).get("X-TIKA:content")); + + serverThread.interrupt(); + } + + @Test + public void testSystemExit() throws Exception { + + Thread serverThread = new Thread() { + @Override + public void run() { + TikaServerCli.main( + new String[]{ + "-spawnChild", + "-p", INTEGRATION_TEST_PORT + }); + } + }; + serverThread.start(); + awaitServerStartup(); + Response response = null; + try { + response = WebClient + .create(endPoint + META_PATH) + .accept("application/json") + .put(ClassLoader + .getSystemResourceAsStream(TEST_SYSTEM_EXIT)); + } catch (Exception e) { + //sys exit causes catchable problems for the client + } + //give some time for the server to crash/kill itself + Thread.sleep(2000); + + awaitServerStartup(); + + response = WebClient + .create(endPoint + META_PATH) + .accept("application/json") + .put(ClassLoader + .getSystemResourceAsStream(TEST_RECURSIVE_DOC)); + + Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8); + List<Metadata> metadataList = JsonMetadataList.fromJson(reader); + assertEquals(12, metadataList.size()); + assertEquals("Microsoft Office Word", metadataList.get(0).get(OfficeOpenXMLExtended.APPLICATION)); + assertContains("plundered our seas", metadataList.get(6).get("X-TIKA:content")); + + serverThread.interrupt(); + + + } + + @Test + public void testTimeoutOk() throws Exception { + //test that there's enough time for this file. 
+ Thread serverThread = new Thread() { + @Override + public void run() { + TikaServerCli.main( + new String[]{ + "-spawnChild", "-p", INTEGRATION_TEST_PORT, + "-taskTimeoutMillis", "10000", "-taskPulseMillis", "500", + "-pingPulseMillis", "500" + }); + } + }; + serverThread.start(); + awaitServerStartup(); + Response response = WebClient + .create(endPoint + META_PATH) + .accept("application/json") + .put(ClassLoader + .getSystemResourceAsStream(TEST_HEAVY_HANG_SHORT)); + awaitServerStartup(); + + response = WebClient + .create(endPoint + META_PATH) + .accept("application/json") + .put(ClassLoader + .getSystemResourceAsStream(TEST_RECURSIVE_DOC)); + Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8); + List<Metadata> metadataList = JsonMetadataList.fromJson(reader); + assertEquals(12, metadataList.size()); + assertEquals("Microsoft Office Word", metadataList.get(0).get(OfficeOpenXMLExtended.APPLICATION)); + assertContains("plundered our seas", metadataList.get(6).get("X-TIKA:content")); + + serverThread.interrupt(); + + + } + + @Test + public void testTimeout() throws Exception { + + Thread serverThread = new Thread() { + @Override + public void run() { + TikaServerCli.main( + new String[]{ + "-spawnChild", "-p", INTEGRATION_TEST_PORT, + "-taskTimeoutMillis", "10000", "-taskPulseMillis", "500", + "-pingPulseMillis", "500" + }); + } + }; + serverThread.start(); + awaitServerStartup(); + Response response = null; + try { + response = WebClient + .create(endPoint + META_PATH) + .accept("application/json") + .put(ClassLoader + .getSystemResourceAsStream(TEST_HEAVY_HANG)); + } catch (Exception e) { + //catchable exception when server shuts down. 
+ } + awaitServerStartup(); + + response = WebClient + .create(endPoint + META_PATH) + .accept("application/json") + .put(ClassLoader + .getSystemResourceAsStream(TEST_RECURSIVE_DOC)); + Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8); + List<Metadata> metadataList = JsonMetadataList.fromJson(reader); + assertEquals(12, metadataList.size()); + assertEquals("Microsoft Office Word", metadataList.get(0).get(OfficeOpenXMLExtended.APPLICATION)); + assertContains("plundered our seas", metadataList.get(6).get("X-TIKA:content")); + + serverThread.interrupt(); + + + } + + private void awaitServerStartup() throws Exception { + /* polls GET /tika, sleeping one second between attempts, for up to 30 seconds; returns on the first 200 and fails loudly if the server never answers */ + Instant started = Instant.now(); + long elapsed = Duration.between(started, Instant.now()).toMillis(); + while (elapsed < 30000) { + try { + Response response = WebClient + .create(endPoint + "/tika") + .accept("text/plain") + .get(); + if (response.getStatus() == 200) { + return; + } + } catch (javax.ws.rs.ProcessingException e) { /* server not reachable yet; retry after the sleep */ + } + Thread.sleep(1000); + elapsed = Duration.between(started, Instant.now()).toMillis(); + } + throw new IllegalStateException("Server at " + endPoint + " did not respond with 200 within 30 seconds"); + } +} diff --git a/tika-server/src/test/java/org/apache/tika/server/TikaWelcomeTest.java b/tika-server/src/test/java/org/apache/tika/server/TikaWelcomeTest.java index 27376ff..a52a79a 100644 --- a/tika-server/src/test/java/org/apache/tika/server/TikaWelcomeTest.java +++ b/tika-server/src/test/java/org/apache/tika/server/TikaWelcomeTest.java @@ -45,7 +45,7 @@ public class TikaWelcomeTest extends CXFTestBase { List<ResourceProvider> rpsCore = new ArrayList<ResourceProvider>(); rpsCore.add(new SingletonResourceProvider(new TikaVersion())); - rpsCore.add(new SingletonResourceProvider(new DetectorResource())); + rpsCore.add(new SingletonResourceProvider(new DetectorResource(new ServerStatus()))); rpsCore.add(new SingletonResourceProvider(new MetadataResource())); List<ResourceProvider> all = new ArrayList<ResourceProvider>(rpsCore); all.add(new SingletonResourceProvider(new TikaWelcome(rpsCore))); 
diff --git a/tika-server/src/test/java/org/apache/tika/server/TranslateResourceTest.java b/tika-server/src/test/java/org/apache/tika/server/TranslateResourceTest.java index 3cc7be4..c52db65 100644 --- a/tika-server/src/test/java/org/apache/tika/server/TranslateResourceTest.java +++ b/tika-server/src/test/java/org/apache/tika/server/TranslateResourceTest.java @@ -47,7 +47,7 @@ public class TranslateResourceTest extends CXFTestBase { protected void setUpResources(JAXRSServerFactoryBean sf) { sf.setResourceClasses(TranslateResource.class); sf.setResourceProvider(TranslateResource.class, - new SingletonResourceProvider(new TranslateResource())); + new SingletonResourceProvider(new TranslateResource(new ServerStatus()))); } diff --git a/tika-server/src/test/resources/mock/heavy_hang_100.xml b/tika-server/src/test/resources/mock/heavy_hang_100.xml new file mode 100644 index 0000000..f1f5b67 --- /dev/null +++ b/tika-server/src/test/resources/mock/heavy_hang_100.xml @@ -0,0 +1,25 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> + +<mock> + <metadata action="add" name="author">Nikolai Lobachevsky</metadata> + <write element="p">some content</write> + <hang millis="100" heavy="true" pulse_millis="100" /> +</mock> \ No newline at end of file diff --git a/tika-server/src/test/resources/mock/heavy_hang_30000.xml b/tika-server/src/test/resources/mock/heavy_hang_30000.xml new file mode 100644 index 0000000..f1f5b67 --- /dev/null +++ b/tika-server/src/test/resources/mock/heavy_hang_30000.xml @@ -0,0 +1,25 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<mock> + <metadata action="add" name="author">Nikolai Lobachevsky</metadata> + <write element="p">some content</write> + <hang millis="30000" heavy="true" pulse_millis="100" /> +</mock> \ No newline at end of file diff --git a/tika-server/src/test/resources/mock/real_oom.xml b/tika-server/src/test/resources/mock/real_oom.xml new file mode 100644 index 0000000..168751a --- /dev/null +++ b/tika-server/src/test/resources/mock/real_oom.xml @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<mock> + <metadata action="add" name="author">Nikolai Lobachevsky</metadata> + <oom/> +</mock> \ No newline at end of file diff --git a/tika-server/src/test/resources/mock/system_exit.xml b/tika-server/src/test/resources/mock/system_exit.xml new file mode 100644 index 0000000..75d1d3b --- /dev/null +++ b/tika-server/src/test/resources/mock/system_exit.xml @@ -0,0 +1,25 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> + +<mock> + <metadata action="add" name="author">Nikolai Lobachevsky</metadata> + <write element="p">some content</write> + <system_exit /> +</mock> \ No newline at end of file diff --git a/tika-server/src/test/resources/mock/thread_interrupt.xml b/tika-server/src/test/resources/mock/thread_interrupt.xml new file mode 100644 index 0000000..3e54512 --- /dev/null +++ b/tika-server/src/test/resources/mock/thread_interrupt.xml @@ -0,0 +1,25 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<mock> + <metadata action="add" name="author">Nikolai Lobachevsky</metadata> + <write element="p">some content</write> + <thread_interrupt /> +</mock> \ No newline at end of file
