This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch branch_1x in repository https://gitbox.apache.org/repos/asf/tika.git
commit 58dadac9131184e74c468771244595f7daee36c8 Author: TALLISON <[email protected]> AuthorDate: Fri Sep 7 19:16:32 2018 -0400 TIKA-2725 -- checkpoint commit ... basic child process is started...need to integrate actual statuswatcher, etc. # Conflicts: # tika-parsers/src/test/resources/test-documents/testPST.pst # tika-parsers/src/test/resources/test-documents/testPST_variousBodyTypes.pst --- .../tika/server/FileCountExceededException.java | 9 ++ .../java/org/apache/tika/server/ServerStatus.java | 98 +++++++++++++++ .../apache/tika/server/ServerStatusWatcher.java | 76 ++++++++++++ .../java/org/apache/tika/server/TaskStatus.java | 41 +++++++ .../java/org/apache/tika/server/TikaServerCli.java | 132 ++++++++++++++++++++- .../apache/tika/server/ServerIntegrationTest.java | 73 ++++++++++++ .../org/apache/tika/server/ServerStatusTest.java | 100 ++++++++++++++++ 7 files changed, 524 insertions(+), 5 deletions(-) diff --git a/tika-server/src/main/java/org/apache/tika/server/FileCountExceededException.java b/tika-server/src/main/java/org/apache/tika/server/FileCountExceededException.java new file mode 100644 index 0000000..9920556 --- /dev/null +++ b/tika-server/src/main/java/org/apache/tika/server/FileCountExceededException.java @@ -0,0 +1,9 @@ +package org.apache.tika.server; + +/** + * Exception thrown by ServerStatusWatcher if tika-server exceeds + * the maximum number of files to process. + */ +public class FileCountExceededException extends Exception { + +} diff --git a/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java b/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java new file mode 100644 index 0000000..861007d --- /dev/null +++ b/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.server; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class ServerStatus { + + enum STATUS { + OPEN(0), + HIT_MAX(1), + TIMEOUT(2), + ERROR(3), + PARENT_REQUESTED_SHUTDOWN(4); + + private final int shutdownCode; + STATUS(int shutdownCode) { + this.shutdownCode = shutdownCode; + } + int getShutdownCode() { + return shutdownCode; + } + } + enum TASK { + PARSE, + UNZIP, + DETECT, + METADATA + }; + + private final int maxFilesToProcess; + private AtomicInteger counter = new AtomicInteger(0); + private Map<Integer, TaskStatus> tasks = new HashMap<>(); + + private STATUS status = STATUS.OPEN; + public ServerStatus(int maxFilesToProcess) { + this.maxFilesToProcess = maxFilesToProcess; + } + public synchronized int start(TASK task, String fileName) throws FileCountExceededException { + int i = counter.incrementAndGet(); + if (i == Integer.MAX_VALUE || + (maxFilesToProcess > 0 && i >= maxFilesToProcess)) { + setStatus(STATUS.HIT_MAX); + throw new FileCountExceededException(); + } + tasks.put(i, new TaskStatus(task, Instant.now(), fileName)); + return i; + } + + /** + * Removes the task from the collection of currently running tasks. + * + * @param taskId + * @throws IllegalArgumentException if there is no task by that taskId in the collection + */ + public synchronized void complete(int taskId) throws IllegalArgumentException { + TaskStatus status = tasks.remove(taskId); + if (status == null) { + throw new IllegalArgumentException("TaskId is not in map:"+taskId); + } + } + + public synchronized void setStatus(STATUS status) { + this.status = status; + } + + public synchronized STATUS getStatus() { + return status; + } + + public synchronized Map<Integer, TaskStatus> getTasks() { + Map<Integer, TaskStatus> ret = new HashMap<>(); + ret.putAll(tasks); + return ret; + } + + public synchronized int getFilesProcessed() { + return counter.get(); + } +} diff --git a/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java b/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java new file mode 100644 index 0000000..24b1ddb --- /dev/null +++ b/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tika.server; + +import org.apache.tika.server.resource.TranslateResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.Callable; + +public class ServerStatusWatcher implements Runnable { + + + private static final Logger LOG = LoggerFactory.getLogger(ServerStatusWatcher.class); + + private final ServerStatus serverStatus; + private final long timeoutMillis; + private final long pulseMillis; + + public ServerStatusWatcher(ServerStatus serverStatus, long timeoutMillis, long pulseMillis) { + this.serverStatus = serverStatus; + this.timeoutMillis = timeoutMillis; + this.pulseMillis = pulseMillis; + } + + @Override + public void run() { + ServerStatus.STATUS status = serverStatus.getStatus(); + while (status.equals(ServerStatus.STATUS.OPEN)) { + try { + Thread.sleep(pulseMillis); + } catch (InterruptedException e) { + } + checkForTimeouts(); + status = serverStatus.getStatus(); + } + if (! status.equals(ServerStatus.STATUS.OPEN)) { + LOG.warn("child process shutting down with status: {}", status); + System.exit(status.getShutdownCode()); + } + } + + private void checkForTimeouts() { + Instant now = Instant.now(); + for (TaskStatus status : serverStatus.getTasks().values()) { + long millisElapsed = Duration.between(now, status.started).toMillis(); + if (millisElapsed > timeoutMillis) { + serverStatus.setStatus(ServerStatus.STATUS.TIMEOUT); + if (status.fileName.isPresent()) { + LOG.error("Timeout task {}, millis elapsed {}, file {}", + status.task.toString(), Long.toString(millisElapsed), status.fileName.get()); + } else { + LOG.error("Timeout task {}, millis elapsed {}", + status.task.toString(), Long.toString(millisElapsed)); + } + } + } + } +} \ No newline at end of file diff --git a/tika-server/src/main/java/org/apache/tika/server/TaskStatus.java b/tika-server/src/main/java/org/apache/tika/server/TaskStatus.java new file mode 100644 index 0000000..1637d7d --- /dev/null +++ b/tika-server/src/main/java/org/apache/tika/server/TaskStatus.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.server; + +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class TaskStatus { + final ServerStatus.TASK task; + final Instant started; + final Optional<String> fileName; + + TaskStatus(ServerStatus.TASK task, Instant started, String fileName) { + this.task = task; + this.started = started; + this.fileName = Optional.ofNullable(fileName); + } + + + @Override + public String toString() { + return ""; + } + +} diff --git a/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java b/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java index 03d582e..af8fd8f 100644 --- a/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java +++ b/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java @@ -17,6 +17,7 @@ package org.apache.tika.server; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -62,6 +63,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TikaServerCli { + + + //used in spawn-child mode + private static final long PULSE_MILLIS = 100; + private static final int DEFAULT_MAX_FILES = -1; + private static final long DEFAULT_TIME_OUT_MS = 60000; + private static final long DEFAULT_PULSE_MS = 500; + private static Thread SHUTDOWN_HOOK = null; + + public static final int DEFAULT_PORT = 9998; private static final int DEFAULT_DIGEST_MARK_LIMIT = 20*1024*1024; public static final String DEFAULT_HOST = "localhost"; @@ -88,14 +99,114 @@ public class TikaServerCli { options.addOption("?", "help", false, "this help message"); options.addOption("enableUnsecureFeatures", false, "this is required to enable fileUrl."); options.addOption("enableFileUrl", false, "allows user to pass in fileUrl instead of InputStream."); - + options.addOption("spawnChild", false, "whether or not to spawn a child process for robustness"); + options.addOption("maxFiles", false, "shutdown server after this many files -- use only in 'spawnChild' mode"); return options; } public static void main(String[] args) { LOG.info("Starting {} server", new Tika()); + try { + execute(args); + } catch (Exception e) { + e.printStackTrace(); + LOG.error("Can't start", e); + System.exit(-1); + } + } + + private static void execute(String[] args) throws Exception { + boolean spawnChild = false; + for (int i = 0; i < args.length; i++) { + if ("-spawnChild".equals(args[i]) || "--spawnChild".equals(args[i])) { + spawnChild = true; + break; + } + } + if (spawnChild) { + spawnChild(args); + } else { + executeLegacy(args); + } + } + private static void spawnChild(String[] args) throws Exception { + Process child = start(args); try { + while (true) { + Thread.sleep(PULSE_MILLIS); + + int exitValue = Integer.MAX_VALUE; + try { + exitValue = child.exitValue(); + } catch (IllegalThreadStateException e) { + //process is still running + } + if (exitValue != Integer.MAX_VALUE) { + if (exitValue != ServerStatus.STATUS.PARENT_REQUESTED_SHUTDOWN.getShutdownCode()) { + LOG.warn("child exited with code ({}) -- restarting, now", Integer.toString(exitValue)); + child.destroyForcibly(); + child = start(args); + } + } + } + } catch (InterruptedException e) { + //interrupted...shutting down + } finally { + child.destroyForcibly(); + } + } + + private static Process start(String[] args) throws IOException { + ProcessBuilder builder = new ProcessBuilder(); + builder.inheritIO(); + List<String> argList = new ArrayList<>(); + List<String> jvmArgs = extractJVMArgs(args); + List<String> childArgs = extractArgs(args); + argList.add("java"); + if (! jvmArgs.contains("-cp") && ! jvmArgs.contains("--classpath")) { + String cp = System.getProperty("java.class.path"); + jvmArgs.add("-cp"); + jvmArgs.add(cp); + } + argList.addAll(jvmArgs); + argList.add("org.apache.tika.server.TikaServerCli"); + argList.addAll(childArgs); + + builder.command(argList); + + Process process = builder.start(); + + if (SHUTDOWN_HOOK != null) { + Runtime.getRuntime().removeShutdownHook(SHUTDOWN_HOOK); + } + SHUTDOWN_HOOK = new Thread(() -> process.destroy()); + Runtime.getRuntime().addShutdownHook(SHUTDOWN_HOOK); + return process; + } + + private static List<String> extractArgs(String[] args) { + List<String> argList = new ArrayList<>(); + for (int i = 0; i < args.length; i++) { + if (args[i].startsWith("-J") || args[i].equals("-spawnChild") || args[i].equals("--spawnChild")) { + continue; + } + argList.add(args[i]); + } + return argList; + } + + private static List<String> extractJVMArgs(String[] args) { + List<String> jvmArgs = new ArrayList<>(); + for (int i = 0; i < args.length; i++) { + if (args[i].startsWith("-J")) { + jvmArgs.add("-"+args[i].substring(2)); + } + } + return jvmArgs; + } + + private static void executeLegacy(String[] args) throws Exception { Options options = getOptions(); CommandLineParser cliParser = new GnuParser(); @@ -196,6 +307,21 @@ public class TikaServerCli { inputStreamFactory = new DefaultInputStreamFactory(); } + int maxFiles = DEFAULT_MAX_FILES; + if (line.hasOption("maxFiles")) { + maxFiles = Integer.parseInt(line.getOptionValue("maxFiles")); + } + + long timeoutMS = DEFAULT_TIME_OUT_MS; + if (line.hasOption("timeoutMS")) { + timeoutMS = Long.parseLong(line.getOptionValue("timeoutMS")); + } + long pulseMS = DEFAULT_PULSE_MS; + if (line.hasOption("pulseMS")) { + pulseMS = Long.parseLong(line.getOptionValue("pulseMS")); + } + ServerStatus serverStatus = new ServerStatus(maxFiles); + new Thread(new ServerStatusWatcher(serverStatus, timeoutMS, pulseMS)).start(); TikaResource.init(tika, digester, inputStreamFactory); JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); @@ -241,9 +367,5 @@ public class TikaServerCli { manager.registerBindingFactory(JAXRSBindingFactory.JAXRS_BINDING_ID, factory); sf.create(); LOG.info("Started Apache Tika server at {}", url); - } catch (Exception ex) { - LOG.error("Can't start", ex); - System.exit(-1); - } } } diff --git a/tika-server/src/test/java/org/apache/tika/server/ServerIntegrationTest.java b/tika-server/src/test/java/org/apache/tika/server/ServerIntegrationTest.java new file mode 100644 index 0000000..8568c6c --- /dev/null +++ b/tika-server/src/test/java/org/apache/tika/server/ServerIntegrationTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.server; + +import org.apache.cxf.jaxrs.client.WebClient; +import org.apache.tika.TikaTest; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.metadata.OfficeOpenXMLExtended; +import org.apache.tika.metadata.serialization.JsonMetadataList; +import org.junit.Test; + +import javax.ws.rs.core.Response; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.List; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; + +public class ServerIntegrationTest extends TikaTest { + private static final String TEST_RECURSIVE_DOC = "test_recursive_embedded.docx"; + private static final String META_PATH = "/rmeta"; + protected static final String endPoint = + "http://localhost:" + TikaServerCli.DEFAULT_PORT; + + @Test + public void testBasic() throws Exception { + + Thread serverThread = new Thread() { + @Override + public void run() { + TikaServerCli.main( + new String[]{ + "-spawnChild", "-p", Integer.toString(TikaServerCli.DEFAULT_PORT) + }); + } + }; + serverThread.start(); + //test for the server being available...rather than this sleep call + Thread.sleep(20000); + Response response = WebClient + .create(endPoint + META_PATH) + .accept("application/json") + .put(ClassLoader + .getSystemResourceAsStream(TEST_RECURSIVE_DOC)); + Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8); + List<Metadata> metadataList = JsonMetadataList.fromJson(reader); + assertEquals(12, metadataList.size()); + assertEquals("Microsoft Office Word", metadataList.get(0).get(OfficeOpenXMLExtended.APPLICATION)); + assertContains("plundered our seas", metadataList.get(6).get("X-TIKA:content")); + + //assertEquals("a38e6c7b38541af87148dee9634cb811", metadataList.get(10).get("X-TIKA:digest:MD5")); + + serverThread.interrupt(); + + + } +} diff --git a/tika-server/src/test/java/org/apache/tika/server/ServerStatusTest.java b/tika-server/src/test/java/org/apache/tika/server/ServerStatusTest.java new file mode 100644 index 0000000..23880ff --- /dev/null +++ b/tika-server/src/test/java/org/apache/tika/server/ServerStatusTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.server; + +import org.junit.Test; + +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class ServerStatusTest { + + @Test(expected = IllegalArgumentException.class) + public void testBadId() throws Exception { + ServerStatus status = new ServerStatus(-1); + status.complete(2); + } + + @Test(timeout = 60000) + public void testBasicMultiThreading() throws Exception { + //make sure that synchronization is basically working + int numThreads = 100; + int filesToProcess = 100; + ExecutorService service = Executors.newFixedThreadPool(100); + ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(service); + ServerStatus serverStatus = new ServerStatus(-1); + for (int i = 0; i < numThreads; i++) { + completionService.submit(new MockTask(serverStatus, filesToProcess)); + } + int finished = 0; + int totalProcessed = 0; + while (finished < numThreads) { + Future<Integer> future = completionService.take(); + if (future != null) { + finished++; + Integer completed = future.get(); + totalProcessed += completed; + } + } + assertEquals(numThreads*filesToProcess, totalProcessed); + assertEquals(0, serverStatus.getTasks().size()); + assertEquals(totalProcessed, serverStatus.getFilesProcessed()); + + } + + private class MockTask implements Callable<Integer> { + Random r = new Random(); + private final ServerStatus serverStatus; + private final int filesToProcess; + public MockTask(ServerStatus serverStatus, int filesToProcess) { + this.serverStatus = serverStatus; + this.filesToProcess = filesToProcess; + } + + @Override + public Integer call() throws Exception { + int processed = 0; + for (int i = 0; i < filesToProcess; i++) { + sleepRandom(200); + int taskId = serverStatus.start(ServerStatus.TASK.PARSE, null); + sleepRandom(100); + serverStatus.complete(taskId); + processed++; + serverStatus.getStatus(); + sleepRandom(10); + serverStatus.setStatus(ServerStatus.STATUS.OPEN); + sleepRandom(20); + Map<Integer, TaskStatus> tasks = serverStatus.getTasks(); + assertNotNull(tasks); + } + return processed; + } + + private void sleepRandom(int millis) throws InterruptedException { + int sleep = r.nextInt(millis); + Thread.sleep(sleep); + } + } +}
