Author: tallison
Date: Mon Mar 23 16:09:10 2015
New Revision: 1668673

URL: http://svn.apache.org/r1668673
Log:
initial commit of TIKA-1330

Added:
    
tika/trunk/tika-app/src/main/java/org/apache/tika/cli/BatchCommandLineBuilder.java
    tika/trunk/tika-app/src/main/resources/log4j.properties
    
tika/trunk/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchCommandLineTest.java
    tika/trunk/tika-batch/
    tika/trunk/tika-batch/pom.xml
    tika/trunk/tika-batch/src/
    tika/trunk/tika-batch/src/main/
    tika/trunk/tika-batch/src/main/examples/
    tika/trunk/tika-batch/src/main/examples/batchExecutor.sh
    tika/trunk/tika-batch/src/main/examples/log4j.xml
    tika/trunk/tika-batch/src/main/examples/log4j_driver.xml
    tika/trunk/tika-batch/src/main/java/
    tika/trunk/tika-batch/src/main/java/org/
    tika/trunk/tika-batch/src/main/java/org/apache/
    tika/trunk/tika-batch/src/main/java/org/apache/tika/
    tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/AutoDetectParserFactory.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchNoRestartError.java
    tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ConsumersManager.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileConsumerFutureResult.java
    tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResource.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawler.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawlerFutureResult.java
    tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileStarted.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/IFileProcessorFutureResult.java
    tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/OutputStreamFactory.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParallelFileProcessingResult.java
    tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParserFactory.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/PoisonFileResource.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporter.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporterFutureResult.java
    tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/AbstractConsumersBuilder.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/BatchProcessBuilder.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/CommandLineParserBuilder.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/DefaultContentHandlerFactoryBuilder.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/IContentHandlerFactoryBuilder.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ICrawlerBuilder.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/InterrupterBuilder.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ObjectFromDOMAndQueueBuilder.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ObjectFromDOMBuilder.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ReporterBuilder.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/SimpleLogReporterBuilder.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/StatusReporterBuilder.java
    tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/AbstractFSConsumer.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/BasicTikaFSConsumer.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSBatchProcessCLI.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSConsumersManager.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSDirectoryCrawler.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSDocumentSelector.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSFileResource.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSListCrawler.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSOutputStreamFactory.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSProperties.java
    tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSUtil.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java
    tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/FSCrawlerBuilder.java
    tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/strawman/
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/strawman/StrawManTikaAppDriver.java
    tika/trunk/tika-batch/src/main/java/org/apache/tika/util/
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/util/ClassLoaderUtil.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/util/DurationFormatUtils.java
    tika/trunk/tika-batch/src/main/java/org/apache/tika/util/PropsUtil.java
    
tika/trunk/tika-batch/src/main/java/org/apache/tika/util/TikaExceptionFilter.java
    tika/trunk/tika-batch/src/main/java/org/apache/tika/util/XMLDOMUtil.java
    tika/trunk/tika-batch/src/main/java/overview.html
    tika/trunk/tika-batch/src/main/resources/
    tika/trunk/tika-batch/src/main/resources/org/
    tika/trunk/tika-batch/src/main/resources/org/apache/
    tika/trunk/tika-batch/src/main/resources/org/apache/tika/
    tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/
    tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/fs/
    
tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml
    tika/trunk/tika-batch/src/test/
    tika/trunk/tika-batch/src/test/java/
    tika/trunk/tika-batch/src/test/java/org/
    tika/trunk/tika-batch/src/test/java/org/apache/
    tika/trunk/tika-batch/src/test/java/org/apache/tika/
    tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/
    
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/CommandLineParserBuilderTest.java
    tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/
    
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchDriverTest.java
    
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchProcessTest.java
    
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/FSBatchTestBase.java
    
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/HandlerBuilderTest.java
    
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/OutputStreamFactoryTest.java
    
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/StringStreamGobbler.java
    tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/strawman/
    
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/strawman/StrawmanTest.java
    tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/mock/
    
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/mock/MockConsumersBuilder.java
    
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/mock/MockConsumersManager.java
    tika/trunk/tika-batch/src/test/java/org/apache/tika/parser/
    tika/trunk/tika-batch/src/test/java/org/apache/tika/parser/mock/
    
tika/trunk/tika-batch/src/test/java/org/apache/tika/parser/mock/MockParserFactory.java
    tika/trunk/tika-batch/src/test/java/org/apache/tika/util/
    
tika/trunk/tika-batch/src/test/java/org/apache/tika/util/TikaExceptionFilterTest.java
    tika/trunk/tika-batch/src/test/resources/
    tika/trunk/tika-batch/src/test/resources/log4j.properties
    tika/trunk/tika-batch/src/test/resources/log4j_process.properties
    tika/trunk/tika-batch/src/test/resources/test-documents/
    tika/trunk/tika-batch/src/test/resources/test-documents/null_pointer.xml
    tika/trunk/tika-batch/src/test/resources/test-input/
    tika/trunk/tika-batch/src/test/resources/test-input/basic/
    tika/trunk/tika-batch/src/test/resources/test-input/basic/test0.xml
    tika/trunk/tika-batch/src/test/resources/test-input/heavy_heavy_hangs/
    
tika/trunk/tika-batch/src/test/resources/test-input/heavy_heavy_hangs/test0_heavy_hang.xml
    
tika/trunk/tika-batch/src/test/resources/test-input/heavy_heavy_hangs/test1_heavy_hang.xml
    
tika/trunk/tika-batch/src/test/resources/test-input/heavy_heavy_hangs/test2_heavy_hang.xml
    
tika/trunk/tika-batch/src/test/resources/test-input/heavy_heavy_hangs/test3_heavy_hang.xml
    
tika/trunk/tika-batch/src/test/resources/test-input/heavy_heavy_hangs/test4_heavy_hang.xml
    
tika/trunk/tika-batch/src/test/resources/test-input/heavy_heavy_hangs/test5_heavy_hang.xml
    
tika/trunk/tika-batch/src/test/resources/test-input/heavy_heavy_hangs/test6_ok.xml
    tika/trunk/tika-batch/src/test/resources/test-input/max_restarts/
    
tika/trunk/tika-batch/src/test/resources/test-input/max_restarts/test0_oom.xml
    
tika/trunk/tika-batch/src/test/resources/test-input/max_restarts/test1_oom.xml
    
tika/trunk/tika-batch/src/test/resources/test-input/max_restarts/test2_oom.xml
    
tika/trunk/tika-batch/src/test/resources/test-input/max_restarts/test3_ok.xml
    tika/trunk/tika-batch/src/test/resources/test-input/no_restart/
    tika/trunk/tika-batch/src/test/resources/test-input/no_restart/test1_ok.xml
    
tika/trunk/tika-batch/src/test/resources/test-input/no_restart/test2_norestart.xml
    tika/trunk/tika-batch/src/test/resources/test-input/no_restart/test3_ok.xml
    tika/trunk/tika-batch/src/test/resources/test-input/noisy_parsers/
    tika/trunk/tika-batch/src/test/resources/test-input/noisy_parsers/test0.xml
    tika/trunk/tika-batch/src/test/resources/test-input/one_heavy_hang/
    
tika/trunk/tika-batch/src/test/resources/test-input/one_heavy_hang/test0_heavy_hang.xml
    
tika/trunk/tika-batch/src/test/resources/test-input/one_heavy_hang/test1_ok.xml
    
tika/trunk/tika-batch/src/test/resources/test-input/one_heavy_hang/test2_ok.xml
    
tika/trunk/tika-batch/src/test/resources/test-input/one_heavy_hang/test3_ok.xml
    
tika/trunk/tika-batch/src/test/resources/test-input/one_heavy_hang/test4_ok.xml
    tika/trunk/tika-batch/src/test/resources/test-input/oom/
    tika/trunk/tika-batch/src/test/resources/test-input/oom/test0_sleep.xml
    tika/trunk/tika-batch/src/test/resources/test-input/oom/test1_heavy_hang.xml
    tika/trunk/tika-batch/src/test/resources/test-input/oom/test2_ok.xml
    tika/trunk/tika-batch/src/test/resources/test-input/oom/test3_oom.xml
    tika/trunk/tika-batch/src/test/resources/test-input/oom/test4_ok.xml
    tika/trunk/tika-batch/src/test/resources/test-input/oom/test5_ok.xml
    
