Author: tallison
Date: Mon Mar 23 16:09:10 2015
New Revision: 1668673
URL: http://svn.apache.org/r1668673
Log:
initial commit of TIKA-1330
Added:
tika/trunk/tika-app/src/main/java/org/apache/tika/cli/BatchCommandLineBuilder.java
tika/trunk/tika-app/src/main/resources/log4j.properties
tika/trunk/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchCommandLineTest.java
tika/trunk/tika-batch/
tika/trunk/tika-batch/pom.xml
tika/trunk/tika-batch/src/
tika/trunk/tika-batch/src/main/
tika/trunk/tika-batch/src/main/examples/
tika/trunk/tika-batch/src/main/examples/batchExecutor.sh
tika/trunk/tika-batch/src/main/examples/log4j.xml
tika/trunk/tika-batch/src/main/examples/log4j_driver.xml
tika/trunk/tika-batch/src/main/java/
tika/trunk/tika-batch/src/main/java/org/
tika/trunk/tika-batch/src/main/java/org/apache/
tika/trunk/tika-batch/src/main/java/org/apache/tika/
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/AutoDetectParserFactory.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchNoRestartError.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ConsumersManager.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileConsumerFutureResult.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResource.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawler.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawlerFutureResult.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileStarted.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/IFileProcessorFutureResult.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/OutputStreamFactory.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParallelFileProcessingResult.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParserFactory.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/PoisonFileResource.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporter.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporterFutureResult.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/AbstractConsumersBuilder.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/BatchProcessBuilder.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/CommandLineParserBuilder.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/DefaultContentHandlerFactoryBuilder.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/IContentHandlerFactoryBuilder.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ICrawlerBuilder.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/InterrupterBuilder.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ObjectFromDOMAndQueueBuilder.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ObjectFromDOMBuilder.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ReporterBuilder.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/SimpleLogReporterBuilder.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/StatusReporterBuilder.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/AbstractFSConsumer.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/BasicTikaFSConsumer.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSBatchProcessCLI.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSConsumersManager.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSDirectoryCrawler.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSDocumentSelector.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSFileResource.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSListCrawler.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSOutputStreamFactory.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSProperties.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSUtil.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/FSCrawlerBuilder.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/strawman/
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/strawman/StrawManTikaAppDriver.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/util/
tika/trunk/tika-batch/src/main/java/org/apache/tika/util/ClassLoaderUtil.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/util/DurationFormatUtils.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/util/PropsUtil.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/util/TikaExceptionFilter.java
tika/trunk/tika-batch/src/main/java/org/apache/tika/util/XMLDOMUtil.java
tika/trunk/tika-batch/src/main/java/overview.html
tika/trunk/tika-batch/src/main/resources/
tika/trunk/tika-batch/src/main/resources/org/
tika/trunk/tika-batch/src/main/resources/org/apache/
tika/trunk/tika-batch/src/main/resources/org/apache/tika/
tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/
tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/fs/
tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml
tika/trunk/tika-batch/src/test/
tika/trunk/tika-batch/src/test/java/
tika/trunk/tika-batch/src/test/java/org/
tika/trunk/tika-batch/src/test/java/org/apache/
tika/trunk/tika-batch/src/test/java/org/apache/tika/
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/CommandLineParserBuilderTest.java
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchDriverTest.java
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchProcessTest.java
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/FSBatchTestBase.java
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/HandlerBuilderTest.java
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/OutputStreamFactoryTest.java
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/StringStreamGobbler.java
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/strawman/
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/strawman/StrawmanTest.java
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/mock/
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/mock/MockConsumersBuilder.java
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/mock/MockConsumersManager.java
tika/trunk/tika-batch/src/test/java/org/apache/tika/parser/
tika/trunk/tika-batch/src/test/java/org/apache/tika/parser/mock/
tika/trunk/tika-batch/src/test/java/org/apache/tika/parser/mock/MockParserFactory.java
tika/trunk/tika-batch/src/test/java/org/apache/tika/util/
tika/trunk/tika-batch/src/test/java/org/apache/tika/util/TikaExceptionFilterTest.java
tika/trunk/tika-batch/src/test/resources/
tika/trunk/tika-batch/src/test/resources/log4j.properties
tika/trunk/tika-batch/src/test/resources/log4j_process.properties
tika/trunk/tika-batch/src/test/resources/test-documents/
tika/trunk/tika-batch/src/test/resources/test-documents/null_pointer.xml
tika/trunk/tika-batch/src/test/resources/test-input/
tika/trunk/tika-batch/src/test/resources/test-input/basic/
tika/trunk/tika-batch/src/test/resources/test-input/basic/test0.xml
tika/trunk/tika-batch/src/test/resources/test-input/heavy_heavy_hangs/
tika/trunk/tika-batch/src/test/resources/test-input/heavy_heavy_hangs/test0_heavy_hang.xml
tika/trunk/tika-batch/src/test/resources/test-input/heavy_heavy_hangs/test1_heavy_hang.xml
tika/trunk/tika-batch/src/test/resources/test-input/heavy_heavy_hangs/test2_heavy_hang.xml
tika/trunk/tika-batch/src/test/resources/test-input/heavy_heavy_hangs/test3_heavy_hang.xml
tika/trunk/tika-batch/src/test/resources/test-input/heavy_heavy_hangs/test4_heavy_hang.xml
tika/trunk/tika-batch/src/test/resources/test-input/heavy_heavy_hangs/test5_heavy_hang.xml
tika/trunk/tika-batch/src/test/resources/test-input/heavy_heavy_hangs/test6_ok.xml
tika/trunk/tika-batch/src/test/resources/test-input/max_restarts/
tika/trunk/tika-batch/src/test/resources/test-input/max_restarts/test0_oom.xml
tika/trunk/tika-batch/src/test/resources/test-input/max_restarts/test1_oom.xml
tika/trunk/tika-batch/src/test/resources/test-input/max_restarts/test2_oom.xml
tika/trunk/tika-batch/src/test/resources/test-input/max_restarts/test3_ok.xml
tika/trunk/tika-batch/src/test/resources/test-input/no_restart/
tika/trunk/tika-batch/src/test/resources/test-input/no_restart/test1_ok.xml
tika/trunk/tika-batch/src/test/resources/test-input/no_restart/test2_norestart.xml
tika/trunk/tika-batch/src/test/resources/test-input/no_restart/test3_ok.xml
tika/trunk/tika-batch/src/test/resources/test-input/noisy_parsers/
tika/trunk/tika-batch/src/test/resources/test-input/noisy_parsers/test0.xml
tika/trunk/tika-batch/src/test/resources/test-input/one_heavy_hang/
tika/trunk/tika-batch/src/test/resources/test-input/one_heavy_hang/test0_heavy_hang.xml
tika/trunk/tika-batch/src/test/resources/test-input/one_heavy_hang/test1_ok.xml
tika/trunk/tika-batch/src/test/resources/test-input/one_heavy_hang/test2_ok.xml
tika/trunk/tika-batch/src/test/resources/test-input/one_heavy_hang/test3_ok.xml
tika/trunk/tika-batch/src/test/resources/test-input/one_heavy_hang/test4_ok.xml
tika/trunk/tika-batch/src/test/resources/test-input/oom/
tika/trunk/tika-batch/src/test/resources/test-input/oom/test0_sleep.xml
tika/trunk/tika-batch/src/test/resources/test-input/oom/test1_heavy_hang.xml
tika/trunk/tika-batch/src/test/resources/test-input/oom/test2_ok.xml
tika/trunk/tika-batch/src/test/resources/test-input/oom/test3_oom.xml
tika/trunk/tika-batch/src/test/resources/test-input/oom/test4_ok.xml
tika/trunk/tika-batch/src/test/resources/test-input/oom/test5_ok.xml
tika/trunk/tika-batch/src/test/resources/test-input/timeout_after_early_termination/
tika/trunk/tika-batch/src/test/resources/test-input/timeout_after_early_termination/test0_sleep.xml
tika/trunk/tika-batch/src/test/resources/test-input/wait_after_early_termination/
tika/trunk/tika-batch/src/test/resources/test-input/wait_after_early_termination/test0_sleep.xml
tika/trunk/tika-batch/src/test/resources/tika-batch-config-MockConsumersBuilder.xml
tika/trunk/tika-batch/src/test/resources/tika-batch-config-broken.xml
tika/trunk/tika-batch/src/test/resources/tika-batch-config-test.xml
Modified:
tika/trunk/CHANGES.txt
tika/trunk/pom.xml
tika/trunk/tika-app/pom.xml
tika/trunk/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
Modified: tika/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/tika/trunk/CHANGES.txt?rev=1668673&r1=1668672&r2=1668673&view=diff
==============================================================================
--- tika/trunk/CHANGES.txt (original)
+++ tika/trunk/CHANGES.txt Mon Mar 23 16:09:10 2015
@@ -1,5 +1,8 @@
Release 1.8 - Current Development
+ * Added tika-batch module for directory to directory batch
+ processing (TIKA-1330).
+
* Translator.translate() Exceptions are now restricted to
TikaException and IOException (TIKA-1416).
Modified: tika/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/tika/trunk/pom.xml?rev=1668673&r1=1668672&r2=1668673&view=diff
==============================================================================
--- tika/trunk/pom.xml (original)
+++ tika/trunk/pom.xml Mon Mar 23 16:09:10 2015
@@ -50,6 +50,7 @@
<module>tika-parsers</module>
<module>tika-xmp</module>
<module>tika-serialization</module>
+ <module>tika-batch</module>
<module>tika-app</module>
<module>tika-bundle</module>
<module>tika-server</module>
Modified: tika/trunk/tika-app/pom.xml
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-app/pom.xml?rev=1668673&r1=1668672&r2=1668673&view=diff
==============================================================================
--- tika/trunk/tika-app/pom.xml (original)
+++ tika/trunk/tika-app/pom.xml Mon Mar 23 16:09:10 2015
@@ -54,6 +54,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-batch</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
Added:
tika/trunk/tika-app/src/main/java/org/apache/tika/cli/BatchCommandLineBuilder.java
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-app/src/main/java/org/apache/tika/cli/BatchCommandLineBuilder.java?rev=1668673&view=auto
==============================================================================
---
tika/trunk/tika-app/src/main/java/org/apache/tika/cli/BatchCommandLineBuilder.java
(added)
+++
tika/trunk/tika-app/src/main/java/org/apache/tika/cli/BatchCommandLineBuilder.java
Mon Mar 23 16:09:10 2015
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.cli;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This takes a TikaCLI commandline and builds the full commandline for
+ * org.apache.tika.batch.fs.FSBatchProcessCLI.
+ * <p>
+ * The "default" batch config file that this relies on
+ * if no batch config file is specified on the commandline
+ * is: tika-batch/src/main/resources/.../default-tika-batch-config.xml
+ */
+class BatchCommandLineBuilder {
+
+ static Pattern JVM_OPTS_PATTERN = Pattern.compile("^(--?)J(.+)");
+
+ protected static String[] build(String[] args) throws IOException {
+ Map<String, String> processArgs = new LinkedHashMap<String, String>();
+ Map<String, String> jvmOpts = new LinkedHashMap<String,String>();
+ //take the args, and divide them into process args and options for
+ //the parent jvm process (i.e. log files, etc)
+ mapifyArgs(args, processArgs, jvmOpts);
+
+ //now modify processArgs in place
+ translateCommandLine(args, processArgs);
+
+ //maybe the user specified a different classpath?!
+ if (! jvmOpts.containsKey("-cp") && !
jvmOpts.containsKey("--classpath")) {
+ String cp = System.getProperty("java.class.path");
+ //need to test for " " on *nix, can't just add double quotes
+ //across platforms.
+ if (cp.contains(" ")){
+ cp = "\""+cp+"\"";
+ }
+ jvmOpts.put("-cp", cp);
+ }
+
+ //now build the full command line
+ List<String> fullCommand = new ArrayList<String>();
+ fullCommand.add("java");
+ for (Map.Entry<String, String> e : jvmOpts.entrySet()) {
+ fullCommand.add(e.getKey());
+ if (e.getValue().length() > 0) {
+ fullCommand.add(e.getValue());
+ }
+ }
+ fullCommand.add("org.apache.tika.batch.fs.FSBatchProcessCLI");
+ //now add the process commands
+ for (Map.Entry<String, String> e : processArgs.entrySet()) {
+ fullCommand.add(e.getKey());
+ if (e.getValue().length() > 0) {
+ fullCommand.add(e.getValue());
+ }
+ }
+ return fullCommand.toArray(new String[fullCommand.size()]);
+ }
+
+
+ /**
+ * Take the input args and separate them into args that belong on the
commandline
+ * and those that belong as jvm args for the parent process.
+ * @param args -- literal args from TikaCLI commandline
+ * @param commandLine args that should be part of the batch commandline
+ * @param jvmArgs args that belong as jvm arguments for the parent process
+ */
+ private static void mapifyArgs(final String[] args,
+ final Map<String, String> commandLine,
+ final Map<String, String> jvmArgs) {
+
+ if (args.length == 0) {
+ return;
+ }
+
+ Matcher matcher = JVM_OPTS_PATTERN.matcher("");
+ for (int i = 0; i < args.length; i++) {
+ if (matcher.reset(args[i]).find()) {
+ String jvmArg = matcher.group(1)+matcher.group(2);
+ String v = "";
+ if (i < args.length-1 && ! args[i+1].startsWith("-")){
+ v = args[i+1];
+ i++;
+ }
+ jvmArgs.put(jvmArg, v);
+ } else if (args[i].startsWith("-")) {
+ String k = args[i];
+ String v = "";
+ if (i < args.length-1 && ! args[i+1].startsWith("-")){
+ v = args[i+1];
+ i++;
+ }
+ commandLine.put(k, v);
+ }
+ }
+ }
+
+ private static void translateCommandLine(String[] args, Map<String,
String> map) throws IOException {
+ //if there are only two args and they are both directories, treat the
first
+ //as input and the second as output.
+ if (args.length == 2 && !args[0].startsWith("-") && !
args[1].startsWith("-")) {
+ File candInput = new File(args[0]);
+ File candOutput = new File(args[1]);
+ if (candOutput.isFile()) {
+ throw new IllegalArgumentException("Can't specify an existing
file as the "+
+ "second argument for the output directory of a batch process");
+ }
+
+ if (candInput.isDirectory()){
+ map.put("-inputDir", args[0]);
+ map.put("-outputDir", args[1]);
+ }
+ }
+ //look for tikaConfig
+ for (String arg : args) {
+ if (arg.startsWith("--config=")) {
+ String configPath = arg.substring("--config=".length());
+ map.put("-c", configPath);
+ break;
+ }
+ }
+ //now translate output types
+ if (map.containsKey("-h") || map.containsKey("--html")) {
+ map.remove("-h");
+ map.remove("--html");
+ map.put("-basicHandlerType", "html");
+ map.put("-outputSuffix", "html");
+ } else if (map.containsKey("-x") || map.containsKey("--xml")) {
+ map.remove("-x");
+ map.remove("--xml");
+ map.put("-basicHandlerType", "xml");
+ map.put("-outputSuffix", "xml");
+ } else if (map.containsKey("-t") || map.containsKey("--text")) {
+ map.remove("-t");
+ map.remove("--text");
+ map.put("-basicHandlerType", "text");
+ map.put("-outputSuffix", "txt");
+ } else if (map.containsKey("-m") || map.containsKey("--metadata")) {
+ map.remove("-m");
+ map.remove("--metadata");
+ map.put("-basicHandlerType", "ignore");
+ map.put("-outputSuffix", "json");
+ } else if (map.containsKey("-T") || map.containsKey("--text-main")) {
+ map.remove("-T");
+ map.remove("--text-main");
+ map.put("-basicHandlerType", "body");
+ map.put("-outputSuffix", "txt");
+ }
+
+ if (map.containsKey("-J") || map.containsKey("--jsonRecursive")) {
+ map.remove("-J");
+ map.remove("--jsonRecursive");
+ map.put("-recursiveParserWrapper", "true");
+ //overwrite outputSuffix
+ map.put("-outputSuffix", "json");
+ }
+
+ if (map.containsKey("--inputDir") || map.containsKey("-i")) {
+ String v1 = map.remove("--inputDir");
+ String v2 = map.remove("-i");
+ String v = (v1 == null) ? v2 : v1;
+ map.put("-inputDir", v);
+ }
+
+ if (map.containsKey("--outputDir") || map.containsKey("-o")) {
+ String v1 = map.remove("--outputDir");
+ String v2 = map.remove("-o");
+ String v = (v1 == null) ? v2 : v1;
+ map.put("-outputDir", v);
+ }
+ }
+}
Modified: tika/trunk/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java?rev=1668673&r1=1668672&r2=1668673&view=diff
==============================================================================
--- tika/trunk/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
(original)
+++ tika/trunk/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java Mon Mar
23 16:09:10 2015
@@ -64,6 +64,8 @@ import org.apache.poi.poifs.filesystem.D
import org.apache.poi.poifs.filesystem.DocumentInputStream;
import org.apache.poi.poifs.filesystem.POIFSFileSystem;
import org.apache.tika.Tika;
+import org.apache.tika.batch.BatchProcessDriverCLI;
+import org.apache.tika.batch.fs.FSBatchProcessCLI;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.detect.CompositeDetector;
import org.apache.tika.detect.DefaultDetector;
@@ -113,11 +115,23 @@ public class TikaCLI {
private static final Log logger = LogFactory.getLog(TikaCLI.class);
public static void main(String[] args) throws Exception {
+ TikaCLI cli = new TikaCLI();
+
+ if (cli.testForHelp(args)) {
+ FSBatchProcessCLI batchProcessCLI = new FSBatchProcessCLI(args);
+ cli.usage();
+ return;
+ } else if (cli.testForBatch(args)) {
+ String[] batchArgs = BatchCommandLineBuilder.build(args);
+ BatchProcessDriverCLI batchDriver = new
BatchProcessDriverCLI(batchArgs);
+ batchDriver.execute();
+ System.exit(0);
+ }
+
BasicConfigurator.configure(
new WriterAppender(new SimpleLayout(), System.err));
Logger.getRootLogger().setLevel(Level.INFO);
- TikaCLI cli = new TikaCLI();
if (args.length > 0) {
for (int i = 0; i < args.length; i++) {
cli.process(args[i]);
@@ -569,12 +583,68 @@ public class TikaCLI {
out.println(" Apache Tika server. The server will listen to the");
out.println(" ports you specify as one or more arguments.");
out.println();
+ out.println("- Batch mode");
+ out.println();
+ out.println(" Simplest method.");
+ out.println(" Specify two directories as args with no other args:");
+ out.println(" java -jar tika-app.jar <inputDirectory>
<outputDirectory>");
+ out.println();
+ out.println("Batch Options:");
+ out.println(" -i or --inputDir Input directory");
+ out.println(" -o or --outputDir Output directory");
+ out.println(" -numConsumers Number of processing
threads");
+ out.println(" -bc Batch config file");
+ out.println(" -maxRestarts Maximum number of times
the ");
+ out.println(" watchdog process will
restart the child process.");
+ out.println(" -timeoutThresholdMillis Number of milliseconds
allowed to a parse");
+ out.println(" before the process is
killed and restarted");
+ out.println(" -fileList List of files to process,
with");
+ out.println(" paths relative to the
input directory");
+ out.println(" -includeFilePat Regular expression to
determine which");
+ out.println(" files to process, e.g.
\"(?i)\\.pdf\"");
+ out.println(" -excludeFilePat Regular expression to
determine which");
+ out.println(" files to avoid processing,
e.g. \"(?i)\\.pdf\"");
+ out.println(" -maxFileSizeBytes Skip files longer than
this value");
+ out.println();
+ out.println(" Control the type of output with -x, -h, -t and/or
-J.");
+ out.println();
+ out.println(" To modify child process jvm args, prepend \"J\" as
in:");
+ out.println(" -JXmx4g or -JDlog4j.configuration=file:log4j.xml.");
+
}
private void version() {
System.out.println(new Tika().toString());
}
+ private boolean testForHelp(String[] args) {
+ for (String s : args) {
+ if (s.equals("-?") || s.equals("--help")) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean testForBatch(String[] args) {
+ if (args.length == 2 && ! args[0].startsWith("-")
+ && ! args[1].startsWith("-")) {
+ File inputCand = new File(args[0]);
+ File outputCand = new File(args[1]);
+ if (inputCand.isDirectory() && !outputCand.isFile()) {
+ return true;
+ }
+ }
+
+ for (String s : args) {
+ if (s.equals("-inputDir") || s.equals("--inputDir") ||
s.equals("-i")) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
private void configure(String configFilePath) throws Exception {
this.configFilePath = configFilePath;
Added: tika/trunk/tika-app/src/main/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-app/src/main/resources/log4j.properties?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-app/src/main/resources/log4j.properties (added)
+++ tika/trunk/tika-app/src/main/resources/log4j.properties Mon Mar 23 16:09:10
2015
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# root logger syntax: <level>,<appender>[,<appender>...] (levels: debug, info, warn, error, fatal)
+log4j.rootLogger=info,stdout
+
+#console
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+
+
+log4j.appender.stdout.layout.ConversionPattern=%m%n
Added:
tika/trunk/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchCommandLineTest.java
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchCommandLineTest.java?rev=1668673&view=auto
==============================================================================
---
tika/trunk/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchCommandLineTest.java
(added)
+++
tika/trunk/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchCommandLineTest.java
Mon Mar 23 16:09:10 2015
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.cli;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tika.io.IOUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TikaCLIBatchCommandLineTest {
+
+ File testInput = null;
+ File testFile = null;
+
+ @Before
+ public void init() {
+ testInput = new File("testInput");
+ if (!testInput.mkdirs()) {
+ throw new RuntimeException("Failed to open test input directory");
+ }
+ testFile = new File("testFile.txt");
+ OutputStream os = null;
+ try {
+ os = new FileOutputStream(testFile);
+ IOUtils.write("test output", os, "UTF-8");
+ } catch (IOException e) {
+ throw new RuntimeException("Couldn't open testFile");
+ } finally {
+ IOUtils.closeQuietly(os);
+ }
+ }
+
+ @After
+ public void tearDown() {
+ try {
+ FileUtils.deleteDirectory(testInput);
+ testFile.delete();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testJVMOpts() throws Exception {
+ String path = testInput.getAbsolutePath();
+ if (path.contains(" ")) {
+ path = "\"" + path + "\"";
+ }
+ String[] params = {"-JXmx1g",
"-JDlog4j.configuration=batch_process_log4j.xml", "-inputDir",
+ path, "-outputDir", "testout-output"};
+
+
+ String[] commandLine = BatchCommandLineBuilder.build(params);
+ StringBuilder sb = new StringBuilder();
+
+ for (String s : commandLine) {
+ sb.append(s).append(" ");
+ }
+ String s = sb.toString();
+ int classInd = s.indexOf("org.apache.tika.batch.fs.FSBatchProcessCLI");
+ int xmx = s.indexOf("-Xmx1g");
+ int inputDir = s.indexOf("-inputDir");
+ int log = s.indexOf("-Dlog4j.configuration");
+ assertTrue(classInd > -1);
+ assertTrue(xmx > -1);
+ assertTrue(inputDir > -1);
+ assertTrue(log > -1);
+ assertTrue(xmx < classInd);
+ assertTrue(log < classInd);
+ assertTrue(inputDir > classInd);
+ }
+
+ @Test
+ public void testBasicMappingOfArgs() throws Exception {
+ String path = testInput.getAbsolutePath();
+ if (path.contains(" ")) {
+ path = "\"" + path + "\"";
+ }
+ String[] params = {"-JXmx1g",
"-JDlog4j.configuration=batch_process_log4j.xml",
+ "-bc", "batch-config.xml",
+ "-J", "-h", "-inputDir", path};
+
+ String[] commandLine = BatchCommandLineBuilder.build(params);
+ Map<String, String> attrs = mapify(commandLine);
+ assertEquals("true", attrs.get("-recursiveParserWrapper"));
+ assertEquals("html", attrs.get("-basicHandlerType"));
+ assertEquals("json", attrs.get("-outputSuffix"));
+ assertEquals("batch-config.xml", attrs.get("-bc"));
+ assertEquals(path, attrs.get("-inputDir"));
+ }
+
+ @Test
+ public void testTwoDirsNoFlags() throws Exception {
+ String outputRoot = "outputRoot";
+ String path = testInput.getAbsolutePath();
+ if (path.contains(" ")) {
+ path = "\"" + path + "\"";
+ }
+ String[] params = {path, outputRoot};
+
+ String[] commandLine = BatchCommandLineBuilder.build(params);
+ Map<String, String> attrs = mapify(commandLine);
+ assertEquals(path, attrs.get("-inputDir"));
+ assertEquals(outputRoot, attrs.get("-outputDir"));
+ }
+
+ @Test
+ public void testTwoDirsVarious() throws Exception {
+ String outputRoot = "outputRoot";
+ String path = testInput.getAbsolutePath();
+ if (path.contains(" ")) {
+ path = "\"" + path + "\"";
+ }
+ String[] params = {"-i", path, "-o", outputRoot};
+
+ String[] commandLine = BatchCommandLineBuilder.build(params);
+ Map<String, String> attrs = mapify(commandLine);
+ assertEquals(path, attrs.get("-inputDir"));
+ assertEquals(outputRoot, attrs.get("-outputDir"));
+
+ params = new String[]{"--inputDir", path, "--outputDir", outputRoot};
+
+ commandLine = BatchCommandLineBuilder.build(params);
+ attrs = mapify(commandLine);
+ assertEquals(path, attrs.get("-inputDir"));
+ assertEquals(outputRoot, attrs.get("-outputDir"));
+
+ params = new String[]{"-inputDir", path, "-outputDir", outputRoot};
+
+ commandLine = BatchCommandLineBuilder.build(params);
+ attrs = mapify(commandLine);
+ assertEquals(path, attrs.get("-inputDir"));
+ assertEquals(outputRoot, attrs.get("-outputDir"));
+ }
+
+ @Test
+ public void testConfig() throws Exception {
+ String outputRoot = "outputRoot";
+ String configPath = "c:/somewhere/someConfig.xml";
+ String path = testInput.getAbsolutePath();
+
+ if (path.contains(" ")) {
+ path = "\"" + path + "\"";
+ }
+
+ String[] params = {"--inputDir", path, "--outputDir", outputRoot,
+ "--config="+configPath};
+ String[] commandLine = BatchCommandLineBuilder.build(params);
+ Map<String, String> attrs = mapify(commandLine);
+ assertEquals(path, attrs.get("-inputDir"));
+ assertEquals(outputRoot, attrs.get("-outputDir"));
+ assertEquals(configPath, attrs.get("-c"));
+
+ }
+
+ @Test
+ public void testOneDirOneFileException() throws Exception {
+ boolean ex = false;
+ try {
+ String outputRoot = "outputRoot";
+ String path = testInput.getAbsolutePath();
+ if (path.contains(" ")) {
+ path = "\"" + path + "\"";
+ }
+ String[] params = {path, testFile.getAbsolutePath()};
+
+ String[] commandLine = BatchCommandLineBuilder.build(params);
+
+ } catch (IllegalArgumentException e) {
+ ex = true;
+ }
+ assertTrue("exception on <dir> <file>", ex);
+ }
+
+ private Map<String, String> mapify(String[] args) {
+ Map<String, String> map = new LinkedHashMap<String, String>();
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].startsWith("-")) {
+ String k = args[i];
+ String v = "";
+ if (i < args.length - 1 && !args[i + 1].startsWith("-")) {
+ v = args[i + 1];
+ i++;
+ }
+ map.put(k, v);
+ }
+ }
+ return map;
+ }
+
+}
Added: tika/trunk/tika-batch/pom.xml
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-batch/pom.xml?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/pom.xml (added)
+++ tika/trunk/tika-batch/pom.xml Mon Mar 23 16:09:10 2015
@@ -0,0 +1,182 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <properties>
+ <cli.version>1.2</cli.version> <!--sync version with tika-server or
move to parent? -->
+ </properties>
+
+ <parent>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-parent</artifactId>
+ <version>1.8-SNAPSHOT</version>
+ <relativePath>../tika-parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>tika-batch</artifactId>
+ <packaging>bundle</packaging>
+ <name>Apache Tika batch</name>
+ <url>http://tika.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-parsers</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-serialization</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-xmp</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>apache-log4j-extras</artifactId>
+ <version>1.2.17</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>${cli.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-parsers</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <scope>test</scope>
+ <version>2.1</version>
+ </dependency>
+
+
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-remote-resources-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>bundle</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <includes>
+ <include>**/*.xml</include>
+ </includes>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Bundle-DocURL>${project.url}</Bundle-DocURL>
+ <Bundle-Activator>
+ org.apache.tika.config.TikaActivator
+ </Bundle-Activator>
+ <Bundle-ActivationPolicy>lazy</Bundle-ActivationPolicy>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes>
+
<exclude>src/test/resources/org/apache/tika/**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.10</version>
+ <configuration>
+ <additionalClasspathElements>
+ <additionalClasspathElement>
+
${project.build.directory}/${project.build.finalName}.jar
+ </additionalClasspathElement>
+ </additionalClasspathElements>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+
+
+ <organization>
+ <name>The Apache Software Foundation</name>
+ <url>http://www.apache.org</url>
+ </organization>
+ <scm>
+ <url>http://svn.apache.org/viewvc/tika/trunk/tika-batch</url>
+
<connection>scm:svn:http://svn.apache.org/repos/asf/tika/trunk/tika-batch</connection>
+
<developerConnection>scm:svn:https://svn.apache.org/repos/asf/tika/trunk/tika-batch</developerConnection>
+ </scm>
+ <issueManagement>
+ <system>JIRA</system>
+ <url>https://issues.apache.org/jira/browse/TIKA</url>
+ </issueManagement>
+ <ciManagement>
+ <system>Jenkins</system>
+ <url>https://builds.apache.org/job/Tika-trunk/</url>
+ </ciManagement>
+
+
+</project>
\ No newline at end of file
Added: tika/trunk/tika-batch/src/main/examples/batchExecutor.sh
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/examples/batchExecutor.sh?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/examples/batchExecutor.sh (added)
+++ tika/trunk/tika-batch/src/main/examples/batchExecutor.sh Mon Mar 23
16:09:10 2015
@@ -0,0 +1 @@
+java -Dlog4j.configuration=file:bin/log4j_driver.xml -cp "bin/*"
org.apache.tika.batch.BatchProcessDriverCLI java
-Dlog4j.configuration=file:bin/log4j.xml -cp "bin/*" -Xmx6g
org.apache.tika.batch.fs.FSBatchProcessCLI -c tika-batch-config-basic-test.xml
-outputDir output -inputDir input
Added: tika/trunk/tika-batch/src/main/examples/log4j.xml
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/examples/log4j.xml?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/examples/log4j.xml (added)
+++ tika/trunk/tika-batch/src/main/examples/log4j.xml Mon Mar 23 16:09:10 2015
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration debug="true">
+ <appender name="stdout" class="org.apache.log4j.ConsoleAppender">
+ <layout class="org.apache.log4j.PatternLayout">
+ <!-- Pattern to output the caller's file name and line number -->
+ <!--<param name="ConversionPattern" value="%5p [%t] (%F:%L) - %m%n"/>-->
+ <param name="ConversionPattern" value="%m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="default.file" class="org.apache.log4j.FileAppender">
+ <param name="file" value="logs/tika-batch.log" />
+ <param name="append" value="true" />
+ <param name="threshold" value="info" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%-4r %d [%t] %-5p %c %x -
%m%n" />
+ </layout>
+ </appender>
+
+ <appender name="pdfbox.file" class="org.apache.log4j.FileAppender">
+ <param name="file" value="logs/tika-batch-pdfbox.log" />
+ <param name="append" value="true" />
+ <param name="threshold" value="warn" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%-4r %d [%t] %-5p %c %x -
%m%n" />
+ </layout>
+ </appender>
+
+ <logger name="org.apache.tika.batch.StatusReporter" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="stdout"/>
+ </logger>
+
+ <logger name="org.apache.pdfbox" additivity="false">
+ <level value="warn"/>
+ <appender-ref ref="pdfbox.file"/>
+ </logger>
+
+
+ <logger name="org.apache.fontbox" additivity="false">
+ <level value="warn"/>
+ <appender-ref ref="pdfbox.file"/>
+ </logger>
+
+
+ <root>
+ <priority value="info" />
+ <appender-ref ref="default.file" />
+ </root>
+
+
+</log4j:configuration>
+
Added: tika/trunk/tika-batch/src/main/examples/log4j_driver.xml
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/examples/log4j_driver.xml?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/examples/log4j_driver.xml (added)
+++ tika/trunk/tika-batch/src/main/examples/log4j_driver.xml Mon Mar 23
16:09:10 2015
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration debug="true">
+ <appender name="stdout" class="org.apache.log4j.ConsoleAppender">
+ <layout class="org.apache.log4j.PatternLayout">
+ <!-- Pattern to output the caller's file name and line number -->
+ <!--<param name="ConversionPattern" value="%5p [%t] (%F:%L) - %m%n"/>-->
+ <param name="ConversionPattern" value="%m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="default.file" class="org.apache.log4j.FileAppender">
+ <param name="file" value="logs/tika-batch-driver.log" />
+ <param name="append" value="true" />
+ <param name="threshold" value="trace" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%-4r %d [%t] %-5p %c %x -
%m%n" />
+ </layout>
+ </appender>
+
+<!--
+ <logger name="org.apache.tika.batch.FSBatchProcessDriverCLI"
additivity="true">
+ <level value="trace"/>
+ <appender-ref ref="stdout"/>
+ </logger>
+-->
+<!-- <logger name="org.apache.tika.batch.FSBatchProcessDriverCLI"
additivity="false">
+ <level value="warn"/>
+ <appender-ref ref="default.file"/>
+ </logger>
+--> <root>
+ <priority value="trace" />
+ <appender-ref ref="stdout" />
+ </root>
+
+</log4j:configuration>
+
Added:
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/AutoDetectParserFactory.java
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/AutoDetectParserFactory.java?rev=1668673&view=auto
==============================================================================
---
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/AutoDetectParserFactory.java
(added)
+++
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/AutoDetectParserFactory.java
Mon Mar 23 16:09:10 2015
@@ -0,0 +1,34 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.Parser;
+
+/**
+ * Simple class for AutoDetectParser
+ */
+public class AutoDetectParserFactory implements ParserFactory {
+
+ @Override
+ public Parser getParser(TikaConfig config) {
+ return new AutoDetectParser(config);
+ }
+
+}
Added:
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchNoRestartError.java
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchNoRestartError.java?rev=1668673&view=auto
==============================================================================
---
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchNoRestartError.java
(added)
+++
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchNoRestartError.java
Mon Mar 23 16:09:10 2015
@@ -0,0 +1,33 @@
+package org.apache.tika.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * FileResourceConsumers should throw this if something
+ * catastrophic has happened and the BatchProcess should shutdown
+ * and not be restarted.
+ *
+ */
+public class BatchNoRestartError extends Error {
+
+ public BatchNoRestartError(Throwable t) {
+ super(t);
+ }
+ public BatchNoRestartError(String message) {
+ super(message);
+ }
+}
Added:
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java
(added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java
Mon Mar 23 16:09:10 2015
@@ -0,0 +1,590 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tika.io.IOUtils;
+
+
+/**
+ * This is the main processor class for a single process.
+ * This class can only be run once.
+ * <p/>
+ * It requires a {@link FileResourceCrawler} and {@link
FileResourceConsumer}s, and it can also
+ * support a {@link StatusReporter} and an {@link Interrupter}.
+ * <p/>
+ * This is designed to shutdown if a parser has timed out or if there is
+ * an OutOfMemoryError. Consider using {@link BatchProcessDriverCLI}
+ * as a daemon/watchdog that monitors and can restart this batch process;
+ * <p>
+ * Note that this class redirects stderr to stdout so that it can
+ * communicate without interference with the parent process on stderr.
+ */
+public class BatchProcess implements Callable<ParallelFileProcessingResult> {
+
+ /**
+  * Marker strings used in messages about restarting this process.
+  */
+ public enum BATCH_CONSTANTS {
+ // used as the restart message when the process has been alive longer
+ // than maxAliveTimeSeconds
+ BATCH_PROCESS_EXCEEDED_MAX_ALIVE_TIME,
+ // prefix of the line written to outputStreamWriter when a restart is requested
+ BATCH_PROCESS_FATAL_MUST_RESTART
+ }
+
+ /**
+  * Why the main loop (or start-up) stopped; shutdown() and getExitStatus()
+  * use this to decide whether a restart should be requested.
+  */
+ private enum CAUSE_FOR_TERMINATION {
+ // as many consumer results were collected as there are consumers
+ COMPLETED_NORMALLY,
+ // a throwable with a BatchNoRestartError in its cause chain reached the main loop
+ MAIN_LOOP_EXCEPTION_NO_RESTART,
+ // startConsumersManager() threw a BatchNoRestartError
+ CONSUMERS_MANAGER_DIDNT_INIT_IN_TIME_NO_RESTART,
+ // any other throwable reached the main loop
+ MAIN_LOOP_EXCEPTION,
+ // the crawler finished and reported that it was timed out
+ CRAWLER_TIMED_OUT,
+ // the TimeoutChecker task returned
+ TIMED_OUT_CONSUMER,
+ // the Interrupter task returned
+ USER_INTERRUPTION,
+ // the process exceeded maxAliveTimeSeconds
+ BATCH_PROCESS_ALIVE_TOO_LONG,
+ }
+
+ private static final Log logger;
+ static {
+ logger = LogFactory.getLog(BatchProcess.class);
+ }
+
+ private PrintStream outputStreamWriter;
+
+ // If a file hasn't been processed in this amount of time,
+ // report it to the console. When the directory crawler has stopped, the
thread will
+ // be terminated and the file name will be logged
+ private long timeoutThresholdMillis = 5 * 60 * 1000; // 5 minutes
+
+ private long timeoutCheckPulseMillis = 2 * 60 * 1000; //2 minutes
+ //if there was an early termination via the Interrupter
+ //or because of an uncaught runtime throwable, pause
+ //this long before shutting down to allow parsers to finish
+ private long pauseOnEarlyTerminationMillis = 30*1000; //30 seconds
+
+ private final long consumersManagerMaxMillis;
+
+ //maximum time that this process should stay alive
+ //to avoid potential memory leaks, not a bad idea to shutdown
+ //every hour or so.
+ private int maxAliveTimeSeconds = -1;
+
+ private final FileResourceCrawler fileResourceCrawler;
+
+ private final ConsumersManager consumersManager;
+
+ private final StatusReporter reporter;
+
+ private final Interrupter interrupter;
+
+ private final ArrayBlockingQueue<FileStarted> timedOuts;
+
+ private boolean alreadyExecuted = false;
+
+ public BatchProcess(FileResourceCrawler fileResourceCrawler,
+ ConsumersManager consumersManager,
+ StatusReporter reporter,
+ Interrupter interrupter) {
+ this.fileResourceCrawler = fileResourceCrawler;
+ this.consumersManager = consumersManager;
+ this.reporter = reporter;
+ this.interrupter = interrupter;
+ timedOuts = new
ArrayBlockingQueue<FileStarted>(consumersManager.getConsumers().size());
+ this.consumersManagerMaxMillis =
consumersManager.getConsumersManagerMaxMillis();
+ }
+
+ /**
+  * Runs the main execution loop.
+  * <p>
+  * Before any work starts, the current {@code System.err} is wrapped in a
+  * UTF-8 {@link PrintStream} that is kept for restart messages, and
+  * {@code System.err} is then redirected to {@code System.out} so that
+  * ordinary error output cannot interfere with those messages.
+  * <p>
+  * The consumers manager is always shut down before this method returns,
+  * including when start-up fails.
+  *
+  * @return result of the processing
+  * @throws IllegalStateException if this instance has already been executed
+  * @throws InterruptedException declared by the signature; not thrown
+  *         directly by this method body
+  */
+ public ParallelFileProcessingResult call()
+         throws InterruptedException {
+     if (alreadyExecuted) {
+         throw new IllegalStateException("Can only execute BatchRunner once.");
+     }
+     // mark as used before touching global state so that a failed start-up
+     // cannot be followed by a second attempt that redirects the streams again
+     alreadyExecuted = true;
+
+     //redirect streams
+     try {
+         outputStreamWriter = new PrintStream(System.err, true, IOUtils.UTF_8.toString());
+     } catch (IOException e) {
+         throw new RuntimeException("Can't redirect streams", e);
+     }
+     System.setErr(System.out);
+
+     ParallelFileProcessingResult result = null;
+     try {
+         int numConsumers = consumersManager.getConsumers().size();
+         // fileResourceCrawler, statusReporter, the Interrupter, timeoutChecker
+         int numNonConsumers = 4;
+
+         ExecutorService ex = Executors.newFixedThreadPool(numConsumers
+                 + numNonConsumers);
+         CompletionService<IFileProcessorFutureResult> completionService =
+                 new ExecutorCompletionService<IFileProcessorFutureResult>(
+                         ex);
+         TimeoutChecker timeoutChecker = new TimeoutChecker();
+
+         try {
+             startConsumersManager();
+         } catch (BatchNoRestartError e) {
+             // nothing has been submitted yet; release the pool before bailing out
+             ex.shutdownNow();
+             return new
+                     ParallelFileProcessingResult(0, 0, 0,
+                     0, BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE,
+                     CAUSE_FOR_TERMINATION.CONSUMERS_MANAGER_DIDNT_INIT_IN_TIME_NO_RESTART.toString());
+         }
+
+         State state = mainLoop(completionService, timeoutChecker);
+         result = shutdown(ex, completionService, timeoutChecker, state);
+     } finally {
+         shutdownConsumersManager();
+     }
+     return result;
+ }
+
+
+ /**
+  * Submits the interrupter, crawler, reporter, timeout checker and all
+  * consumers, then polls the completion service once per second until a
+  * reason to stop is found.
+  * <p>
+  * The loop ends when all consumers have returned, the crawler returns and
+  * reports a timeout, the interrupter or the timeout checker returns, the
+  * maximum alive time is exceeded, or any throwable is raised while
+  * polling or reading a result.
+  *
+  * @param completionService service the tasks are submitted to and polled from
+  * @param timeoutChecker task that watches for timed-out consumers
+  * @return counters for the run together with the cause for termination
+  */
+ private State mainLoop(CompletionService<IFileProcessorFutureResult> completionService,
+                        TimeoutChecker timeoutChecker) {
+     alreadyExecuted = true;
+     State state = new State();
+     logger.info("BatchProcess starting up");
+
+
+     state.start = new Date().getTime();
+     completionService.submit(interrupter);
+     completionService.submit(fileResourceCrawler);
+     completionService.submit(reporter);
+     completionService.submit(timeoutChecker);
+     // record the four tasks submitted above; shutdown() needs this to know
+     // how many futures may still be outstanding
+     state.numNonConsumers = 4;
+
+     for (FileResourceConsumer consumer : consumersManager.getConsumers()) {
+         completionService.submit(consumer);
+     }
+
+     state.numConsumers = consumersManager.getConsumers().size();
+     CAUSE_FOR_TERMINATION causeForTermination = null;
+     //main processing loop
+     while (true) {
+         try {
+             Future<IFileProcessorFutureResult> futureResult =
+                     completionService.poll(1, TimeUnit.SECONDS);
+
+             if (futureResult != null) {
+                 state.removed++;
+                 IFileProcessorFutureResult result = futureResult.get();
+                 if (result instanceof FileConsumerFutureResult) {
+                     state.consumersRemoved++;
+                     state.processed += ((FileConsumerFutureResult) result).getFilesProcessed();
+                 } else if (result instanceof FileResourceCrawlerFutureResult) {
+                     state.crawlersRemoved++;
+                     if (fileResourceCrawler.wasTimedOut()) {
+                         causeForTermination = CAUSE_FOR_TERMINATION.CRAWLER_TIMED_OUT;
+                         break;
+                     }
+                 } else if (result instanceof InterrupterFutureResult) {
+                     causeForTermination = CAUSE_FOR_TERMINATION.USER_INTERRUPTION;
+                     break;
+                 } else if (result instanceof TimeoutFutureResult) {
+                     causeForTermination = CAUSE_FOR_TERMINATION.TIMED_OUT_CONSUMER;
+                     break;
+                 } //only thing left should be StatusReporterResult
+             }
+
+             if (state.consumersRemoved >= state.numConsumers) {
+                 causeForTermination = CAUSE_FOR_TERMINATION.COMPLETED_NORMALLY;
+                 break;
+             }
+             if (aliveTooLong(state.start)) {
+                 causeForTermination = CAUSE_FOR_TERMINATION.BATCH_PROCESS_ALIVE_TOO_LONG;
+                 break;
+             }
+         } catch (Throwable e) {
+             if (isNonRestart(e)) {
+                 causeForTermination = CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION_NO_RESTART;
+             } else {
+                 causeForTermination = CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION;
+             }
+             // pass the throwable along so the stack trace is not lost
+             logger.fatal("Main loop execution exception: " + e.getMessage(), e);
+             break;
+         }
+     }
+     state.causeForTermination = causeForTermination;
+     return state;
+ }
+
+ /**
+  * Stops the executor and everything running on it, collects results that
+  * complete during shutdown, decides whether a restart is needed and
+  * builds the final result.
+  *
+  * @param ex executor that runs the crawler, consumers and helper tasks
+  * @param completionService completion service wrapping {@code ex}
+  * @param timeoutChecker checker used for a final sweep for timed-out consumers
+  * @param state counters and cause for termination produced by the main loop
+  * @return summary of the run, including the exit status
+  */
+ private ParallelFileProcessingResult shutdown(ExecutorService ex,
+         CompletionService<IFileProcessorFutureResult> completionService,
+         TimeoutChecker timeoutChecker, State state) {
+
+     reporter.setIsShuttingDown(true);
+     int added = fileResourceCrawler.getAdded();
+     int considered = fileResourceCrawler.getConsidered();
+
+     //TODO: figure out safe way to shutdown resource crawler
+     //if it isn't. Does it need to add poison at this point?
+     //fileResourceCrawler.pleaseShutdown();
+
+     //Step 1: prevent uncalled threads from being started
+     ex.shutdown();
+
+     //Step 2: ask consumers to shutdown politely.
+     //Under normal circumstances, they should all have completed by now.
+     for (FileResourceConsumer consumer : consumersManager.getConsumers()) {
+         consumer.pleaseShutdown();
+     }
+     //The resourceCrawler should shutdown now. No need for poison.
+     fileResourceCrawler.shutDownNoPoison();
+     //if there are any active/asked to shutdown consumers, await termination
+     //this can happen if a user interrupts the process
+     //or if the crawler stops early, or ...
+     politelyAwaitTermination(state.causeForTermination);
+
+     //Step 3: Gloves come off. We've tried to ask kindly before.
+     //Now it is time shut down. This will corrupt
+     //nio channels via thread interrupts! Hopefully, everything
+     //has shut down by now.
+     logger.trace("About to shutdownNow()");
+     List<Runnable> neverCalled = ex.shutdownNow();
+     logger.trace("TERMINATED " + ex.isTerminated() + " : "
+             + state.consumersRemoved + " : " + state.crawlersRemoved);
+
+     //upper bound on the futures that may still arrive
+     int end = state.numConsumers + state.numNonConsumers - state.removed - neverCalled.size();
+
+     for (int t = 0; t < end; t++) {
+         Future<IFileProcessorFutureResult> future = null;
+         try {
+             future = completionService.poll(10, TimeUnit.MILLISECONDS);
+         } catch (InterruptedException e) {
+             logger.warn("thread interrupt while polling in final shutdown loop");
+             break;
+         }
+         logger.trace("In while future==null loop in final shutdown loop");
+         if (future == null) {
+             break;
+         }
+         try {
+             IFileProcessorFutureResult result = future.get();
+             if (result instanceof FileConsumerFutureResult) {
+                 FileConsumerFutureResult consumerResult = (FileConsumerFutureResult) result;
+                 state.processed += consumerResult.getFilesProcessed();
+                 FileStarted fileStarted = consumerResult.getFileStarted();
+                 if (fileStarted != null
+                         && fileStarted.getElapsedMillis() > timeoutThresholdMillis) {
+                     logger.warn(fileStarted.getResourceId()
+                             + "\t caused a file processor to hang or crash. You may need to remove "
+                             + "this file from your input set and rerun.");
+                 }
+             } else if (result instanceof FileResourceCrawlerFutureResult) {
+                 FileResourceCrawlerFutureResult crawlerResult = (FileResourceCrawlerFutureResult) result;
+                 considered += crawlerResult.getConsidered();
+                 added += crawlerResult.getAdded();
+             } //else ...we don't care about anything else stopping at this point
+         } catch (ExecutionException e) {
+             // keep the cause so the failing task can be diagnosed from the log
+             logger.error("Execution exception trying to shutdown after shutdownNow:" + e.getMessage(), e);
+         } catch (InterruptedException e) {
+             logger.error("Interrupted exception trying to shutdown after shutdownNow:" + e.getMessage(), e);
+         }
+     }
+
+     //do we need to restart?
+     String restartMsg = chooseRestartMessage(state.causeForTermination);
+     int exitStatus = getExitStatus(state.causeForTermination, restartMsg);
+
+     //need to re-check, report, mark timed out consumers
+     timeoutChecker.checkForTimedOutConsumers();
+
+     for (FileStarted fs : timedOuts) {
+         logger.fatal("A parser was still working on >" + fs.getResourceId() +
+                 "< for " + fs.getElapsedMillis() + " milliseconds after it started." +
+                 " This exceeds the maxTimeoutMillis parameter");
+     }
+     double elapsed = ((double) new Date().getTime() - (double) state.start) / 1000.0;
+     return new
+             ParallelFileProcessingResult(considered, added, state.processed,
+             elapsed, exitStatus, state.causeForTermination.toString());
+ }
+
+ /**
+  * Picks the message explaining why the process should be restarted.
+  *
+  * @param cause why the main loop stopped
+  * @return the restart message, or {@code null} when no restart is requested
+  */
+ private String chooseRestartMessage(CAUSE_FOR_TERMINATION cause) {
+     if (cause == CAUSE_FOR_TERMINATION.USER_INTERRUPTION
+             || cause == CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION_NO_RESTART) {
+         //do not restart!!!
+         return null;
+     }
+     if (cause == CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION) {
+         return "Uncaught consumer throwable";
+     }
+     if (cause == CAUSE_FOR_TERMINATION.TIMED_OUT_CONSUMER) {
+         return areResourcesPotentiallyRemaining()
+                 ? "Consumer timed out with resources remaining" : null;
+     }
+     if (cause == CAUSE_FOR_TERMINATION.BATCH_PROCESS_ALIVE_TOO_LONG) {
+         return BATCH_CONSTANTS.BATCH_PROCESS_EXCEEDED_MAX_ALIVE_TIME.toString();
+     }
+     if (cause == CAUSE_FOR_TERMINATION.CRAWLER_TIMED_OUT) {
+         return "Crawler timed out.";
+     }
+     // the remaining checks look at the crawler itself, whatever the cause
+     if (fileResourceCrawler.wasTimedOut()) {
+         return "Crawler was timed out.";
+     }
+     if (fileResourceCrawler.isActive()) {
+         return "Crawler is still active.";
+     }
+     if (!fileResourceCrawler.isQueueEmpty()) {
+         return "Resources still exist for processing";
+     }
+     return null;
+ }
+
+ /**
+  * Mutable counters handed from the main loop to shutdown().
+  * Declared static because it never touches the enclosing instance.
+  */
+ private static class State {
+     //time at which the main loop started, in epoch milliseconds
+     long start = -1;
+     //files processed, summed over the consumer results seen so far
+     int processed = 0;
+     //number of consumers submitted
+     int numConsumers = 0;
+     //number of other submitted tasks; read by shutdown() when it works
+     //out how many futures may still be outstanding
+     int numNonConsumers = 0;
+     //futures of any kind taken off the completion service by the main loop
+     int removed = 0;
+     //consumer results taken off the completion service by the main loop
+     int consumersRemoved = 0;
+     //crawler results taken off the completion service by the main loop
+     int crawlersRemoved = 0;
+     //why the main loop stopped
+     CAUSE_FOR_TERMINATION causeForTermination = null;
+ }
+
+ /**
+  * Initializes the consumers manager.
+  * <p>
+  * With a negative {@code consumersManagerMaxMillis} the call is made
+  * directly on the current thread. Otherwise it runs on a daemon thread
+  * and this method waits at most {@code consumersManagerMaxMillis} for it.
+  *
+  * @throws BatchNoRestartError if the initialization thread is still alive
+  *         when the wait ends
+  */
+ private void startConsumersManager() {
+     if (consumersManagerMaxMillis < 0) {
+         consumersManager.init();
+         return;
+     }
+     Thread timed = new Thread() {
+         @Override
+         public void run() {
+             logger.trace("about to start consumers manager");
+             consumersManager.init();
+             logger.trace("finished starting consumers manager");
+         }
+     };
+     //don't allow this thread to keep process alive
+     timed.setDaemon(true);
+     timed.start();
+     try {
+         timed.join(consumersManagerMaxMillis);
+     } catch (InterruptedException e) {
+         logger.warn("interruption exception during consumers manager startup");
+     }
+     if (timed.isAlive()) {
+         String msg = "ConsumersManager did not start within " + consumersManagerMaxMillis + "ms";
+         logger.fatal(msg);
+         throw new BatchNoRestartError(msg);
+     }
+ }
+
+ /**
+  * Shuts the consumers manager down.
+  * <p>
+  * With a negative {@code consumersManagerMaxMillis} the call is made
+  * directly on the current thread. Otherwise it runs on a daemon thread
+  * and this method waits at most {@code consumersManagerMaxMillis} for it.
+  *
+  * @throws BatchNoRestartError if the shutdown thread is still alive when
+  *         the wait ends
+  */
+ private void shutdownConsumersManager() {
+     final boolean timeLimited = consumersManagerMaxMillis >= 0;
+     if (!timeLimited) {
+         consumersManager.shutdown();
+         return;
+     }
+
+     Thread shutdownThread = new Thread() {
+         public void run() {
+             logger.trace("starting to shutdown consumers manager");
+             consumersManager.shutdown();
+             logger.trace("finished shutting down consumers manager");
+         }
+     };
+     shutdownThread.setDaemon(true);
+     shutdownThread.start();
+
+     try {
+         shutdownThread.join(consumersManagerMaxMillis);
+     } catch (InterruptedException e) {
+         logger.warn("interruption exception during consumers manager shutdown");
+     }
+
+     if (!shutdownThread.isAlive()) {
+         return;
+     }
+     logger.error("ConsumersManager was still alive during shutdown!");
+     throw new BatchNoRestartError("ConsumersManager did not shutdown within: " +
+             consumersManagerMaxMillis + "ms");
+ }
+
+ /**
+ * This is used instead of awaitTermination(), because that interrupts
+ * the thread and then waits for its termination. This politely waits.
+ *
+ * @param causeForTermination reason for termination.
+ */
+ private void politelyAwaitTermination(CAUSE_FOR_TERMINATION
causeForTermination) {
+ if (causeForTermination == CAUSE_FOR_TERMINATION.COMPLETED_NORMALLY) {
+ return;
+ }
+ long start = new Date().getTime();
+ while (countActiveConsumers() > 0) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ logger.warn("Thread interrupted while trying to
politelyAwaitTermination");
+ return;
+ }
+ long elapsed = new Date().getTime()-start;
+ if (pauseOnEarlyTerminationMillis > -1 &&
+ elapsed > pauseOnEarlyTerminationMillis) {
+ logger.warn("I waited after an early termination for "+
+ elapsed + ", but there was at least one active consumer");
+ return;
+ }
+ }
+ }
+
+ /**
+  * Walks the cause chain of {@code e}, starting with {@code e} itself,
+  * looking for a {@link BatchNoRestartError}.
+  *
+  * @param e throwable to inspect
+  * @return {@code true} if any link in the chain is a {@link BatchNoRestartError}
+  */
+ private boolean isNonRestart(Throwable e) {
+     Throwable link = e;
+     do {
+         if (link instanceof BatchNoRestartError) {
+             return true;
+         }
+         link = link.getCause();
+     } while (link != null);
+     return false;
+ }
+
+ private int getExitStatus(CAUSE_FOR_TERMINATION causeForTermination,
String restartMsg) {
+ if (causeForTermination ==
CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION_NO_RESTART) {
+ logger.info(CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION_NO_RESTART);
+ return BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE;
+ }
+
+ if (restartMsg != null) {
+ if
(restartMsg.equals(BATCH_CONSTANTS.BATCH_PROCESS_EXCEEDED_MAX_ALIVE_TIME.toString()))
{
+ logger.warn(restartMsg);
+ } else {
+ logger.fatal(restartMsg);
+ }
+
+ //send over stdout wrapped in outputStreamWriter
+ outputStreamWriter.println(
+
BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString() +
+ " >> " + restartMsg);
+ outputStreamWriter.flush();
+ return BatchProcessDriverCLI.PROCESS_RESTART_EXIT_CODE;
+ }
+ return 0;
+ }
+
+ //Could more FileResources still be taken from the queue?
+ //The crawler state can change while this is being evaluated, so a
+ //true may be returned although nothing is left. The reverse (false
+ //although resources remain) should never happen.
+ private boolean areResourcesPotentiallyRemaining() {
+     return fileResourceCrawler.isActive() || !fileResourceCrawler.isQueueEmpty();
+ }
+
+ /**
+  * Tells whether more than {@code maxAliveTimeSeconds} have passed since
+  * {@code started}. A negative {@code maxAliveTimeSeconds} disables the check.
+  *
+  * @param started start time in epoch milliseconds
+  * @return {@code true} if the limit is enabled and has been exceeded
+  */
+ private boolean aliveTooLong(long started) {
+     if (maxAliveTimeSeconds >= 0) {
+         long elapsedMillis = new Date().getTime() - started;
+         double elapsedSeconds = (double) elapsedMillis / (double) 1000;
+         return elapsedSeconds > (double) maxAliveTimeSeconds;
+     }
+     return false;
+ }
+
+ //Counts the consumers that currently report themselves as still active.
+ //This is only a snapshot: the real number may already be smaller
+ //by the time the value is returned.
+ private int countActiveConsumers() {
+     int stillActive = 0;
+     for (FileResourceConsumer candidate : consumersManager.getConsumers()) {
+         stillActive += candidate.isStillActive() ? 1 : 0;
+     }
+     return stillActive;
+ }
+
+ /**
+  * Sets how long shutdown waits for active consumers to finish after a
+  * termination that was not a normal completion, before the executor is
+  * stopped forcibly.
+  * <p>
+  * A negative value removes the limit: the wait then lasts until no
+  * consumer is active or the waiting thread is interrupted. The default is
+  * 30 seconds. It typically makes sense for this to be the same as or
+  * slightly larger than the timeout threshold.
+  *
+  * @param pauseOnEarlyTerminationMillis maximum wait in milliseconds
+  */
+ public void setPauseOnEarlyTerminationMillis(long pauseOnEarlyTerminationMillis) {
+     this.pauseOnEarlyTerminationMillis = pauseOnEarlyTerminationMillis;
+ }
+
+ /**
+  * Sets the threshold handed to each consumer when it is checked for a
+  * timeout. The default is 5 minutes.
+  *
+  * @param timeoutThresholdMillis threshold in milliseconds before a
+  *        consumer is declared timed out
+  */
+ public void setTimeoutThresholdMillis(long timeoutThresholdMillis) {
+     this.timeoutThresholdMillis = timeoutThresholdMillis;
+ }
+
+ /**
+  * Sets how long the timeout checker sleeps between two sweeps over the
+  * consumers. The default is 2 minutes.
+  *
+  * @param timeoutCheckPulseMillis sleep time between sweeps in milliseconds
+  */
+ public void setTimeoutCheckPulseMillis(long timeoutCheckPulseMillis) {
+     this.timeoutCheckPulseMillis = timeoutCheckPulseMillis;
+ }
+
+ /**
+  * Sets the maximum amount of time that this process may stay alive. To
+  * avoid memory leaks, it is sometimes beneficial to shut down (and
+  * restart) the process periodically.
+  * <p>
+  * With a negative value (the default is -1) the limit is disabled and the
+  * process runs until completion, interruption or exception.
+  *
+  * @param maxAliveTimeSeconds maximum time to remain alive, in seconds
+  */
+ public void setMaxAliveTimeSeconds(int maxAliveTimeSeconds) {
+     this.maxAliveTimeSeconds = maxAliveTimeSeconds;
+ }
+
+ /**
+  * Task that periodically asks every consumer whether it has exceeded the
+  * timeout threshold and records the files reported in {@code timedOuts}.
+  */
+ private class TimeoutChecker implements Callable<IFileProcessorFutureResult> {
+
+     /**
+      * Sleeps for one pulse, sweeps the consumers, and repeats until a
+      * timed-out file has been recorded, no consumer is active any more,
+      * or the thread is interrupted.
+      *
+      * @return result carrying the number of recorded timed-out files
+      */
+     @Override
+     public TimeoutFutureResult call() throws Exception {
+         while (timedOuts.isEmpty()) {
+             try {
+                 Thread.sleep(timeoutCheckPulseMillis);
+             } catch (InterruptedException e) {
+                 //just stop.
+                 logger.debug("Thread interrupted exception in TimeoutChecker");
+                 break;
+             }
+             checkForTimedOutConsumers();
+             final boolean noneActive = countActiveConsumers() == 0;
+             if (noneActive) {
+                 logger.info("No activeConsumers in TimeoutChecker");
+                 break;
+             }
+         }
+         logger.debug("TimeoutChecker quitting: " + timedOuts.size());
+         return new TimeoutFutureResult(timedOuts.size());
+     }
+
+     /**
+      * Asks each consumer for a file that has exceeded
+      * {@code timeoutThresholdMillis} and queues every file reported.
+      */
+     private void checkForTimedOutConsumers() {
+         for (FileResourceConsumer candidate : consumersManager.getConsumers()) {
+             FileStarted overdue = candidate.checkForTimedOutMillis(timeoutThresholdMillis);
+             if (overdue == null) {
+                 continue;
+             }
+             timedOuts.add(overdue);
+         }
+     }
+ }
+
+ /**
+  * Result returned by the TimeoutChecker; the main loop treats its arrival
+  * as a consumer timeout.
+  */
+ private class TimeoutFutureResult implements IFileProcessorFutureResult {
+     //number of timed-out files recorded when the checker stopped
+     private final int timedOutCount;
+
+     private TimeoutFutureResult(final int count) {
+         timedOutCount = count;
+     }
+
+     /**
+      * @return number of timed-out files recorded when the checker stopped
+      */
+     protected int getTimedOutCount() {
+         return this.timedOutCount;
+     }
+ }
+}