tika/trunk/tika-batch/src/test/resources/test-input/timeout_after_early_termination/
    
tika/trunk/tika-batch/src/test/resources/test-input/timeout_after_early_termination/test0_sleep.xml
    
tika/trunk/tika-batch/src/test/resources/test-input/wait_after_early_termination/
    
tika/trunk/tika-batch/src/test/resources/test-input/wait_after_early_termination/test0_sleep.xml
    
tika/trunk/tika-batch/src/test/resources/tika-batch-config-MockConsumersBuilder.xml
    tika/trunk/tika-batch/src/test/resources/tika-batch-config-broken.xml
    tika/trunk/tika-batch/src/test/resources/tika-batch-config-test.xml
Modified:
    tika/trunk/CHANGES.txt
    tika/trunk/pom.xml
    tika/trunk/tika-app/pom.xml
    tika/trunk/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java

Modified: tika/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/tika/trunk/CHANGES.txt?rev=1668673&r1=1668672&r2=1668673&view=diff
==============================================================================
--- tika/trunk/CHANGES.txt (original)
+++ tika/trunk/CHANGES.txt Mon Mar 23 16:09:10 2015
@@ -1,5 +1,8 @@
 Release 1.8 - Current Development
 
+  * Added tika-batch module for directory-to-directory batch
+    processing (TIKA-1330).
+
   * Translator.translate() Exceptions are now restricted to
     TikaException and IOException (TIKA-1416).
 

Modified: tika/trunk/pom.xml
URL: 
http://svn.apache.org/viewvc/tika/trunk/pom.xml?rev=1668673&r1=1668672&r2=1668673&view=diff
==============================================================================
--- tika/trunk/pom.xml (original)
+++ tika/trunk/pom.xml Mon Mar 23 16:09:10 2015
@@ -50,6 +50,7 @@
     <module>tika-parsers</module>
     <module>tika-xmp</module>
     <module>tika-serialization</module>
+    <module>tika-batch</module>
     <module>tika-app</module>
     <module>tika-bundle</module>
     <module>tika-server</module>

Modified: tika/trunk/tika-app/pom.xml
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-app/pom.xml?rev=1668673&r1=1668672&r2=1668673&view=diff
==============================================================================
--- tika/trunk/tika-app/pom.xml (original)
+++ tika/trunk/tika-app/pom.xml Mon Mar 23 16:09:10 2015
@@ -54,6 +54,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-batch</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
     </dependency>

Added: 
tika/trunk/tika-app/src/main/java/org/apache/tika/cli/BatchCommandLineBuilder.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-app/src/main/java/org/apache/tika/cli/BatchCommandLineBuilder.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-app/src/main/java/org/apache/tika/cli/BatchCommandLineBuilder.java
 (added)
+++ 
tika/trunk/tika-app/src/main/java/org/apache/tika/cli/BatchCommandLineBuilder.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.cli;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
/**
 * This takes a TikaCLI commandline and builds the full commandline for
 * org.apache.tika.batch.fs.FSBatchProcessCLI.
 * <p>
 * Options of the form {@code -J<opt>} / {@code --J<opt>} become JVM options
 * ({@code -<opt>} / {@code --<opt>}) for the launched java process. All other
 * options are translated to their FSBatchProcessCLI spelling where one is known,
 * and are forwarded after the main class.
 * <p>
 * The "default" batch config file that this relies on
 * if no batch config file is specified on the commandline
 * is: tika-batch/src/main/resources/.../default-tika-batch-config.xml
 */
class BatchCommandLineBuilder {

    /**
     * Recognizes JVM options tunnelled through TikaCLI, e.g. {@code -JXmx4g}.
     * A bare {@code -J} does not match (it needs at least one character after
     * the J), so it stays available as the jsonRecursive flag.
     */
    static final Pattern JVM_OPTS_PATTERN = Pattern.compile("^(--?)J(.+)");

    private static final String BATCH_MAIN_CLASS = "org.apache.tika.batch.fs.FSBatchProcessCLI";

    /** TikaCLI's inline-valued config option; translated to {@code -c}. */
    private static final String CONFIG_PREFIX = "--config=";

    /**
     * TikaCLI output-type flags in precedence order. Only the first one present
     * is translated. Columns: short flag, long flag, value for
     * -basicHandlerType, value for -outputSuffix.
     */
    private static final String[][] OUTPUT_TYPES = {
            {"-h", "--html", "html", "html"},
            {"-x", "--xml", "xml", "xml"},
            {"-t", "--text", "text", "txt"},
            {"-m", "--metadata", "ignore", "json"},
            {"-T", "--text-main", "body", "txt"}
    };

    /**
     * Builds the complete command line ("java", JVM options, main class, batch
     * options) for launching FSBatchProcessCLI.
     *
     * @param args literal args from the TikaCLI command line
     * @return the full command line, one token per element
     * @throws IOException declared for API compatibility; nothing in this class
     *         currently throws it
     * @throws IllegalArgumentException if exactly two bare arguments are given
     *         and the second one is an existing file
     */
    protected static String[] build(String[] args) throws IOException {
        Map<String, String> processArgs = new LinkedHashMap<String, String>();
        Map<String, String> jvmOpts = new LinkedHashMap<String, String>();
        //take the args, and divide them into process args and options for
        //the parent jvm process (i.e. log files, etc)
        mapifyArgs(args, processArgs, jvmOpts);

        //now modify processArgs in place
        translateCommandLine(args, processArgs);

        //only supply our own classpath if the user didn't pass one via -J...
        if (!hasClasspath(jvmOpts)) {
            String cp = System.getProperty("java.class.path");
            //need to test for " " on *nix, can't just add double quotes
            //across platforms.
            //NOTE(review): if the launcher hands these tokens to ProcessBuilder
            //as separate argv elements, the literal quotes become part of the
            //classpath on *nix -- confirm against BatchProcessDriverCLI.
            if (cp.contains(" ")) {
                cp = "\"" + cp + "\"";
            }
            jvmOpts.put("-cp", cp);
        }

        List<String> fullCommand = new ArrayList<String>();
        fullCommand.add("java");
        appendOptions(fullCommand, jvmOpts);
        fullCommand.add(BATCH_MAIN_CLASS);
        appendOptions(fullCommand, processArgs);
        return fullCommand.toArray(new String[fullCommand.size()]);
    }

    /** True if any java-launcher classpath option is already present. */
    private static boolean hasClasspath(Map<String, String> jvmOpts) {
        return jvmOpts.containsKey("-cp")
                || jvmOpts.containsKey("-classpath")
                || jvmOpts.containsKey("--class-path")
                || jvmOpts.containsKey("--classpath");
    }

    /** Appends each option key, followed by its value when the value is non-empty. */
    private static void appendOptions(List<String> command, Map<String, String> options) {
        for (Map.Entry<String, String> e : options.entrySet()) {
            command.add(e.getKey());
            if (e.getValue().length() > 0) {
                command.add(e.getValue());
            }
        }
    }

    /**
     * Take the input args and separate them into args that belong on the commandline
     * and those that belong as jvm args for the parent process.
     * <p>
     * An option takes the following argument as its value when that argument
     * does not start with "-". The exception is an option that already carries
     * an inline value (contains '='), such as {@code --config=path} or
     * {@code -JDkey=value}, which never consumes the next argument. Bare
     * arguments that are not option values are skipped here; they are handled
     * by translateCommandLine.
     *
     * @param args -- literal args from TikaCLI commandline
     * @param commandLine args that should be part of the batch commandline
     * @param jvmArgs args that belong as jvm arguments for the parent process
     */
    private static void mapifyArgs(final String[] args,
                                   final Map<String, String> commandLine,
                                   final Map<String, String> jvmArgs) {
        Matcher matcher = JVM_OPTS_PATTERN.matcher("");
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            boolean isJvmOpt = matcher.reset(arg).find();
            if (!isJvmOpt && !arg.startsWith("-")) {
                continue;
            }
            String value = "";
            if (!arg.contains("=") && i < args.length - 1 && !args[i + 1].startsWith("-")) {
                value = args[++i];
            }
            if (isJvmOpt) {
                //strip the J: -JXmx4g -> -Xmx4g
                jvmArgs.put(matcher.group(1) + matcher.group(2), value);
            } else {
                commandLine.put(arg, value);
            }
        }
    }

    /**
     * Rewrites TikaCLI option spellings in {@code map} into FSBatchProcessCLI
     * options, in place.
     *
     * @param args literal args from the TikaCLI command line
     * @param map process args produced by mapifyArgs; modified in place
     * @throws IllegalArgumentException if two bare args are given and the
     *         second is an existing file
     */
    private static void translateCommandLine(String[] args, Map<String, String> map) throws IOException {
        //if there are only two args and they are both directories, treat the first
        //as input and the second as output.
        if (args.length == 2 && !args[0].startsWith("-") && !args[1].startsWith("-")) {
            File candInput = new File(args[0]);
            File candOutput = new File(args[1]);
            if (candOutput.isFile()) {
                throw new IllegalArgumentException("Can't specify an existing file as the " +
                        "second argument for the output directory of a batch process");
            }

            if (candInput.isDirectory()) {
                map.put("-inputDir", args[0]);
                map.put("-outputDir", args[1]);
            }
        }

        translateConfig(args, map);

        //now translate output types; the first matching type wins
        for (String[] type : OUTPUT_TYPES) {
            if (map.containsKey(type[0]) || map.containsKey(type[1])) {
                map.remove(type[0]);
                map.remove(type[1]);
                map.put("-basicHandlerType", type[2]);
                map.put("-outputSuffix", type[3]);
                break;
            }
        }

        if (map.containsKey("-J") || map.containsKey("--jsonRecursive")) {
            map.remove("-J");
            map.remove("--jsonRecursive");
            map.put("-recursiveParserWrapper", "true");
            //overwrite outputSuffix
            map.put("-outputSuffix", "json");
        }

        renameOption(map, "--inputDir", "-i", "-inputDir");
        renameOption(map, "--outputDir", "-o", "-outputDir");
    }

    /**
     * Translates the first {@code --config=path} argument into {@code -c path}.
     * Drops the raw TikaCLI keys so they are not forwarded untranslated.
     */
    private static void translateConfig(String[] args, Map<String, String> map) {
        String configPath = null;
        for (String arg : args) {
            if (arg.startsWith(CONFIG_PREFIX)) {
                map.remove(arg);
                if (configPath == null) {
                    configPath = arg.substring(CONFIG_PREFIX.length());
                }
            }
        }
        if (configPath != null) {
            map.put("-c", configPath);
        }
    }

    /**
     * Replaces {@code longName}/{@code shortName} with {@code batchName}.
     * If both are present, the long form's value is kept.
     */
    private static void renameOption(Map<String, String> map, String longName,
                                     String shortName, String batchName) {
        if (map.containsKey(longName) || map.containsKey(shortName)) {
            String longValue = map.remove(longName);
            String shortValue = map.remove(shortName);
            map.put(batchName, (longValue == null) ? shortValue : longValue);
        }
    }
}

Modified: tika/trunk/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java?rev=1668673&r1=1668672&r2=1668673&view=diff
==============================================================================
--- tika/trunk/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java 
(original)
+++ tika/trunk/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java Mon Mar 
23 16:09:10 2015
@@ -64,6 +64,8 @@ import org.apache.poi.poifs.filesystem.D
 import org.apache.poi.poifs.filesystem.DocumentInputStream;
 import org.apache.poi.poifs.filesystem.POIFSFileSystem;
 import org.apache.tika.Tika;
+import org.apache.tika.batch.BatchProcessDriverCLI;
+import org.apache.tika.batch.fs.FSBatchProcessCLI;
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.detect.CompositeDetector;
 import org.apache.tika.detect.DefaultDetector;
@@ -113,11 +115,23 @@ public class TikaCLI {
     private static final Log logger = LogFactory.getLog(TikaCLI.class);
 
     public static void main(String[] args) throws Exception {
+        TikaCLI cli = new TikaCLI();
+
+        if (cli.testForHelp(args)) {
+            FSBatchProcessCLI batchProcessCLI = new FSBatchProcessCLI(args);
+            cli.usage();
+            return;
+        } else if (cli.testForBatch(args)) {
+            String[] batchArgs = BatchCommandLineBuilder.build(args);
+            BatchProcessDriverCLI batchDriver = new 
BatchProcessDriverCLI(batchArgs);
+            batchDriver.execute();
+            System.exit(0);
+        }
+
         BasicConfigurator.configure(
                 new WriterAppender(new SimpleLayout(), System.err));
         Logger.getRootLogger().setLevel(Level.INFO);
 
-        TikaCLI cli = new TikaCLI();
         if (args.length > 0) {
             for (int i = 0; i < args.length; i++) {
                 cli.process(args[i]);
@@ -569,12 +583,68 @@ public class TikaCLI {
         out.println("    Apache Tika server. The server will listen to the");
         out.println("    ports you specify as one or more arguments.");
         out.println();
+        out.println("- Batch mode");
+        out.println();
+        out.println("    Simplest method.");
+        out.println("    Specify two directories as args with no other args:");
+        out.println("         java -jar tika-app.jar <inputDirectory> 
<outputDirectory");
+        out.println();
+        out.println("Batch Options:");
+        out.println("    -i  or --inputDir          Input directory");
+        out.println("    -o  or --outputDir         Output directory");
+        out.println("    -numConsumers              Number of processing 
threads");
+        out.println("    -bc                        Batch config file");
+        out.println("    -maxRestarts               Maximum number of times 
the ");
+        out.println("                               watchdog process will 
restart the child process.");
+        out.println("    -timeoutThresholdMillis    Number of milliseconds 
allowed to a parse");
+        out.println("                               before the process is 
killed and restarted");
+        out.println("    -fileList                  List of files to process, 
with");
+        out.println("                               paths relative to the 
input directory");
+        out.println("    -includeFilePat            Regular expression to 
determine which");
+        out.println("                               files to process, e.g. 
\"(?i)\\.pdf\"");
+        out.println("    -excludeFilePat            Regular expression to 
determine which");
+        out.println("                               files to avoid processing, 
e.g. \"(?i)\\.pdf\"");
+        out.println("    -maxFileSizeBytes          Skip files longer than 
this value");
+        out.println();
+        out.println("    Control the type of output with -x, -h, -t and/or 
-J.");
+        out.println();
+        out.println("    To modify child process jvm args, prepend \"J\" as 
in:");
+        out.println("    -JXmx4g or -JDlog4j.configuration=file:log4j.xml.");
+
     }
 
     private void version() {
         System.out.println(new Tika().toString());
     }
 
+    private boolean testForHelp(String[] args) {
+        for (String s : args) {
+            if (s.equals("-?") || s.equals("--help")) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean testForBatch(String[] args) {
+        if (args.length == 2 && ! args[0].startsWith("-")
+                && ! args[1].startsWith("-")) {
+            File inputCand = new File(args[0]);
+            File outputCand = new File(args[1]);
+            if (inputCand.isDirectory() && !outputCand.isFile()) {
+                return true;
+            }
+        }
+
+        for (String s : args) {
+            if (s.equals("-inputDir") || s.equals("--inputDir") || 
s.equals("-i")) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+
 
     private void configure(String configFilePath) throws Exception {
         this.configFilePath = configFilePath;

Added: tika/trunk/tika-app/src/main/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-app/src/main/resources/log4j.properties?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-app/src/main/resources/log4j.properties (added)
+++ tika/trunk/tika-app/src/main/resources/log4j.properties Mon Mar 23 16:09:10 
2015
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#info,debug, error,fatal ...
+log4j.rootLogger=stdout
+
+#console
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+
+
+log4j.appender.stdout.layout.ConversionPattern=%m%n

Added: 
tika/trunk/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchCommandLineTest.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchCommandLineTest.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchCommandLineTest.java
 (added)
+++ 
tika/trunk/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchCommandLineTest.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.cli;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tika.io.IOUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TikaCLIBatchCommandLineTest {
+
+    File testInput = null;
+    File testFile = null;
+
+    @Before
+    public void init() {
+        testInput = new File("testInput");
+        if (!testInput.mkdirs()) {
+            throw new RuntimeException("Failed to open test input directory");
+        }
+        testFile = new File("testFile.txt");
+        OutputStream os = null;
+        try {
+            os = new FileOutputStream(testFile);
+            IOUtils.write("test output", os, "UTF-8");
+        } catch (IOException e) {
+            throw new RuntimeException("Couldn't open testFile");
+        } finally {
+            IOUtils.closeQuietly(os);
+        }
+    }
+
+    @After
+    public void tearDown() {
+        try {
+            FileUtils.deleteDirectory(testInput);
+            testFile.delete();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testJVMOpts() throws Exception {
+        String path = testInput.getAbsolutePath();
+        if (path.contains(" ")) {
+            path = "\"" + path + "\"";
+        }
+        String[] params = {"-JXmx1g", 
"-JDlog4j.configuration=batch_process_log4j.xml", "-inputDir",
+                path, "-outputDir", "testout-output"};
+
+
+        String[] commandLine = BatchCommandLineBuilder.build(params);
+        StringBuilder sb = new StringBuilder();
+
+        for (String s : commandLine) {
+            sb.append(s).append(" ");
+        }
+        String s = sb.toString();
+        int classInd = s.indexOf("org.apache.tika.batch.fs.FSBatchProcessCLI");
+        int xmx = s.indexOf("-Xmx1g");
+        int inputDir = s.indexOf("-inputDir");
+        int log = s.indexOf("-Dlog4j.configuration");
+        assertTrue(classInd > -1);
+        assertTrue(xmx > -1);
+        assertTrue(inputDir > -1);
+        assertTrue(log > -1);
+        assertTrue(xmx < classInd);
+        assertTrue(log < classInd);
+        assertTrue(inputDir > classInd);
+    }
+
+    @Test
+    public void testBasicMappingOfArgs() throws Exception {
+        String path = testInput.getAbsolutePath();
+        if (path.contains(" ")) {
+            path = "\"" + path + "\"";
+        }
+        String[] params = {"-JXmx1g", 
"-JDlog4j.configuration=batch_process_log4j.xml",
+                "-bc", "batch-config.xml",
+                "-J", "-h", "-inputDir", path};
+
+        String[] commandLine = BatchCommandLineBuilder.build(params);
+        Map<String, String> attrs = mapify(commandLine);
+        assertEquals("true", attrs.get("-recursiveParserWrapper"));
+        assertEquals("html", attrs.get("-basicHandlerType"));
+        assertEquals("json", attrs.get("-outputSuffix"));
+        assertEquals("batch-config.xml", attrs.get("-bc"));
+        assertEquals(path, attrs.get("-inputDir"));
+    }
+
+    @Test
+    public void testTwoDirsNoFlags() throws Exception {
+        String outputRoot = "outputRoot";
+        String path = testInput.getAbsolutePath();
+        if (path.contains(" ")) {
+            path = "\"" + path + "\"";
+        }
+        String[] params = {path, outputRoot};
+
+        String[] commandLine = BatchCommandLineBuilder.build(params);
+        Map<String, String> attrs = mapify(commandLine);
+        assertEquals(path, attrs.get("-inputDir"));
+        assertEquals(outputRoot, attrs.get("-outputDir"));
+    }
+
+    @Test
+    public void testTwoDirsVarious() throws Exception {
+        String outputRoot = "outputRoot";
+        String path = testInput.getAbsolutePath();
+        if (path.contains(" ")) {
+            path = "\"" + path + "\"";
+        }
+        String[] params = {"-i", path, "-o", outputRoot};
+
+        String[] commandLine = BatchCommandLineBuilder.build(params);
+        Map<String, String> attrs = mapify(commandLine);
+        assertEquals(path, attrs.get("-inputDir"));
+        assertEquals(outputRoot, attrs.get("-outputDir"));
+
+        params = new String[]{"--inputDir", path, "--outputDir", outputRoot};
+
+        commandLine = BatchCommandLineBuilder.build(params);
+        attrs = mapify(commandLine);
+        assertEquals(path, attrs.get("-inputDir"));
+        assertEquals(outputRoot, attrs.get("-outputDir"));
+
+        params = new String[]{"-inputDir", path, "-outputDir", outputRoot};
+
+        commandLine = BatchCommandLineBuilder.build(params);
+        attrs = mapify(commandLine);
+        assertEquals(path, attrs.get("-inputDir"));
+        assertEquals(outputRoot, attrs.get("-outputDir"));
+    }
+
+    @Test
+    public void testConfig() throws Exception {
+        String outputRoot = "outputRoot";
+        String configPath = "c:/somewhere/someConfig.xml";
+        String path = testInput.getAbsolutePath();
+
+        if (path.contains(" ")) {
+            path = "\"" + path + "\"";
+        }
+
+        String[] params = {"--inputDir", path, "--outputDir", outputRoot,
+                "--config="+configPath};
+        String[] commandLine = BatchCommandLineBuilder.build(params);
+        Map<String, String> attrs = mapify(commandLine);
+        assertEquals(path, attrs.get("-inputDir"));
+        assertEquals(outputRoot, attrs.get("-outputDir"));
+        assertEquals(configPath, attrs.get("-c"));
+
+    }
+
+    @Test
+    public void testOneDirOneFileException() throws Exception {
+        boolean ex = false;
+        try {
+            String outputRoot = "outputRoot";
+            String path = testInput.getAbsolutePath();
+            if (path.contains(" ")) {
+                path = "\"" + path + "\"";
+            }
+            String[] params = {path, testFile.getAbsolutePath()};
+
+            String[] commandLine = BatchCommandLineBuilder.build(params);
+
+        } catch (IllegalArgumentException e) {
+            ex = true;
+        }
+        assertTrue("exception on <dir> <file>", ex);
+    }
+
+    private Map<String, String> mapify(String[] args) {
+        Map<String, String> map = new LinkedHashMap<String, String>();
+        for (int i = 0; i < args.length; i++) {
+            if (args[i].startsWith("-")) {
+                String k = args[i];
+                String v = "";
+                if (i < args.length - 1 && !args[i + 1].startsWith("-")) {
+                    v = args[i + 1];
+                    i++;
+                }
+                map.put(k, v);
+            }
+        }
+        return map;
+    }
+
+}

Added: tika/trunk/tika-batch/pom.xml
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/pom.xml?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/pom.xml (added)
+++ tika/trunk/tika-batch/pom.xml Mon Mar 23 16:09:10 2015
@@ -0,0 +1,182 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <properties>
+        <cli.version>1.2</cli.version> <!--sync version with tika-server or move to parent? -->
+    </properties>
+
+    <parent>
+        <groupId>org.apache.tika</groupId>
+        <artifactId>tika-parent</artifactId>
+        <version>1.8-SNAPSHOT</version>
+        <relativePath>../tika-parent/pom.xml</relativePath>
+    </parent>
+
+    <artifactId>tika-batch</artifactId>
+    <packaging>bundle</packaging>
+    <name>Apache Tika batch</name>
+    <url>http://tika.apache.org/</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tika-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tika-parsers</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tika-serialization</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tika-xmp</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>apache-log4j-extras</artifactId>
+            <version>1.2.17</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>${cli.version}</version>
+        </dependency>
+
+        <!-- test dependencies -->
+        <dependency>
+            <groupId>org.apache.tika</groupId>
+            <artifactId>tika-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tika</groupId>
+            <artifactId>tika-parsers</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <scope>test</scope>
+            <version>2.1</version>
+        </dependency>
+
+
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-remote-resources-plugin</artifactId>
+                <version>1.5</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>bundle</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <includes>
+                        <include>**/*.xml</include>
+                    </includes>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        <Bundle-DocURL>${project.url}</Bundle-DocURL>
+                        <Bundle-Activator>
+                            org.apache.tika.config.TikaActivator
+                        </Bundle-Activator>
+                        <Bundle-ActivationPolicy>lazy</Bundle-ActivationPolicy>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>src/test/resources/org/apache/tika/**</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <!-- integration tests also see the packaged jar on the classpath -->
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <version>2.10</version>
+                <configuration>
+                    <additionalClasspathElements>
+                        <additionalClasspathElement>
+                            ${project.build.directory}/${project.build.finalName}.jar
+                        </additionalClasspathElement>
+                    </additionalClasspathElements>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>integration-test</goal>
+                            <goal>verify</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+
+
+    <organization>
+        <name>The Apache Software Foundation</name>
+        <url>http://www.apache.org</url>
+    </organization>
+    <scm>
+        <url>http://svn.apache.org/viewvc/tika/trunk/tika-batch</url>
+        <connection>scm:svn:http://svn.apache.org/repos/asf/tika/trunk/tika-batch</connection>
+        <developerConnection>scm:svn:https://svn.apache.org/repos/asf/tika/trunk/tika-batch</developerConnection>
+    </scm>
+    <issueManagement>
+        <system>JIRA</system>
+        <url>https://issues.apache.org/jira/browse/TIKA</url>
+    </issueManagement>
+    <ciManagement>
+        <system>Jenkins</system>
+        <url>https://builds.apache.org/job/Tika-trunk/</url>
+    </ciManagement>
+
+
+</project>
\ No newline at end of file

Added: tika/trunk/tika-batch/src/main/examples/batchExecutor.sh
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/examples/batchExecutor.sh?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/examples/batchExecutor.sh (added)
+++ tika/trunk/tika-batch/src/main/examples/batchExecutor.sh Mon Mar 23 
16:09:10 2015
@@ -0,0 +1 @@
+java -Dlog4j.configuration=file:bin/log4j_driver.xml -cp "bin/*" 
org.apache.tika.batch.BatchProcessDriverCLI java 
-Dlog4j.configuration=file:bin/log4j.xml -cp "bin/*" -Xmx6g 
org.apache.tika.batch.fs.FSBatchProcessCLI -c tika-batch-config-basic-test.xml 
-outputDir output -inputDir input

Added: tika/trunk/tika-batch/src/main/examples/log4j.xml
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/examples/log4j.xml?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/examples/log4j.xml (added)
+++ tika/trunk/tika-batch/src/main/examples/log4j.xml Mon Mar 23 16:09:10 2015
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<!--
+  Example log4j configuration for the child batch process
+  (FSBatchProcessCLI in batchExecutor.sh):
+  - StatusReporter messages go only to the console;
+  - pdfbox/fontbox warnings and above go only to logs/tika-batch-pdfbox.log;
+  - everything else at info and above goes to logs/tika-batch.log.
+-->
+<log4j:configuration debug="true">
+  <appender name="stdout" class="org.apache.log4j.ConsoleAppender">
+    <layout class="org.apache.log4j.PatternLayout">
+      <!-- Pattern to output the caller's file name and line number -->
+      <!--<param name="ConversionPattern" value="%5p [%t] (%F:%L) - %m%n"/>-->
+      <param name="ConversionPattern" value="%m%n"/>
+    </layout>
+  </appender>
+
+  <appender name="default.file" class="org.apache.log4j.FileAppender">
+    <param name="file" value="logs/tika-batch.log" />
+    <param name="append" value="true" />
+    <param name="threshold" value="info" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%-4r %d [%t] %-5p %c %x - %m%n" />
+    </layout>
+  </appender>
+
+  <appender name="pdfbox.file" class="org.apache.log4j.FileAppender">
+    <param name="file" value="logs/tika-batch-pdfbox.log" />
+    <param name="append" value="true" />
+    <param name="threshold" value="warn" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%-4r %d [%t] %-5p %c %x - %m%n" />
+    </layout>
+  </appender>
+
+  <logger name="org.apache.tika.batch.StatusReporter" additivity="false">
+    <level value="info"/>
+    <appender-ref ref="stdout"/>
+  </logger>
+
+  <logger name="org.apache.pdfbox" additivity="false">
+    <level value="warn"/>
+    <appender-ref ref="pdfbox.file"/>
+  </logger>
+
+  <logger name="org.apache.fontbox" additivity="false">
+    <level value="warn"/>
+    <appender-ref ref="pdfbox.file"/>
+  </logger>
+
+  <root>
+    <level value="info" />
+    <appender-ref ref="default.file" />
+  </root>
+
+</log4j:configuration>
+

Added: tika/trunk/tika-batch/src/main/examples/log4j_driver.xml
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/examples/log4j_driver.xml?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/examples/log4j_driver.xml (added)
+++ tika/trunk/tika-batch/src/main/examples/log4j_driver.xml Mon Mar 23 
16:09:10 2015
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<!--
+  Example log4j configuration for the driver process
+  (BatchProcessDriverCLI in batchExecutor.sh): everything at trace and
+  above goes to the console.
+-->
+<log4j:configuration debug="true">
+  <appender name="stdout" class="org.apache.log4j.ConsoleAppender">
+    <layout class="org.apache.log4j.PatternLayout">
+      <!-- Pattern to output the caller's file name and line number -->
+      <!--<param name="ConversionPattern" value="%5p [%t] (%F:%L) - %m%n"/>-->
+      <param name="ConversionPattern" value="%m%n"/>
+    </layout>
+  </appender>
+
+  <!-- file appender; only referenced by the commented-out logger below -->
+  <appender name="default.file" class="org.apache.log4j.FileAppender">
+    <param name="file" value="logs/tika-batch-driver.log" />
+    <param name="append" value="true" />
+    <param name="threshold" value="trace" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%-4r %d [%t] %-5p %c %x - %m%n" />
+    </layout>
+  </appender>
+
+  <!--
+  <logger name="org.apache.tika.batch.BatchProcessDriverCLI" additivity="true">
+    <level value="trace"/>
+    <appender-ref ref="stdout"/>
+  </logger>
+  -->
+  <!--
+  <logger name="org.apache.tika.batch.BatchProcessDriverCLI" additivity="false">
+    <level value="warn"/>
+    <appender-ref ref="default.file"/>
+  </logger>
+  -->
+
+  <root>
+    <level value="trace" />
+    <appender-ref ref="stdout" />
+  </root>
+
+</log4j:configuration>
+

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/AutoDetectParserFactory.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/AutoDetectParserFactory.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/AutoDetectParserFactory.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/AutoDetectParserFactory.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,34 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.Parser;
+
+/**
+ * {@link ParserFactory} that builds a fresh {@link AutoDetectParser}
+ * from the supplied {@link TikaConfig} each time it is asked.
+ */
+public class AutoDetectParserFactory implements ParserFactory {
+
+    @Override
+    public Parser getParser(TikaConfig config) {
+        final AutoDetectParser autoDetectParser = new AutoDetectParser(config);
+        return autoDetectParser;
+    }
+
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchNoRestartError.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchNoRestartError.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchNoRestartError.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchNoRestartError.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,33 @@
+package org.apache.tika.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Thrown when something catastrophic has happened and the
+ * {@link BatchProcess} should shut down and not be restarted.
+ * {@link FileResourceConsumer}s should throw this; it is also caught
+ * around the start-up of the consumers manager.
+ */
+public class BatchNoRestartError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param t underlying cause
+     */
+    public BatchNoRestartError(Throwable t) {
+        super(t);
+    }
+
+    /**
+     * @param message description of the failure
+     */
+    public BatchNoRestartError(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message description of the failure
+     * @param t underlying cause
+     */
+    public BatchNoRestartError(String message, Throwable t) {
+        super(message, t);
+    }
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java 
(added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java 
Mon Mar 23 16:09:10 2015
@@ -0,0 +1,590 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tika.io.IOUtils;
+
+
+/**
+ * This is the main processor class for a single process.
+ * This class can only be run once.
+ * <p/>
+ * It requires a {@link FileResourceCrawler} and {@link 
FileResourceConsumer}s, and it can also
+ * support a {@link StatusReporter} and an {@link Interrupter}.
+ * <p/>
+ * This is designed to shut down if a parser has timed out or if there is
+ * an OutOfMemoryError. Consider using {@link BatchProcessDriverCLI}
+ * as a daemon/watchdog that monitors and can restart this batch process.
+ * <p>
+ * Note that this class redirects stderr to stdout so that it can
+ * communicate without interference with the parent process on stderr.
+ */
+public class BatchProcess implements Callable<ParallelFileProcessingResult> {
+
+    /**
+     * NOTE(review): presumably markers exchanged with the parent driver
+     * process; their use is not visible in this part of the file -- confirm.
+     */
+    public enum BATCH_CONSTANTS {
+        BATCH_PROCESS_EXCEEDED_MAX_ALIVE_TIME,
+        BATCH_PROCESS_FATAL_MUST_RESTART
+    }
+
+    /**
+     * Why processing stopped. mainLoop() stores one of these on its State;
+     * call() reports CONSUMERS_MANAGER_DIDNT_INIT_IN_TIME_NO_RESTART (as a
+     * string) in its result when the consumers manager fails to start.
+     */
+    private enum CAUSE_FOR_TERMINATION {
+        COMPLETED_NORMALLY,
+        MAIN_LOOP_EXCEPTION_NO_RESTART,
+        CONSUMERS_MANAGER_DIDNT_INIT_IN_TIME_NO_RESTART,
+        MAIN_LOOP_EXCEPTION,
+        CRAWLER_TIMED_OUT,
+        TIMED_OUT_CONSUMER,
+        USER_INTERRUPTION,
+        BATCH_PROCESS_ALIVE_TOO_LONG,
+    }
+
+    // commons-logging logger shared by all BatchProcess instances
+    private static final Log logger = LogFactory.getLog(BatchProcess.class);
+
+    //wraps the process's original stderr (UTF-8, autoflush); assigned in
+    //call() just before System.err is pointed at System.out
+    private PrintStream outputStreamWriter;
+
+    // If a file hasn't been processed in this amount of time,
+    // report it to the console. When the directory crawler has stopped, the thread will
+    // be terminated and the file name will be logged
+    private long timeoutThresholdMillis = 5 * 60 * 1000; // 5 minutes
+
+    //NOTE(review): presumably how often the timeout checker looks for
+    //timed-out consumers -- confirm against TimeoutChecker
+    private long timeoutCheckPulseMillis = 2 * 60 * 1000; //2 minutes
+    //if there was an early termination via the Interrupter
+    //or because of an uncaught runtime throwable, pause
+    //this long before shutting down to allow parsers to finish
+    private long pauseOnEarlyTerminationMillis = 30*1000; //30 seconds
+
+    //copied from ConsumersManager.getConsumersManagerMaxMillis() in the
+    //constructor; NOTE(review): presumably a start/stop time limit -- confirm
+    private final long consumersManagerMaxMillis;
+
+    //maximum time that this process should stay alive
+    //to avoid potential memory leaks, not a bad idea to shutdown
+    //every hour or so.
+    //NOTE(review): -1 presumably means "no limit"; presumably consulted by
+    //aliveTooLong() -- confirm
+    private int maxAliveTimeSeconds = -1;
+
+    //the crawler, reporter and interrupter are submitted to the completion
+    //service alongside the consumers in mainLoop()
+    private final FileResourceCrawler fileResourceCrawler;
+
+    private final ConsumersManager consumersManager;
+
+    private final StatusReporter reporter;
+
+    private final Interrupter interrupter;
+
+    //capacity is the number of consumers (set in the constructor)
+    private final ArrayBlockingQueue<FileStarted> timedOuts;
+
+    //checked by call() so that this BatchProcess runs at most once
+    private boolean alreadyExecuted = false;
+
+    /**
+     * @param fileResourceCrawler crawler run alongside the consumers
+     * @param consumersManager supplies the consumers; must hold at least one
+     * @param reporter status reporter run alongside the consumers
+     * @param interrupter task whose completion signals a user interruption
+     * @throws IllegalArgumentException if consumersManager has no consumers
+     */
+    public BatchProcess(FileResourceCrawler fileResourceCrawler,
+                        ConsumersManager consumersManager,
+                        StatusReporter reporter,
+                        Interrupter interrupter) {
+        this.fileResourceCrawler = fileResourceCrawler;
+        this.consumersManager = consumersManager;
+        this.reporter = reporter;
+        this.interrupter = interrupter;
+        int numConsumers = consumersManager.getConsumers().size();
+        //ArrayBlockingQueue rejects a capacity of 0 with an uninformative
+        //message; state the real precondition instead
+        if (numConsumers < 1) {
+            throw new IllegalArgumentException(
+                    "BatchProcess requires at least one FileResourceConsumer");
+        }
+        //sized to hold one entry per consumer
+        timedOuts = new ArrayBlockingQueue<FileStarted>(numConsumers);
+        this.consumersManagerMaxMillis = consumersManager.getConsumersManagerMaxMillis();
+    }
+
+    /**
+     * Runs the main execution loop. May only be called once.
+     * <p>
+     * Redirects stderr to stdout ({@code System.setErr(System.out)}) and
+     * keeps the original stderr, wrapped as an autoflushing UTF-8
+     * PrintStream, for clean communication with the parent process.
+     * @return result of the processing
+     * @throws InterruptedException
+     * @throws IllegalStateException if called more than once
+     */
+    public ParallelFileProcessingResult call()
+            throws InterruptedException {
+        if (alreadyExecuted) {
+            throw new IllegalStateException("Can only execute BatchRunner once.");
+        }
+        //mark as executed immediately so that a second call is rejected even
+        //if this one returns early (e.g. the consumers manager fails to start)
+        alreadyExecuted = true;
+        //redirect streams
+        try {
+            outputStreamWriter = new PrintStream(System.err, true, IOUtils.UTF_8.toString());
+        } catch (IOException e) {
+            throw new RuntimeException("Can't redirect streams", e);
+        }
+        System.setErr(System.out);
+
+        ParallelFileProcessingResult result = null;
+        try {
+            int numConsumers = consumersManager.getConsumers().size();
+            // fileResourceCrawler, statusReporter, the Interrupter, timeoutChecker
+            int numNonConsumers = 4;
+
+            ExecutorService ex = Executors.newFixedThreadPool(numConsumers
+                    + numNonConsumers);
+            CompletionService<IFileProcessorFutureResult> completionService =
+                    new ExecutorCompletionService<IFileProcessorFutureResult>(
+                            ex);
+            TimeoutChecker timeoutChecker = new TimeoutChecker();
+
+            try {
+                startConsumersManager();
+            } catch (BatchNoRestartError e) {
+                return new
+                        ParallelFileProcessingResult(0, 0, 0,
+                        0, BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE,
+                        CAUSE_FOR_TERMINATION.CONSUMERS_MANAGER_DIDNT_INIT_IN_TIME_NO_RESTART.toString());
+
+            }
+
+            State state = mainLoop(completionService, timeoutChecker);
+            result = shutdown(ex, completionService, timeoutChecker, state);
+        } finally {
+            shutdownConsumersManager();
+        }
+        return result;
+    }
+
+
+    /**
+     * Submits the interrupter, crawler, status reporter, timeout checker and
+     * every consumer, then polls for finished tasks (waiting up to one
+     * second at a time) until a termination condition is reached.
+     *
+     * @param completionService service that runs the tasks
+     * @param timeoutChecker timeout checker task to submit
+     * @return state holding the counts and the cause for termination
+     */
+    private State mainLoop(CompletionService<IFileProcessorFutureResult> completionService,
+                           TimeoutChecker timeoutChecker) {
+        alreadyExecuted = true;
+        State state = new State();
+        logger.info("BatchProcess starting up");
+
+
+        state.start = System.currentTimeMillis();
+        completionService.submit(interrupter);
+        completionService.submit(fileResourceCrawler);
+        completionService.submit(reporter);
+        completionService.submit(timeoutChecker);
+
+
+        for (FileResourceConsumer consumer : consumersManager.getConsumers()) {
+            completionService.submit(consumer);
+        }
+
+        state.numConsumers = consumersManager.getConsumers().size();
+        CAUSE_FOR_TERMINATION causeForTermination = null;
+        //main processing loop; the bounded poll makes sure the "all consumers
+        //done" and "alive too long" checks run even when no task completes
+        while (true) {
+            try {
+                Future<IFileProcessorFutureResult> futureResult =
+                        completionService.poll(1, TimeUnit.SECONDS);
+
+                if (futureResult != null) {
+                    state.removed++;
+                    IFileProcessorFutureResult result = futureResult.get();
+                    if (result instanceof FileConsumerFutureResult) {
+                        state.consumersRemoved++;
+                        state.processed += ((FileConsumerFutureResult) result).getFilesProcessed();
+                    } else if (result instanceof FileResourceCrawlerFutureResult) {
+                        state.crawlersRemoved++;
+                        if (fileResourceCrawler.wasTimedOut()) {
+                            causeForTermination = CAUSE_FOR_TERMINATION.CRAWLER_TIMED_OUT;
+                            break;
+                        }
+                    } else if (result instanceof InterrupterFutureResult) {
+                        causeForTermination = CAUSE_FOR_TERMINATION.USER_INTERRUPTION;
+                        break;
+                    } else if (result instanceof TimeoutFutureResult) {
+                        causeForTermination = CAUSE_FOR_TERMINATION.TIMED_OUT_CONSUMER;
+                        break;
+                    } //only thing left should be StatusReporterResult
+                }
+
+                if (state.consumersRemoved >= state.numConsumers) {
+                    causeForTermination = CAUSE_FOR_TERMINATION.COMPLETED_NORMALLY;
+                    break;
+                }
+                if (aliveTooLong(state.start)) {
+                    causeForTermination = CAUSE_FOR_TERMINATION.BATCH_PROCESS_ALIVE_TOO_LONG;
+                    break;
+                }
+            } catch (Throwable e) {
+                if (isNonRestart(e)) {
+                    causeForTermination = CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION_NO_RESTART;
+                } else {
+                    causeForTermination = CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION;
+                }
+                //pass the throwable itself so the stack trace is not lost
+                logger.fatal("Main loop execution exception: " + e.getMessage(), e);
+                break;
+            }
+        }
+        state.causeForTermination = causeForTermination;
+        return state;
+    }
+
+    private ParallelFileProcessingResult shutdown(ExecutorService ex,
+        CompletionService<IFileProcessorFutureResult> completionService,
+        TimeoutChecker timeoutChecker, State state) {
+
+        reporter.setIsShuttingDown(true);
+        int added = fileResourceCrawler.getAdded();
+        int considered = fileResourceCrawler.getConsidered();
+
+        //TODO: figure out safe way to shutdown resource crawler
+        //if it isn't.  Does it need to add poison at this point?
+        //fileResourceCrawler.pleaseShutdown();
+
+        //Step 1: prevent uncalled threads from being started
+        ex.shutdown();
+
+        //Step 2: ask consumers to shutdown politely.
+        //Under normal circumstances, they should all have completed by now.
+        for (FileResourceConsumer consumer : consumersManager.getConsumers()) {
+            consumer.pleaseShutdown();
+        }
+        //The resourceCrawler should shutdown now.  No need for poison.
+        fileResourceCrawler.shutDownNoPoison();
+        //if there are any active/asked to shutdown consumers, await 
termination
+        //this can happen if a user interrupts the process
+        //of if the crawler stops early, or ...
+        politelyAwaitTermination(state.causeForTermination);
+
+        //Step 3: Gloves come off.  We've tried to ask kindly before.
+        //Now it is time shut down. This will corrupt
+        //nio channels via thread interrupts!  Hopefully, everything
+        //has shut down by now.
+        logger.trace("About to shutdownNow()");
+        List<Runnable> neverCalled = ex.shutdownNow();
+        logger.trace("TERMINATED " + ex.isTerminated() + " : "
+                + state.consumersRemoved + " : " + state.crawlersRemoved);
+
+        int end = state.numConsumers + state.numNonConsumers - state.removed - 
neverCalled.size();
+
+        for (int t = 0; t < end; t++) {
+            Future<IFileProcessorFutureResult> future = null;
+            try {
+                future = completionService.poll(10, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                logger.warn("thread interrupt while polling in final shutdown 
loop");
+                break;
+            }
+            logger.trace("In while future==null loop in final shutdown loop");
+            if (future == null) {
+                break;
+            }
+            try {
+                IFileProcessorFutureResult result = future.get();
+                if (result instanceof FileConsumerFutureResult) {
+                    FileConsumerFutureResult consumerResult = 
(FileConsumerFutureResult) result;
+                    state.processed += consumerResult.getFilesProcessed();
+                    FileStarted fileStarted = consumerResult.getFileStarted();
+                    if (fileStarted != null
+                            && fileStarted.getElapsedMillis() > 
timeoutThresholdMillis) {
+                        logger.warn(fileStarted.getResourceId()
+                                + "\t caused a file processor to hang or 
crash. You may need to remove "
+                                + "this file from your input set and rerun.");
+                    }
+                } else if (result instanceof FileResourceCrawlerFutureResult) {
+                    FileResourceCrawlerFutureResult crawlerResult = 
(FileResourceCrawlerFutureResult) result;
+                    considered += crawlerResult.getConsidered();
+                    added += crawlerResult.getAdded();
+                } //else ...we don't care about anything else stopping at this 
point
+            } catch (ExecutionException e) {
+                logger.error("Execution exception trying to shutdown after 
shutdownNow:" + e.getMessage());
+            } catch (InterruptedException e) {
+                logger.error("Interrupted exception trying to shutdown after 
shutdownNow:" + e.getMessage());
+            }
+        }
+        //do we need to restart?
+        String restartMsg = null;
+        if (state.causeForTermination == 
CAUSE_FOR_TERMINATION.USER_INTERRUPTION
+                || state.causeForTermination == 
CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION_NO_RESTART) {
+            //do not restart!!!
+        } else if (state.causeForTermination == 
CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION) {
+            restartMsg = "Uncaught consumer throwable";
+        } else if (state.causeForTermination == 
CAUSE_FOR_TERMINATION.TIMED_OUT_CONSUMER) {
+            if (areResourcesPotentiallyRemaining()) {
+                restartMsg = "Consumer timed out with resources remaining";
+            }
+        } else if (state.causeForTermination == 
CAUSE_FOR_TERMINATION.BATCH_PROCESS_ALIVE_TOO_LONG) {
+            restartMsg = 
BATCH_CONSTANTS.BATCH_PROCESS_EXCEEDED_MAX_ALIVE_TIME.toString();
+        } else if (state.causeForTermination == 
CAUSE_FOR_TERMINATION.CRAWLER_TIMED_OUT) {
+            restartMsg = "Crawler timed out.";
+        } else if (fileResourceCrawler.wasTimedOut()) {
+            restartMsg = "Crawler was timed out.";
+        } else if (fileResourceCrawler.isActive()) {
+            restartMsg = "Crawler is still active.";
+        } else if (! fileResourceCrawler.isQueueEmpty()) {
+            restartMsg = "Resources still exist for processing";
+        }
+
+        int exitStatus = getExitStatus(state.causeForTermination, restartMsg);
+
+        //need to re-check, report, mark timed out consumers
+        timeoutChecker.checkForTimedOutConsumers();
+
+        for (FileStarted fs : timedOuts) {
+            logger.fatal("A parser was still working on >" + 
fs.getResourceId() +
+                    "< for " + fs.getElapsedMillis() + " milliseconds after it 
started." +
+                    " This exceeds the maxTimeoutMillis parameter");
+        }
+        double elapsed = ((double) new Date().getTime() - (double) 
state.start) / 1000.0;
+        return new
+            ParallelFileProcessingResult(considered, added, state.processed,
+                elapsed, exitStatus, state.causeForTermination.toString());
+    }
+
+    private class State {
+        long start = -1;
+        int processed = 0;
+        int numConsumers = 0;
+        int numNonConsumers = 0;
+        int removed = 0;
+        int consumersRemoved = 0;
+        int crawlersRemoved = 0;
+        CAUSE_FOR_TERMINATION causeForTermination = null;
+    }
+
+    private void startConsumersManager() {
+        if (consumersManagerMaxMillis < 0) {
+            consumersManager.init();
+            return;
+        }
+        Thread timed = new Thread() {
+            public void run() {
+                logger.trace("about to start consumers manager");
+                consumersManager.init();
+                logger.trace("finished starting consumers manager");
+            }
+        };
+        //don't allow this thread to keep process alive
+        timed.setDaemon(true);
+        timed.start();
+        try {
+            timed.join(consumersManagerMaxMillis);
+        } catch (InterruptedException e) {
+            logger.warn("interruption exception during consumers manager 
shutdown");
+        }
+        if (timed.isAlive()) {
+            logger.fatal("ConsumersManager did not start within " + 
consumersManagerMaxMillis + "ms");
+            throw new BatchNoRestartError("ConsumersManager did not start 
within "+consumersManagerMaxMillis+"ms");
+        }
+    }
+
+    private void shutdownConsumersManager() {
+        if (consumersManagerMaxMillis < 0) {
+            consumersManager.shutdown();
+            return;
+        }
+        Thread timed = new Thread() {
+            public void run() {
+                logger.trace("starting to shutdown consumers manager");
+                consumersManager.shutdown();
+                logger.trace("finished shutting down consumers manager");
+            }
+        };
+        timed.setDaemon(true);
+        timed.start();
+        try {
+            timed.join(consumersManagerMaxMillis);
+        } catch (InterruptedException e) {
+            logger.warn("interruption exception during consumers manager 
shutdown");
+        }
+        if (timed.isAlive()) {
+            logger.error("ConsumersManager was still alive during shutdown!");
+            throw new BatchNoRestartError("ConsumersManager did not shutdown 
within: "+
+                    consumersManagerMaxMillis+"ms");
+        }
+    }
+
+    /**
+     * This is used instead of awaitTermination(), because that interrupts
+     * the thread and then waits for its termination.  This politely waits.
+     *
+     * @param causeForTermination reason for termination.
+     */
+    private void politelyAwaitTermination(CAUSE_FOR_TERMINATION 
causeForTermination) {
+        if (causeForTermination == CAUSE_FOR_TERMINATION.COMPLETED_NORMALLY) {
+            return;
+        }
+        long start = new Date().getTime();
+        while (countActiveConsumers() > 0) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                logger.warn("Thread interrupted while trying to 
politelyAwaitTermination");
+                return;
+            }
+            long elapsed = new Date().getTime()-start;
+            if (pauseOnEarlyTerminationMillis > -1 &&
+                    elapsed > pauseOnEarlyTerminationMillis) {
+                logger.warn("I waited after an early termination for "+
+                elapsed + ", but there was at least one active consumer");
+                return;
+            }
+        }
+    }
+
+    private boolean isNonRestart(Throwable e) {
+        if (e instanceof BatchNoRestartError) {
+            return true;
+        }
+        Throwable cause = e.getCause();
+        return cause != null && isNonRestart(cause);
+    }
+
+    private int getExitStatus(CAUSE_FOR_TERMINATION causeForTermination, 
String restartMsg) {
+        if (causeForTermination == 
CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION_NO_RESTART) {
+            logger.info(CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION_NO_RESTART);
+            return BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE;
+        }
+
+        if (restartMsg != null) {
+            if 
(restartMsg.equals(BATCH_CONSTANTS.BATCH_PROCESS_EXCEEDED_MAX_ALIVE_TIME.toString()))
 {
+                logger.warn(restartMsg);
+            } else {
+                logger.fatal(restartMsg);
+            }
+
+            //send over stdout wrapped in outputStreamWriter
+            outputStreamWriter.println(
+                    
BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString() +
+                            " >> " + restartMsg);
+            outputStreamWriter.flush();
+            return BatchProcessDriverCLI.PROCESS_RESTART_EXIT_CODE;
+        }
+        return 0;
+    }
+
+    //could new FileResources be consumed from the Queue?
+    //Because of race conditions, this can return a true
+    //when the real answer is false.
+    //This should never return false, though, if the answer is true!
+    private boolean areResourcesPotentiallyRemaining() {
+        if (fileResourceCrawler.isActive()) {
+            return true;
+        }
+        return !fileResourceCrawler.isQueueEmpty();
+    }
+
+    private boolean aliveTooLong(long started) {
+        if (maxAliveTimeSeconds < 0) {
+            return false;
+        }
+        double elapsedSeconds = (double) (new Date().getTime() - started) / 
(double) 1000;
+        return elapsedSeconds > (double) maxAliveTimeSeconds;
+    }
+
+    //snapshot of non-retired consumers; actual number may be smaller by the 
time
+    //this returns a value!
+    private int countActiveConsumers() {
+        int active = 0;
+        for (FileResourceConsumer consumer : consumersManager.getConsumers()) {
+            if (consumer.isStillActive()) {
+                active++;
+            }
+        }
+        return active;
+    }
+
+    /**
+     * If there is an early termination via an interrupt or too many timed out 
consumers
+     * or because a consumer or other Runnable threw a Throwable, pause this 
long
+     * before killing the consumers and other threads.
+     *
+     * Typically makes sense for this to be the same or slightly larger than
+     * timeoutThresholdMillis
+     *
+     * @param pauseOnEarlyTerminationMillis how long to pause if there is an 
early termination
+     */
+    public void setPauseOnEarlyTerminationMillis(long 
pauseOnEarlyTerminationMillis) {
+        this.pauseOnEarlyTerminationMillis = pauseOnEarlyTerminationMillis;
+    }
+
+    /**
+     * The amount of time allowed before a consumer should be timed out.
+     *
+     * @param timeoutThresholdMillis threshold in milliseconds before 
declaring a consumer timed out
+     */
+    public void setTimeoutThresholdMillis(long timeoutThresholdMillis) {
+        this.timeoutThresholdMillis = timeoutThresholdMillis;
+    }
+
+    public void setTimeoutCheckPulseMillis(long timeoutCheckPulseMillis) {
+        this.timeoutCheckPulseMillis = timeoutCheckPulseMillis;
+    }
+
+    /**
+     * The maximum amount of time that this process can be alive.  To avoid
+     * memory leaks, it is sometimes beneficial to shutdown (and restart) the
+     * process periodically.
+     * <p/>
+     * If the value is < 0, the process will run until completion, 
interruption or exception.
+     *
+     * @param maxAliveTimeSeconds maximum amount of time in seconds to remain 
alive
+     */
+    public void setMaxAliveTimeSeconds(int maxAliveTimeSeconds) {
+        this.maxAliveTimeSeconds = maxAliveTimeSeconds;
+    }
+
+    private class TimeoutChecker implements 
Callable<IFileProcessorFutureResult> {
+
+        @Override
+        public TimeoutFutureResult call() throws Exception {
+            while (timedOuts.size() == 0) {
+                try {
+                    Thread.sleep(timeoutCheckPulseMillis);
+                } catch (InterruptedException e) {
+                    logger.debug("Thread interrupted exception in 
TimeoutChecker");
+                    break;
+                    //just stop.
+                }
+                checkForTimedOutConsumers();
+                if (countActiveConsumers() == 0) {
+                    logger.info("No activeConsumers in TimeoutChecker");
+                    break;
+                }
+            }
+            logger.debug("TimeoutChecker quitting: " + timedOuts.size());
+            return new TimeoutFutureResult(timedOuts.size());
+        }
+
+        private void checkForTimedOutConsumers() {
+            for (FileResourceConsumer consumer : 
consumersManager.getConsumers()) {
+                FileStarted fs = 
consumer.checkForTimedOutMillis(timeoutThresholdMillis);
+                if (fs != null) {
+                    timedOuts.add(fs);
+                }
+            }
+        }
+    }
+
+    private class TimeoutFutureResult implements IFileProcessorFutureResult {
+        private final int timedOutCount;
+
+        private TimeoutFutureResult(final int timedOutCount) {
+            this.timedOutCount = timedOutCount;
+        }
+
+        protected int getTimedOutCount() {
+            return timedOutCount;
+        }
+    }
+}


Reply via email to