Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/BatchProcessBuilder.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/BatchProcessBuilder.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/BatchProcessBuilder.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/BatchProcessBuilder.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,295 @@
+package org.apache.tika.batch.builders;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.tika.batch.BatchProcess;
+import org.apache.tika.batch.ConsumersManager;
+import org.apache.tika.batch.FileResource;
+import org.apache.tika.batch.FileResourceCrawler;
+import org.apache.tika.batch.Interrupter;
+import org.apache.tika.batch.StatusReporter;
+import org.apache.tika.util.ClassLoaderUtil;
+import org.apache.tika.util.XMLDOMUtil;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+
+/**
+ * Builds a BatchProcess from a combination of runtime arguments and the
+ * config file.
+ */
+public class BatchProcessBuilder {
+
+    public final static int DEFAULT_MAX_QUEUE_SIZE = 1000;
+    public final static String MAX_QUEUE_SIZE_KEY = "maxQueueSize";
+    public final static String NUM_CONSUMERS_KEY = "numConsumers";
+
+    /**
+     * Parses an XML configuration stream and builds a BatchProcess from its
+     * document element and the supplied runtime attributes.  With the
+     * exception of the QueueBuilder, the builders choose how to adjudicate
+     * between runtime arguments and the elements in the configuration file.
+     * <p>
+     * This does not close the InputStream!
+     *
+     * @param is stream holding the XML configuration
+     * @param runtimeAttributes incoming runtime attributes
+     * @return batch process
+     * @throws IOException if the stream cannot be read, or if the XML parser
+     *         cannot be created or fails to parse the stream
+     */
+    public BatchProcess build(InputStream is, Map<String,String> runtimeAttributes) throws IOException {
+        final Document configDoc;
+        try {
+            configDoc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(is);
+        } catch (SAXException e) {
+            throw new IOException(e);
+        } catch (ParserConfigurationException e) {
+            throw new IOException(e);
+        }
+        return build(configDoc.getDocumentElement(), runtimeAttributes);
+    }
+
+    /**
+     * Builds a BatchProcess from runtime arguments and the document element
+     * of a configuration file.  With the exception of the QueueBuilder,
+     * the builders choose how to adjudicate between runtime arguments and
+     * the elements in the configuration file.
+     *
+     * @param docElement   document element of the xml config file
+     * @param incomingRuntimeAttributes runtime arguments
+     * @return the assembled BatchProcess
+     */
+    public BatchProcess build(Node docElement, Map<String, String> incomingRuntimeAttributes) {
+
+        //process-wide settings, read before any component is built
+        long timeoutThresholdMillis = XMLDOMUtil.getLong("timeoutThresholdMillis",
+                incomingRuntimeAttributes, docElement);
+        long timeoutCheckPulseMillis = XMLDOMUtil.getLong("timeoutCheckPulseMillis",
+                incomingRuntimeAttributes, docElement);
+        long pauseOnEarlyTerminationMillis = XMLDOMUtil.getLong("pauseOnEarlyTerminationMillis",
+                incomingRuntimeAttributes, docElement);
+        int maxAliveTimeSeconds = XMLDOMUtil.getInt("maxAliveTimeSeconds",
+                incomingRuntimeAttributes, docElement);
+
+        /*
+         * TODO: This is a bit smelly.  numConsumers is needed by both the
+         * crawler and the consumers, so the incoming attributes are copied and
+         * numConsumers is filled in from the commandline (if present) or from
+         * the config file.  At least the copy is an unmodifiable defensive copy
+         * of incomingRuntimeAttributes...
+         */
+        Map<String, String> runtimeAttributes =
+                setNumConsumersInRuntimeAttributes(docElement, incomingRuntimeAttributes);
+
+        //build queue
+        ArrayBlockingQueue<FileResource> queue = buildQueue(docElement, runtimeAttributes);
+
+        //index the element children by node name; a later sibling with the
+        //same name replaces an earlier one
+        NodeList children = docElement.getChildNodes();
+        Map<String, Node> keyNodes = new HashMap<String, Node>();
+        for (int i = 0; i < children.getLength(); i++) {
+            Node child = children.item(i);
+            if (child.getNodeType() == Node.ELEMENT_NODE) {
+                keyNodes.put(child.getNodeName(), child);
+            }
+        }
+
+        //components are built in this order: consumers, crawler, reporter, interrupter
+        ConsumersManager consumersManager =
+                buildConsumersManager(keyNodes.get("consumers"), runtimeAttributes, queue);
+        FileResourceCrawler crawler =
+                buildCrawler(queue, keyNodes.get("crawler"), runtimeAttributes);
+        StatusReporter reporter =
+                buildReporter(crawler, consumersManager, keyNodes.get("reporter"), runtimeAttributes);
+        Interrupter interrupter =
+                buildInterrupter(keyNodes.get("interrupter"), runtimeAttributes);
+
+        BatchProcess proc = new BatchProcess(
+                crawler, consumersManager, reporter, interrupter);
+
+        //the three millisecond settings are only applied when greater than -1
+        if (timeoutThresholdMillis > -1) {
+            proc.setTimeoutThresholdMillis(timeoutThresholdMillis);
+        }
+        if (pauseOnEarlyTerminationMillis > -1) {
+            proc.setPauseOnEarlyTerminationMillis(pauseOnEarlyTerminationMillis);
+        }
+        if (timeoutCheckPulseMillis > -1) {
+            proc.setTimeoutCheckPulseMillis(timeoutCheckPulseMillis);
+        }
+        //maxAliveTimeSeconds is always passed through, whatever its value
+        proc.setMaxAliveTimeSeconds(maxAliveTimeSeconds);
+        return proc;
+    }
+
+    /**
+     * Loads the InterrupterBuilder named by the "builderClass" entry of the
+     * map returned by XMLDOMUtil.mapifyAttrs(node, runtimeAttributes) and
+     * delegates the construction of the Interrupter to it.
+     *
+     * @param node the "interrupter" child of the document element
+     * @param runtimeAttributes runtime attributes
+     * @return the Interrupter created by the configured builder
+     * @throws RuntimeException if no "builderClass" value is found
+     */
+    private Interrupter buildInterrupter(Node node, Map<String, String> runtimeAttributes) {
+        String builderClassName = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes).get("builderClass");
+        if (builderClassName == null) {
+            throw new RuntimeException("Need to specify class name in interrupter element");
+        }
+        InterrupterBuilder interrupterBuilder =
+                ClassLoaderUtil.buildClass(InterrupterBuilder.class, builderClassName);
+        return interrupterBuilder.build(node, runtimeAttributes);
+    }
+
+    /**
+     * Loads the StatusReporterBuilder named by the "builderClass" entry of
+     * the map returned by XMLDOMUtil.mapifyAttrs(node, runtimeAttributes) and
+     * delegates the construction of the StatusReporter to it.
+     *
+     * @param crawler crawler handed to the reporter builder
+     * @param consumersManager consumers manager handed to the reporter builder
+     * @param node the "reporter" child of the document element
+     * @param runtimeAttributes runtime attributes
+     * @return the StatusReporter created by the configured builder
+     * @throws RuntimeException if no "builderClass" value is found
+     */
+    private StatusReporter buildReporter(FileResourceCrawler crawler, ConsumersManager consumersManager,
+                                          Node node, Map<String, String> runtimeAttributes) {
+        String builderClassName = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes).get("builderClass");
+        if (builderClassName == null) {
+            throw new RuntimeException("Need to specify class name in reporter element");
+        }
+        StatusReporterBuilder reporterBuilder =
+                ClassLoaderUtil.buildClass(StatusReporterBuilder.class, builderClassName);
+        return reporterBuilder.build(crawler, consumersManager, node, runtimeAttributes);
+    }
+
+    /**
+     * numConsumers is needed by both the crawler and the consumers. This
+     * utility method extracts the number of consumers from a map of String
+     * key value pairs.
+     * <p>
+     * If the value is missing, is "default", is not a parseable integer or
+     * is less than 1, the result of <code>AbstractConsumersBuilder</code>'s
+     * <code>getDefaultNumConsumers()</code> is returned instead.
+     *
+     * @param attrs attributes from which to select the NUM_CONSUMERS_KEY
+     * @return number of consumers
+     */
+    public static int getNumConsumers(Map<String, String> attrs) {
+        String requested = attrs.get(BatchProcessBuilder.NUM_CONSUMERS_KEY);
+        if (requested != null && !requested.equals("default")) {
+            try {
+                int parsed = Integer.parseInt(requested);
+                if (parsed >= 1) {
+                    return parsed;
+                }
+            } catch (NumberFormatException e) {
+                //not a number: fall through to the default
+            }
+        }
+        return AbstractConsumersBuilder.getDefaultNumConsumers();
+    }
+
+    /**
+     * Returns an unmodifiable copy of the incoming runtime attributes that
+     * contains an entry for NUM_CONSUMERS_KEY.
+     * <p>
+     * A value supplied at runtime is kept untouched.  Otherwise the value is
+     * read from the "numConsumers" attribute of the document element; if that
+     * attribute is absent, is not a parseable integer or is less than 1,
+     * AbstractConsumersBuilder.getDefaultNumConsumers() is used.
+     *
+     * @param docElement document element of the xml config file
+     * @param incomingRuntimeAttributes runtime attributes; never modified
+     * @return unmodifiable copy of the attributes including numConsumers
+     */
+    private Map<String, String> setNumConsumersInRuntimeAttributes(
+            Node docElement, Map<String, String> incomingRuntimeAttributes) {
+        Map<String, String> runtimeAttributes =
+                new HashMap<String, String>(incomingRuntimeAttributes);
+
+        //if this is set at runtime use that value
+        if (runtimeAttributes.containsKey(NUM_CONSUMERS_KEY)) {
+            return Collections.unmodifiableMap(runtimeAttributes);
+        }
+
+        int numConsumers = -1;
+        //getNamedItem returns null when the config file does not declare
+        //numConsumers; fall back to the default instead of dereferencing null
+        Node ncNode = docElement.getAttributes().getNamedItem(NUM_CONSUMERS_KEY);
+        if (ncNode != null) {
+            try {
+                numConsumers = Integer.parseInt(ncNode.getNodeValue());
+            } catch (NumberFormatException e) {
+                //not a number: keep -1 so the default is applied below
+            }
+        }
+        //TODO: should we have a max range check?
+        if (numConsumers < 1) {
+            numConsumers = AbstractConsumersBuilder.getDefaultNumConsumers();
+        }
+        runtimeAttributes.put(NUM_CONSUMERS_KEY, Integer.toString(numConsumers));
+        return Collections.unmodifiableMap(runtimeAttributes);
+    }
+
+    /**
+     * Creates the shared queue of file resources.
+     * <p>
+     * The capacity is taken from the MAX_QUEUE_SIZE_KEY runtime attribute or,
+     * if that is not set, from the attribute of the same name on the document
+     * element.  DEFAULT_MAX_QUEUE_SIZE is used when neither is present, when
+     * the value is not a parseable integer, or when it is less than 1.
+     *
+     * @param docElement document element of the xml config file
+     * @param runtimeAttributes runtime attributes
+     * @return an empty queue with the resolved capacity
+     */
+    private ArrayBlockingQueue<FileResource> buildQueue(Node docElement,
+                                                        Map<String, String> runtimeAttributes) {
+        int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;
+        String szString = runtimeAttributes.get(MAX_QUEUE_SIZE_KEY);
+
+        if (szString == null) {
+            Node szNode = docElement.getAttributes().getNamedItem(MAX_QUEUE_SIZE_KEY);
+            if (szNode != null) {
+                szString = szNode.getNodeValue();
+            }
+        }
+
+        if (szString != null) {
+            try {
+                maxQueueSize = Integer.parseInt(szString);
+            } catch (NumberFormatException e) {
+                //not a number: keep the default
+            }
+        }
+
+        //ArrayBlockingQueue rejects a capacity below 1 with an
+        //IllegalArgumentException, so 0 must fall back to the default too
+        if (maxQueueSize < 1) {
+            maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;
+        }
+
+        return new ArrayBlockingQueue<FileResource>(maxQueueSize);
+    }
+
+    /**
+     * Loads the AbstractConsumersBuilder named by the "builderClass" entry of
+     * the map returned by XMLDOMUtil.mapifyAttrs(node, runtimeAttributes) and
+     * delegates the construction of the ConsumersManager to it.
+     *
+     * @param node the "consumers" child of the document element
+     * @param runtimeAttributes runtime attributes
+     * @param queue shared queue handed to the consumers builder
+     * @return the ConsumersManager created by the configured builder
+     * @throws RuntimeException if no "builderClass" value is found
+     */
+    private ConsumersManager buildConsumersManager(Node node,
+                Map<String, String> runtimeAttributes, ArrayBlockingQueue<FileResource> queue) {
+        String builderClassName = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes).get("builderClass");
+        if (builderClassName == null) {
+            throw new RuntimeException("Need to specify class name in consumers element");
+        }
+        AbstractConsumersBuilder consumersBuilder =
+                ClassLoaderUtil.buildClass(AbstractConsumersBuilder.class, builderClassName);
+        return consumersBuilder.build(node, runtimeAttributes, queue);
+    }
+
+
+    /**
+     * Loads the ICrawlerBuilder named by the "builderClass" entry of the map
+     * returned by XMLDOMUtil.mapifyAttrs(node, runtimeAttributes) and
+     * delegates the construction of the FileResourceCrawler to it.
+     *
+     * @param queue shared queue handed to the crawler builder
+     * @param node the "crawler" child of the document element
+     * @param runtimeAttributes runtime attributes
+     * @return the FileResourceCrawler created by the configured builder
+     * @throws RuntimeException if no "builderClass" value is found
+     */
+    private FileResourceCrawler buildCrawler(ArrayBlockingQueue<FileResource> queue,
+                                             Node node, Map<String, String> runtimeAttributes) {
+        String builderClassName = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes).get("builderClass");
+        if (builderClassName == null) {
+            throw new RuntimeException("Need to specify class name in crawler element");
+        }
+        ICrawlerBuilder crawlerBuilder =
+                ClassLoaderUtil.buildClass(ICrawlerBuilder.class, builderClassName);
+        return crawlerBuilder.build(node, runtimeAttributes, queue);
+    }
+
+
+
+
+
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/CommandLineParserBuilder.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/CommandLineParserBuilder.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/CommandLineParserBuilder.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/CommandLineParserBuilder.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,143 @@
+package org.apache.tika.batch.builders;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Locale;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.w3c.dom.Document;
+import org.w3c.dom.NamedNodeMap;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+/**
+ * Reads configurable options from a config file and returns an
+ * org.apache.commons.cli.Options object to be used in a commandline parser.
+ * This allows users and developers to set which options should be made
+ * available via the commandline.
+ */
+public class CommandLineParserBuilder {
+
+    /**
+     * Parses an XML configuration stream and converts the element children of
+     * the first top-level "commandline" element into commons-cli options.
+     *
+     * @param is stream holding the XML configuration
+     * @return the configured options; empty if there is no "commandline" element
+     * @throws IOException if the stream cannot be read, or if the XML parser
+     *         cannot be created or fails to parse the stream
+     */
+    public Options build(InputStream is) throws IOException {
+        final Document doc;
+        try {
+            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(is);
+        } catch (SAXException e) {
+            throw new IOException(e);
+        } catch (ParserConfigurationException e) {
+            throw new IOException(e);
+        }
+
+        //locate the first element child named "commandline"
+        NodeList topLevel = doc.getDocumentElement().getChildNodes();
+        Node commandlineNode = null;
+        for (int i = 0; i < topLevel.getLength() && commandlineNode == null; i++) {
+            Node candidate = topLevel.item(i);
+            if (candidate.getNodeType() == Node.ELEMENT_NODE
+                    && candidate.getNodeName().equals("commandline")) {
+                commandlineNode = candidate;
+            }
+        }
+
+        Options options = new Options();
+        if (commandlineNode != null) {
+            NodeList optionNodes = commandlineNode.getChildNodes();
+            for (int i = 0; i < optionNodes.getLength(); i++) {
+                Node optionNode = optionNodes.item(i);
+                if (optionNode.getNodeType() == Node.ELEMENT_NODE) {
+                    Option opt = buildOption(optionNode);
+                    if (opt != null) {
+                        options.addOption(opt);
+                    }
+                }
+            }
+        }
+        return options;
+    }
+
+    /**
+     * Converts one option element into a commons-cli Option using its
+     * "opt", "description", "longOpt", "required" and "hasArg" attributes.
+     *
+     * @param optionNode element describing a single commandline option
+     * @return the corresponding Option
+     * @throws IllegalArgumentException if "opt" or "description" is missing
+     *         or blank
+     */
+    private Option buildOption(Node optionNode) {
+        NamedNodeMap attrs = optionNode.getAttributes();
+        String shortName = getString(attrs, "opt", "");
+        String description = getString(attrs, "description", "");
+        if (shortName.trim().length() == 0 || description.trim().length() == 0) {
+            throw new IllegalArgumentException(
+                    "Must specify at least option and description");
+        }
+
+        Option option = new Option(shortName, description);
+        String longName = getString(attrs, "longOpt", "");
+        if (longName.trim().length() > 0) {
+            option.setLongOpt(longName);
+        }
+        if (getBoolean(attrs, "required", false)) {
+            option.setRequired(true);
+        }
+        if (getBoolean(attrs, "hasArg", false)) {
+            option.setArgs(1);
+        }
+        return option;
+    }
+
+    /**
+     * Reads an attribute as a boolean.  Only "true" and "false" (compared
+     * case-insensitively) are recognized; a missing attribute, a null value
+     * or any other text yields the default.
+     *
+     * @param map attributes of the option element
+     * @param opt name of the attribute to read
+     * @param defaultValue value to return when the attribute is not usable
+     * @return the parsed value or the default
+     */
+    private boolean getBoolean(NamedNodeMap map, String opt, boolean defaultValue) {
+        Node attr = map.getNamedItem(opt);
+        String raw = (attr == null) ? null : attr.getNodeValue();
+        if (raw == null) {
+            return defaultValue;
+        }
+
+        String normalized = raw.toLowerCase(Locale.ROOT);
+        if (normalized.equals("true")) {
+            return true;
+        }
+        if (normalized.equals("false")) {
+            return false;
+        }
+        return defaultValue;
+    }
+
+    /**
+     * Reads an attribute as a String.
+     *
+     * @param map attributes of the option element
+     * @param opt name of the attribute to read
+     * @param defaultVal value to return when the attribute is missing or
+     *        its value is null
+     * @return the attribute value or the default
+     */
+    private String getString(NamedNodeMap map, String opt, String defaultVal) {
+        Node attr = map.getNamedItem(opt);
+        String value = (attr == null) ? null : attr.getNodeValue();
+        return (value == null) ? defaultVal : value;
+    }
+
+
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/DefaultContentHandlerFactoryBuilder.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/DefaultContentHandlerFactoryBuilder.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/DefaultContentHandlerFactoryBuilder.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/DefaultContentHandlerFactoryBuilder.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,76 @@
+package org.apache.tika.batch.builders;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.tika.sax.BasicContentHandlerFactory;
+import org.apache.tika.sax.ContentHandlerFactory;
+import org.apache.tika.util.XMLDOMUtil;
+import org.w3c.dom.Node;
+
+/**
+ * Builds a BasicContentHandlerFactory whose handler type is taken from the
+ * attribute "basicHandlerType".  Recognized values (case-insensitive) are:
+ * xml, html, text (or txt), body, ignore.  A missing or unrecognized value
+ * results in text.
+ * <p>
+ * The write limit is taken from the attribute "writeLimit".  It is -1 when
+ * the attribute is missing or is not a parseable integer.
+ */
+public class DefaultContentHandlerFactoryBuilder implements IContentHandlerFactoryBuilder {
+
+    /**
+     * @param node config element whose attributes are combined with the
+     *             runtime attributes via XMLDOMUtil.mapifyAttrs
+     * @param runtimeAttributes runtime attributes
+     * @return a BasicContentHandlerFactory with the resolved type and write limit
+     */
+    @Override
+    public ContentHandlerFactory build(Node node, Map<String, String> runtimeAttributes) {
+        Map<String, String> attributes = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes);
+        return new BasicContentHandlerFactory(
+                toHandlerType(attributes.get("basicHandlerType")),
+                toWriteLimit(attributes.get("writeLimit")));
+    }
+
+    //maps the attribute text onto a handler type; null and unknown values mean TEXT
+    private static BasicContentHandlerFactory.HANDLER_TYPE toHandlerType(String handlerTypeString) {
+        String normalized = (handlerTypeString == null)
+                ? "text" : handlerTypeString.toLowerCase(Locale.ROOT);
+        if (normalized.equals("xml")) {
+            return BasicContentHandlerFactory.HANDLER_TYPE.XML;
+        }
+        if (normalized.equals("html")) {
+            return BasicContentHandlerFactory.HANDLER_TYPE.HTML;
+        }
+        if (normalized.equals("body")) {
+            return BasicContentHandlerFactory.HANDLER_TYPE.BODY;
+        }
+        if (normalized.equals("ignore")) {
+            return BasicContentHandlerFactory.HANDLER_TYPE.IGNORE;
+        }
+        //"text", "txt" and everything else
+        return BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
+    }
+
+    //parses the write limit; null or unparseable text yields -1
+    private static int toWriteLimit(String writeLimitString) {
+        if (writeLimitString == null) {
+            return -1;
+        }
+        try {
+            return Integer.parseInt(writeLimitString);
+        } catch (NumberFormatException e) {
+            //swallow and default to -1
+            //TODO: should we throw a RuntimeException?
+            return -1;
+        }
+    }
+
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/IContentHandlerFactoryBuilder.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/IContentHandlerFactoryBuilder.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/IContentHandlerFactoryBuilder.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/IContentHandlerFactoryBuilder.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,29 @@
+package org.apache.tika.batch.builders;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.tika.sax.ContentHandlerFactory;
+import org.w3c.dom.Node;
+
+/**
+ * Builder of ContentHandlerFactory instances from a DOM node and a map of
+ * attributes.
+ */
+public interface IContentHandlerFactoryBuilder
+        extends ObjectFromDOMBuilder<ContentHandlerFactory> {
+
+    /**
+     * @param node config element describing the factory
+     * @param attributes attributes passed in alongside the node
+     * @return the ContentHandlerFactory
+     */
+    ContentHandlerFactory build(Node node, Map<String, String> attributes);
+
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ICrawlerBuilder.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ICrawlerBuilder.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ICrawlerBuilder.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ICrawlerBuilder.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,32 @@
+package org.apache.tika.batch.builders;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tika.batch.FileResource;
+import org.apache.tika.batch.FileResourceCrawler;
+import org.w3c.dom.Node;
+
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * Builder of FileResourceCrawler instances from a DOM node, a map of
+ * attributes and a queue of file resources.
+ */
+public interface ICrawlerBuilder
+        extends ObjectFromDOMAndQueueBuilder<FileResourceCrawler> {
+
+    /**
+     * @param node config element describing the crawler
+     * @param attributes attributes passed in alongside the node
+     * @param queue queue of file resources made available to the crawler
+     * @return the FileResourceCrawler
+     */
+    FileResourceCrawler build(Node node, Map<String, String> attributes,
+                              ArrayBlockingQueue<FileResource> queue);
+
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/InterrupterBuilder.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/InterrupterBuilder.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/InterrupterBuilder.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/InterrupterBuilder.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,33 @@
+package org.apache.tika.batch.builders;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.tika.batch.Interrupter;
+import org.w3c.dom.Node;
+
+/**
+ * Builds an Interrupter.
+ */
+public class InterrupterBuilder {
+
+    /**
+     * Creates a new Interrupter.  Neither argument is consulted by this
+     * implementation.
+     *
+     * @param n config element for the interrupter
+     * @param commandlineArguments runtime arguments
+     * @return a new Interrupter
+     */
+    public Interrupter build(Node n, Map<String, String> commandlineArguments) {
+        return new Interrupter();
+    }
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ObjectFromDOMAndQueueBuilder.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ObjectFromDOMAndQueueBuilder.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ObjectFromDOMAndQueueBuilder.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ObjectFromDOMAndQueueBuilder.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,36 @@
+package org.apache.tika.batch.builders;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.tika.batch.FileResource;
+import org.w3c.dom.Node;
+
+/**
+ * Same as {@link org.apache.tika.batch.builders.ObjectFromDOMBuilder},
+ * but this is for objects that require access to the shared queue.
+ *
+ * @param <T> type of the object that is built
+ */
+public interface ObjectFromDOMAndQueueBuilder<T> {
+
+    /**
+     * @param node config element describing the object
+     * @param runtimeAttributes runtime attributes
+     * @param resourceQueue shared queue of file resources
+     * @return the built object
+     */
+    T build(Node node, Map<String, String> runtimeAttributes,
+            ArrayBlockingQueue<FileResource> resourceQueue);
+
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ObjectFromDOMBuilder.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ObjectFromDOMBuilder.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ObjectFromDOMBuilder.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ObjectFromDOMBuilder.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,31 @@
+package org.apache.tika.batch.builders;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.w3c.dom.Node;
+
+import java.util.Map;
+
+/**
+ * Interface for things that build objects from a DOM Node and a map of
+ * runtime attributes.
+ *
+ * @param <T> type of the object that is built
+ */
+public interface ObjectFromDOMBuilder<T> {
+
+    /**
+     * @param node config element describing the object
+     * @param runtimeAttributes runtime attributes
+     * @return the built object
+     */
+    T build(Node node, Map<String, String> runtimeAttributes);
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ReporterBuilder.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ReporterBuilder.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ReporterBuilder.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/ReporterBuilder.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,30 @@
+package org.apache.tika.batch.builders;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.tika.batch.StatusReporter;
+import org.w3c.dom.Node;
+
+/**
+ * {@link ObjectFromDOMBuilder} specialised to produce {@link StatusReporter}
+ * instances.
+ */
+public interface ReporterBuilder extends ObjectFromDOMBuilder<StatusReporter> {
+
+    /**
+     * Creates a {@link StatusReporter}.
+     *
+     * @param n DOM node handed to the builder
+     * @param runtimeAttributes string-to-string attributes supplied at runtime
+     * @return the built reporter
+     */
+    StatusReporter build(Node n, Map<String, String> runtimeAttributes);
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/SimpleLogReporterBuilder.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/SimpleLogReporterBuilder.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/SimpleLogReporterBuilder.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/SimpleLogReporterBuilder.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,43 @@
+package org.apache.tika.batch.builders;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.tika.batch.ConsumersManager;
+import org.apache.tika.batch.FileResourceCrawler;
+import org.apache.tika.batch.StatusReporter;
+import org.apache.tika.util.PropsUtil;
+import org.apache.tika.util.XMLDOMUtil;
+import org.w3c.dom.Node;
+
+/**
+ * {@link StatusReporterBuilder} that creates a {@link StatusReporter} and sets
+ * its sleep interval and stale threshold from the attribute map returned by
+ * {@code XMLDOMUtil.mapifyAttrs(n, commandlineArguments)}.
+ */
+public class SimpleLogReporterBuilder implements StatusReporterBuilder {
+
+    //keys looked up in the attribute map
+    private static final String SLEEP_MILLIS_KEY = "sleepMillis";
+    private static final String STALE_THRESHOLD_MILLIS_KEY = "reporterStaleThresholdMillis";
+
+    //second argument handed to PropsUtil.getLong for each key; presumably the
+    //fallback used when the attribute is missing -- confirm against PropsUtil
+    private static final long DEFAULT_SLEEP_MILLIS = 1000L;
+    private static final long DEFAULT_STALE_THRESHOLD_MILLIS = 500000L;
+
+    /**
+     * Builds and configures the reporter.
+     *
+     * @param crawler crawler passed to the {@link StatusReporter} constructor
+     * @param consumersManager consumers manager passed to the {@link StatusReporter} constructor
+     * @param n DOM node whose attributes are read via {@code XMLDOMUtil.mapifyAttrs}
+     * @param commandlineArguments arguments passed alongside the node to {@code XMLDOMUtil.mapifyAttrs}
+     * @return the configured reporter
+     */
+    @Override
+    public StatusReporter build(FileResourceCrawler crawler, ConsumersManager consumersManager,
+                                Node n, Map<String, String> commandlineArguments) {
+        final Map<String, String> attrs = XMLDOMUtil.mapifyAttrs(n, commandlineArguments);
+        final long sleep = PropsUtil.getLong(attrs.get(SLEEP_MILLIS_KEY), DEFAULT_SLEEP_MILLIS);
+        final long staleThreshold = PropsUtil.getLong(
+                attrs.get(STALE_THRESHOLD_MILLIS_KEY), DEFAULT_STALE_THRESHOLD_MILLIS);
+
+        final StatusReporter statusReporter = new StatusReporter(crawler, consumersManager);
+        statusReporter.setSleepMillis(sleep);
+        statusReporter.setStaleThresholdMillis(staleThreshold);
+        return statusReporter;
+    }
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/StatusReporterBuilder.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/StatusReporterBuilder.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/StatusReporterBuilder.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/StatusReporterBuilder.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,31 @@
+package org.apache.tika.batch.builders;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.tika.batch.ConsumersManager;
+import org.apache.tika.batch.FileResourceCrawler;
+import org.apache.tika.batch.StatusReporter;
+import org.w3c.dom.Node;
+
+/**
+ * Contract for factories that create a {@link StatusReporter} for a given
+ * crawler and consumers manager.
+ */
+public interface StatusReporterBuilder {
+
+    /**
+     * Creates a {@link StatusReporter}.
+     *
+     * @param crawler crawler made available to the reporter
+     * @param consumers consumers manager made available to the reporter
+     * @param n DOM node handed to the builder
+     * @param commandlineArguments string-to-string arguments handed to the builder
+     * @return the built reporter
+     */
+    StatusReporter build(FileResourceCrawler crawler, ConsumersManager consumers,
+                         Node n, Map<String, String> commandlineArguments);
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/AbstractFSConsumer.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/AbstractFSConsumer.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/AbstractFSConsumer.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/AbstractFSConsumer.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,105 @@
+package org.apache.tika.batch.fs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.log4j.Level;
+import org.apache.tika.batch.BatchNoRestartError;
+import org.apache.tika.batch.FileResource;
+import org.apache.tika.batch.FileResourceConsumer;
+import org.apache.tika.batch.OutputStreamFactory;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
+import org.xml.sax.ContentHandler;
+
+/**
+ * Base class for file-system consumers.  It bundles the stream-opening and
+ * parse steps together with consistent logging of the failures that can
+ * occur in each of them.
+ */
+public abstract class AbstractFSConsumer extends FileResourceConsumer {
+
+    public AbstractFSConsumer(ArrayBlockingQueue<FileResource> fileQueue) {
+        super(fileQueue);
+    }
+
+    /**
+     * Use this for consistent logging of exceptions.  Clients must
+     * check for whether the os is null, which is the signal
+     * that the output file already exists and should be skipped.
+     *
+     * @param fsOSFactory factory that creates the outputstream
+     * @param fileResource used by the OSFactory to create the stream
+     * @return the OutputStream or null if the output file already exists
+     * @throws BatchNoRestartError if the factory throws an IOException; the
+     *         failure is logged at FATAL level first
+     */
+    protected OutputStream getOutputStream(OutputStreamFactory fsOSFactory,
+                                           FileResource fileResource) {
+        try {
+            return fsOSFactory.getOutputStream(fileResource.getMetadata());
+        } catch (IOException e) {
+            //This can happen if the disk has run out of space,
+            //or if there was a failure with mkdirs in fsOSFactory
+            logWithResourceId(Level.FATAL, "ioe_opening_os",
+                    fileResource.getResourceId(), e);
+            throw new BatchNoRestartError("IOException trying to open output stream for " +
+                    fileResource.getResourceId() + " :: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Opens the input stream of the given resource.  An IOException is
+     * logged at ERROR level and reported to the caller as a null return
+     * value rather than as an exception.
+     *
+     * @param fileResource resource to open
+     * @return the opened stream, or null if opening it threw an IOException
+     */
+    protected InputStream getInputStream(FileResource fileResource) {
+        try {
+            return fileResource.openInputStream();
+        } catch (IOException e) {
+            logWithResourceId(Level.ERROR, "ioe_opening_is",
+                    fileResource.getResourceId(), e);
+            //nothing to close here: openInputStream() threw, so no stream
+            //was ever handed back to us
+            return null;
+        }
+    }
+
+    /**
+     * Runs the parser on the stream, logs whatever it throws under a key
+     * that depends on the kind of failure ("oom", "parse_err" or
+     * "parse_ex"), and rethrows it.  Only failures that are not Errors are
+     * counted via incrementHandledExceptions().  The input stream is always
+     * passed to close(), whether or not the parse succeeded.
+     *
+     * @param resourceId id used in the log entries
+     * @param parser parser to run
+     * @param is stream to parse; closed by this method
+     * @param handler handler passed to the parser
+     * @param m metadata passed to the parser
+     * @param parseContext context passed to the parser
+     * @throws Throwable whatever the parser threw, after it has been logged
+     */
+    protected void parse(final String resourceId, final Parser parser, InputStream is,
+                         final ContentHandler handler,
+                         final Metadata m, final ParseContext parseContext) throws Throwable {
+        try {
+            parser.parse(is, handler, m, parseContext);
+        } catch (Throwable t) {
+            if (t instanceof OutOfMemoryError) {
+                logWithResourceId(Level.ERROR, "oom",
+                        resourceId, t);
+            } else if (t instanceof Error) {
+                logWithResourceId(Level.ERROR, "parse_err",
+                        resourceId, t);
+            } else {
+                logWithResourceId(Level.ERROR, "parse_ex",
+                        resourceId, t);
+                incrementHandledExceptions();
+            }
+            throw t;
+        } finally {
+            close(is);
+        }
+    }
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/BasicTikaFSConsumer.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/BasicTikaFSConsumer.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/BasicTikaFSConsumer.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/BasicTikaFSConsumer.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,125 @@
+package org.apache.tika.batch.fs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.log4j.Level;
+import org.apache.tika.batch.FileResource;
+import org.apache.tika.batch.OutputStreamFactory;
+import org.apache.tika.batch.ParserFactory;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.io.IOUtils;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
+import org.apache.tika.sax.ContentHandlerFactory;
+import org.xml.sax.ContentHandler;
+
+/**
+ * Basic FileResourceConsumer that reads files from an input
+ * directory and writes content to the output directory.
+ * <p>
+ * This catches all exceptions and errors and then logs them.
+ * This will re-throw errors.
+ *
+ */
+public class BasicTikaFSConsumer extends AbstractFSConsumer {
+
+    private boolean parseRecursively = true;
+    private final ParserFactory parserFactory;
+    private final ContentHandlerFactory contentHandlerFactory;
+    private final OutputStreamFactory fsOSFactory;
+    private final TikaConfig config;
+    private String outputEncoding = IOUtils.UTF_8.toString();
+
+
+    /**
+     * @param queue queue passed to the superclass
+     * @param parserFactory creates the parser used for each file resource
+     * @param contentHandlerFactory creates the handler that writes to the output stream
+     * @param fsOSFactory creates the output stream for each file resource
+     * @param config configuration handed to the parser factory
+     */
+    public BasicTikaFSConsumer(ArrayBlockingQueue<FileResource> queue,
+                               ParserFactory parserFactory,
+                               ContentHandlerFactory contentHandlerFactory,
+                               OutputStreamFactory fsOSFactory,
+                               TikaConfig config) {
+        super(queue);
+        this.parserFactory = parserFactory;
+        this.contentHandlerFactory = contentHandlerFactory;
+        this.fsOSFactory = fsOSFactory;
+        this.config = config;
+    }
+
+    /**
+     * Parses one file resource and writes the result to the stream supplied
+     * by the output stream factory.
+     *
+     * @param fileResource resource to process
+     * @return true if the parse completed without throwing; false if the
+     *         resource was skipped (no output stream or no input stream) or
+     *         the parse threw something other than an Error
+     * @throws RuntimeException if the configured output encoding is not supported
+     */
+    @Override
+    public boolean processFileResource(FileResource fileResource) {
+
+        Parser parser = parserFactory.getParser(config);
+        ParseContext context = new ParseContext();
+        if (parseRecursively) {
+            context.set(Parser.class, parser);
+        }
+
+        OutputStream os = getOutputStream(fsOSFactory, fileResource);
+        //os can be null if fsOSFactory is set to skip processing a file if the output
+        //file already exists
+        if (os == null) {
+            logger.debug("Skipping: " + fileResource.getMetadata().get(FSProperties.FS_REL_PATH));
+            return false;
+        }
+
+        InputStream is = getInputStream(fileResource);
+        if (is == null) {
+            IOUtils.closeQuietly(os);
+            return false;
+        }
+        ContentHandler handler;
+        try {
+            handler = contentHandlerFactory.getNewContentHandler(os, getOutputEncoding());
+        } catch (UnsupportedEncodingException e) {
+            incrementHandledExceptions();
+            logWithResourceId(Level.FATAL, "output_encoding_ex",
+                    fileResource.getResourceId(), e);
+            flushAndClose(os);
+            //the input stream is already open at this point and parse(),
+            //which would normally close it, is never reached
+            IOUtils.closeQuietly(is);
+            //keep the original exception as the cause so the stack trace survives
+            throw new RuntimeException(e.getMessage(), e);
+        }
+
+        //now actually call parse!
+        boolean success = false;
+        try {
+            parse(fileResource.getResourceId(), parser, is, handler,
+                    fileResource.getMetadata(), context);
+            success = true;
+        } catch (Error t) {
+            throw t;
+        } catch (Throwable t) {
+            //already logged inside parse(); reported to the caller as false
+        } finally {
+            flushAndClose(os);
+        }
+        return success;
+    }
+
+    /**
+     * @return name of the encoding handed to the content handler factory
+     */
+    public String getOutputEncoding() {
+        return outputEncoding;
+    }
+
+    /**
+     * @param outputEncoding name of the encoding to hand to the content handler factory
+     */
+    public void setOutputEncoding(String outputEncoding) {
+        this.outputEncoding = outputEncoding;
+    }
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSBatchProcessCLI.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSBatchProcessCLI.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSBatchProcessCLI.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSBatchProcessCLI.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,160 @@
+package org.apache.tika.batch.fs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.tika.batch.BatchProcess;
+import org.apache.tika.batch.BatchProcessDriverCLI;
+import org.apache.tika.batch.ParallelFileProcessingResult;
+import org.apache.tika.batch.builders.BatchProcessBuilder;
+import org.apache.tika.batch.builders.CommandLineParserBuilder;
+import org.apache.tika.io.IOUtils;
+import org.apache.tika.io.TikaInputStream;
+
+/**
+ * Command line entry point that builds a {@link BatchProcess} from a batch
+ * config file (given via -bc / -batch-config, otherwise the bundled default
+ * config), runs it and exits the JVM with the exit status of the result.
+ */
+public class FSBatchProcessCLI {
+    public static String FINISHED_STRING = "Main thread in TikaFSBatchCLI has finished processing.";
+
+    //resource, resolved relative to this class, that is used when no config file is given
+    private static final String DEFAULT_CONFIG_RESOURCE = "default-tika-batch-config.xml";
+
+    private static final Logger logger = Logger.getLogger(FSBatchProcessCLI.class);
+    private final Options options;
+
+    /**
+     * Builds the command line options from the batch config.
+     *
+     * @param args command line arguments; only inspected for the config file here
+     * @throws IOException if the config cannot be opened
+     */
+    public FSBatchProcessCLI(String[] args) throws IOException {
+        TikaInputStream configIs = null;
+        try {
+            configIs = getConfigInputStream(args);
+            CommandLineParserBuilder builder = new CommandLineParserBuilder();
+            options = builder.build(configIs);
+        } finally {
+            IOUtils.closeQuietly(configIs);
+        }
+    }
+
+    /**
+     * Prints the help text for the options built from the config.
+     */
+    public void usage() {
+        HelpFormatter helpFormatter = new HelpFormatter();
+        helpFormatter.printHelp("tika filesystem batch", options);
+    }
+
+    /**
+     * Opens the batch config: the file named on the command line if there
+     * is one, otherwise the default config resource.
+     *
+     * @param args command line arguments
+     * @return stream over the config; the caller must close it
+     * @throws IOException if the config cannot be opened or the default
+     *         resource cannot be found
+     */
+    private TikaInputStream getConfigInputStream(String[] args) throws IOException {
+        File batchConfigFile = getConfigFile(args);
+        if (batchConfigFile != null) {
+            //this will throw IOException if it can't find a specified config file
+            //better to throw an exception than silently back off to default.
+            return TikaInputStream.get(batchConfigFile);
+        }
+        logger.info("No config file set via -bc, relying on default-tika-batch-config.xml");
+        InputStream defaultConfig =
+                FSBatchProcessCLI.class.getResourceAsStream(DEFAULT_CONFIG_RESOURCE);
+        if (defaultConfig == null) {
+            //getResourceAsStream signals a missing resource with null; fail
+            //here with a clear message instead of passing null downstream
+            throw new IOException("Couldn't find " + DEFAULT_CONFIG_RESOURCE +
+                    " on the classpath");
+        }
+        return TikaInputStream.get(defaultConfig);
+    }
+
+    /**
+     * Parses the command line, builds the batch process, runs it on a
+     * single-thread executor, prints the result and exits the JVM.
+     *
+     * @param args command line arguments
+     * @throws Exception on any failure parsing the arguments, building or running the process
+     */
+    private void execute(String[] args) throws Exception {
+
+        CommandLineParser cliParser = new GnuParser();
+        CommandLine line = cliParser.parse(options, args);
+
+        if (line.hasOption("help")) {
+            usage();
+            System.exit(BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE);
+        }
+
+        Map<String, String> mapArgs = new HashMap<String, String>();
+        for (Option option : line.getOptions()) {
+            String v = option.getValue();
+            //options without a value are treated as boolean flags
+            if (v == null || v.equals("")) {
+                v = "true";
+            }
+            mapArgs.put(option.getOpt(), v);
+        }
+
+        BatchProcessBuilder b = new BatchProcessBuilder();
+        TikaInputStream is = null;
+        BatchProcess process = null;
+        try {
+            is = getConfigInputStream(args);
+            process = b.build(is, mapArgs);
+        } finally {
+            IOUtils.closeQuietly(is);
+        }
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        Future<ParallelFileProcessingResult> futureResult = executor.submit(process);
+        //no further tasks will be submitted; the task submitted above still runs to completion
+        executor.shutdown();
+
+        ParallelFileProcessingResult result = futureResult.get();
+        System.out.println(FINISHED_STRING);
+        System.out.println("\n");
+        System.out.println(result.toString());
+        System.exit(result.getExitStatus());
+    }
+
+    /**
+     * Looks for -bc or -batch-config in the arguments.
+     *
+     * @param args command line arguments
+     * @return the file named by the argument that follows the last such flag,
+     *         or null if no flag is present or the flag is the final argument
+     */
+    private File getConfigFile(String[] args) {
+        File configFile = null;
+        for (int i = 0; i < args.length; i++) {
+            if (args[i].equals("-bc") || args[i].equals("-batch-config")) {
+                if (i < args.length-1) {
+                    configFile = new File(args[i+1]);
+                }
+            }
+        }
+        return configFile;
+    }
+
+
+    /**
+     * Sets up console logging when no log4j configuration was given via
+     * system property, then runs the CLI.  Anything thrown is printed,
+     * logged at FATAL level and turned into the no-restart exit code.
+     *
+     * @param args command line arguments
+     * @throws Exception if the console appender cannot be set up
+     */
+    public static void main(String[] args) throws Exception {
+        //if no log4j config file has been set via
+        //sysprops, use BasicConfigurator
+        String log4jFile = System.getProperty("log4j.configuration");
+        if (log4jFile == null || log4jFile.trim().length()==0) {
+            ConsoleAppender appender = new ConsoleAppender();
+            appender.setLayout(new PatternLayout("%m%n"));
+            appender.setWriter(new OutputStreamWriter(System.out, IOUtils.UTF_8.name()));
+            BasicConfigurator.configure(appender);
+            Logger.getRootLogger().setLevel(Level.INFO);
+        }
+        try{
+            FSBatchProcessCLI cli = new FSBatchProcessCLI(args);
+            cli.execute(args);
+        } catch (Throwable t) {
+            t.printStackTrace();
+            logger.fatal("Fatal exception from FSBatchProcessCLI: "+t.getMessage(), t);
+            System.exit(BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE);
+        }
+    }
+
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSConsumersManager.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSConsumersManager.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSConsumersManager.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSConsumersManager.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,42 @@
+package org.apache.tika.batch.fs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.List;
+
+import org.apache.tika.batch.ConsumersManager;
+import org.apache.tika.batch.FileResourceConsumer;
+
+/**
+ * {@link ConsumersManager} for file-system consumers.  Neither
+ * {@link #init()} nor {@link #shutdown()} does any work in this class.
+ */
+public class FSConsumersManager extends ConsumersManager {
+
+    /**
+     * @param consumers consumers handed to the superclass
+     */
+    public FSConsumersManager(List<FileResourceConsumer> consumers) {
+        super(consumers);
+    }
+
+    /** Does nothing. */
+    @Override
+    public void init() {
+        //intentionally empty
+    }
+
+    /** Does nothing. */
+    @Override
+    public void shutdown() {
+        //intentionally empty
+    }
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSDirectoryCrawler.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSDirectoryCrawler.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSDirectoryCrawler.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSDirectoryCrawler.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,155 @@
+package org.apache.tika.batch.fs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.tika.batch.FileResource;
+import org.apache.tika.batch.FileResourceCrawler;
+
+/**
+ * Crawler that walks a directory tree and offers every entry that is not a
+ * directory to tryToAdd() as an {@link FSFileResource}.
+ * <p>
+ * Within one directory the non-directory entries are offered first; the
+ * subdirectories are descended into afterwards.  The order of the entries
+ * within a directory is governed by {@link CRAWL_ORDER}.
+ */
+public class FSDirectoryCrawler extends FileResourceCrawler {
+
+    public enum CRAWL_ORDER
+    {
+        SORTED, //alphabetical order; necessary for cross-platform unit tests
+        RANDOM, //shuffle
+        OS_ORDER //operating system chooses
+    }
+
+    private final File root;
+    private final File startDirectory;
+    private final Comparator<File> fileComparator = new FileNameComparator();
+    private CRAWL_ORDER crawlOrder;
+
+    /**
+     * Creates a crawler that starts at the root itself.
+     *
+     * @param fileQueue queue passed to the superclass
+     * @param numConsumers number of consumers, passed to the superclass
+     * @param root directory passed to every {@link FSFileResource}; also the start directory
+     * @param crawlOrder order in which the entries of a directory are visited
+     * @throws RuntimeException if root is not a directory
+     */
+    public FSDirectoryCrawler(ArrayBlockingQueue<FileResource> fileQueue,
+                              int numConsumers, File root, CRAWL_ORDER crawlOrder) {
+        super(fileQueue, numConsumers);
+        this.root = root;
+        this.startDirectory = root;
+        this.crawlOrder = crawlOrder;
+        if (! startDirectory.isDirectory()) {
+            throw new RuntimeException("Crawler couldn't find this directory:" + startDirectory.getAbsolutePath());
+        }
+
+    }
+
+    /**
+     * Creates a crawler that starts at a given directory.
+     *
+     * @param fileQueue queue passed to the superclass
+     * @param numConsumers number of consumers, passed to the superclass
+     * @param root directory passed to every {@link FSFileResource}
+     * @param startDirectory directory at which the crawl starts; the relation to
+     *                       root is only verified when assertions are enabled
+     * @param crawlOrder order in which the entries of a directory are visited
+     * @throws RuntimeException if startDirectory is not a directory
+     */
+    public FSDirectoryCrawler(ArrayBlockingQueue<FileResource> fileQueue,
+                              int numConsumers, File root, File startDirectory,
+                              CRAWL_ORDER crawlOrder) {
+        super(fileQueue, numConsumers);
+        this.root = root;
+        this.startDirectory = startDirectory;
+        this.crawlOrder = crawlOrder;
+        assert(FSUtil.checkThisIsAncestorOfOrSameAsThat(root, startDirectory));
+        if (! startDirectory.isDirectory()) {
+            throw new RuntimeException("Crawler couldn't find this directory:" + startDirectory.getAbsolutePath());
+        }
+    }
+
+    /**
+     * Crawls the tree below the start directory.
+     *
+     * @throws InterruptedException if the crawling thread is interrupted
+     */
+    public void start() throws InterruptedException {
+        addFiles(startDirectory);
+    }
+
+    /**
+     * Offers the non-directory entries of the given directory to tryToAdd()
+     * and then recurses into its subdirectories.
+     *
+     * @param directory directory to crawl; unreadable or unlistable
+     *                  directories are logged and skipped
+     * @return false once tryToAdd() has answered STOP_NOW, which ends the
+     *         whole crawl; true if the crawl may carry on
+     * @throws InterruptedException if the crawling thread is interrupted
+     */
+    private boolean addFiles(File directory) throws InterruptedException {
+
+        if (directory == null ||
+                !directory.isDirectory() || !directory.canRead()) {
+            String path = "null path";
+            if (directory != null) {
+                path = directory.getAbsolutePath();
+            }
+            logger.warn("FSFileAdder can't read this directory: " + path);
+            return true;
+        }
+
+        File[] fileArr = directory.listFiles();
+        if (fileArr == null) {
+            //listFiles() answers null when the directory could not be listed;
+            //a directory without entries yields an empty array instead
+            logger.info("Empty directory: " + directory.getAbsolutePath());
+            return true;
+        }
+
+        List<File> files = new ArrayList<File>(Arrays.asList(fileArr));
+
+        if (crawlOrder == CRAWL_ORDER.RANDOM) {
+            Collections.shuffle(files);
+        } else if (crawlOrder == CRAWL_ORDER.SORTED) {
+            Collections.sort(files, fileComparator);
+        }
+
+        List<File> directories = new ArrayList<File>();
+        boolean firstFileHandled = false;
+        for (File f : files) {
+            if (Thread.currentThread().isInterrupted()) {
+                throw new InterruptedException("file adder interrupted");
+            }
+
+            if (f.isFile() && !firstFileHandled) {
+                firstFileHandled = true;
+                handleFirstFileInDirectory(f);
+            }
+            if (f.isDirectory()) {
+                //subdirectories are crawled after the files of this directory
+                directories.add(f);
+                continue;
+            }
+            int added = tryToAdd(new FSFileResource(root, f));
+            if (added == FileResourceCrawler.STOP_NOW) {
+                logger.debug("crawler has hit a limit: "+f.getAbsolutePath() + " : " + added);
+                return false;
+            }
+            logger.debug("trying to add: "+f.getAbsolutePath() + " : " + added);
+        }
+
+        for (File f : directories) {
+            //a STOP_NOW further down the tree ends the crawl in the enclosing
+            //directories as well, instead of only in the directory that hit it
+            if (!addFiles(f)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Override this if you have any special handling
+     * for the first actual file that the crawler comes across
+     * in a directory.  For example, it might be handy to call
+     * mkdirs() on an output directory if your FileResourceConsumers
+     * are writing to a file.
+     *
+     * @param f file to handle
+     */
+    public void handleFirstFileInDirectory(File f) {
+        //no-op
+    }
+
+    //simple lexical order for the file name, we don't really care about localization.
+    //we do want this, though, because file.compareTo behaves differently
+    //on different OS's.
+    //static: the comparator does not use any state of the enclosing crawler
+    private static class FileNameComparator implements Comparator<File> {
+
+        @Override
+        public int compare(File f1, File f2) {
+            //a null on either side is treated as equal to anything
+            if (f1 == null || f2 == null) {
+                return 0;
+            }
+            return f1.getName().compareTo(f2.getName());
+        }
+    }
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSDocumentSelector.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSDocumentSelector.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSDocumentSelector.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSDocumentSelector.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,83 @@
+package org.apache.tika.batch.fs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.tika.extractor.DocumentSelector;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.util.PropsUtil;
+
+/**
+ * Selector that chooses files based on their file name
+ * and their size, as determined by Metadata.RESOURCE_NAME_KEY and
+ * Metadata.CONTENT_LENGTH.
+ * <p>
+ * A document is rejected as soon as one filter fails.  The filters are
+ * evaluated in this order: the maximum size, the minimum size, the
+ * {@link #excludeFileName} pattern (if it isn't null) and finally the
+ * {@link #includeFileName} pattern (if it isn't null).
+ * <p>
+ * A size limit is only applied if the limit is above -1 and the
+ * reported content length is above 0.  The name patterns are only
+ * applied if the metadata actually carries a resource name.
+ */
+public class FSDocumentSelector implements DocumentSelector {
+
+    //can be null, in which case no include filter is applied
+    private final Pattern includeFileName;
+
+    //can be null, in which case no exclude filter is applied
+    private final Pattern excludeFileName;
+    private final long maxFileSizeBytes;
+    private final long minFileSizeBytes;
+
+    /**
+     * @param includeFileName pattern a file name must contain a match for; can be null
+     * @param excludeFileName pattern a file name must not contain a match for; can be null
+     * @param minFileSizeBytes minimum size in bytes; values below 0 turn the check off
+     * @param maxFileSizeBytes maximum size in bytes; values below 0 turn the check off
+     */
+    public FSDocumentSelector(Pattern includeFileName, Pattern excludeFileName,
+                              long minFileSizeBytes, long maxFileSizeBytes) {
+        this.includeFileName = includeFileName;
+        this.excludeFileName = excludeFileName;
+        this.minFileSizeBytes = minFileSizeBytes;
+        this.maxFileSizeBytes = maxFileSizeBytes;
+    }
+
+    /**
+     * @param metadata metadata of the candidate document
+     * @return true if the document passes every configured filter
+     */
+    @Override
+    public boolean select(Metadata metadata) {
+        String fName = metadata.get(Metadata.RESOURCE_NAME_KEY);
+        long sz = PropsUtil.getLong(metadata.get(Metadata.CONTENT_LENGTH), -1L);
+
+        //size limits are skipped unless a positive length was reported
+        boolean hasUsableSize = sz > 0;
+        if (hasUsableSize && maxFileSizeBytes > -1 && sz > maxFileSizeBytes) {
+            return false;
+        }
+        if (hasUsableSize && minFileSizeBytes > -1 && sz < minFileSizeBytes) {
+            return false;
+        }
+
+        //without a name, neither pattern can be applied
+        if (fName == null) {
+            return true;
+        }
+        if (excludeFileName != null && excludeFileName.matcher(fName).find()) {
+            return false;
+        }
+        return includeFileName == null || includeFileName.matcher(fName).find();
+    }
+
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSFileResource.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSFileResource.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSFileResource.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSFileResource.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,99 @@
+package org.apache.tika.batch.fs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Locale;
+
+import org.apache.tika.batch.FileResource;
+import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.Metadata;
+
+/**
+ * FileSystem(FS)Resource wraps a file name.
+ * <p>
+ * This class automatically sets the following keys in Metadata:
+ * <ul>
+ *     <li>Metadata.RESOURCE_NAME_KEY (file name)</li>
+ *     <li>Metadata.CONTENT_LENGTH</li>
+ *     <li>FSProperties.FS_REL_PATH</li>
+ *     <li>FileResource.FILE_EXTENSION</li>
+ * </ul>
+ */
+public class FSFileResource implements FileResource {
+
+    private final File fullPath;
+    private final String relativePath;
+    private final Metadata metadata;
+
+    /**
+     * @param inputRoot root directory; must be an ancestor of fullPath
+     *                  (only checked when assertions are enabled)
+     * @param fullPath the file to wrap
+     */
+    public FSFileResource(File inputRoot, File fullPath) {
+        this.fullPath = fullPath;
+        this.metadata = new Metadata();
+        //child path must actually be a child
+        assert(FSUtil.checkThisIsAncestorOfThat(inputRoot, fullPath));
+        this.relativePath = getRelativePath(inputRoot, fullPath);
+
+        //need to set these now so that the filter can determine
+        //whether or not to crawl this file
+        metadata.set(Metadata.RESOURCE_NAME_KEY, fullPath.getName());
+        metadata.set(Metadata.CONTENT_LENGTH, Long.toString(fullPath.length()));
+        metadata.set(FSProperties.FS_REL_PATH, relativePath);
+        metadata.set(FileResource.FILE_EXTENSION, getExtension(fullPath));
+    }
+
+    /**
+     * Strips the root's absolute path and the separator that follows it
+     * from the file's absolute path.
+     * <p>
+     * The separator is only stripped if it is actually there: the absolute
+     * path of a file system root (e.g. "/") already ends with the separator,
+     * and blindly skipping one more character would cut off the first
+     * character of the relative path.
+     *
+     * @param inputRoot root directory
+     * @param fullPath file underneath the root
+     * @return path of fullPath relative to inputRoot
+     */
+    private static String getRelativePath(File inputRoot, File fullPath) {
+        String rootPath = inputRoot.getAbsolutePath();
+        String rel = fullPath.getAbsolutePath().substring(rootPath.length());
+        if (rel.startsWith(File.separator)) {
+            rel = rel.substring(File.separator.length());
+        }
+        return rel;
+    }
+
+    /**
+     * Simple extension extractor that takes whatever comes after the
+     * last period in the file name.  It returns a lowercased version
+     * of the "extension."
+     * <p>
+     * If there is no period, it returns an empty string.
+     *
+     * @param fullPath full path from which to try to find an extension
+     * @return the lowercased extension or an empty string
+     */
+    private String getExtension(File fullPath) {
+        String p = fullPath.getName();
+        int i = p.lastIndexOf(".");
+        if (i > -1) {
+            return p.substring(i + 1).toLowerCase(Locale.ROOT);
+        }
+        return "";
+    }
+
+    /**
+     *
+     * @return file's relativePath
+     */
+    @Override
+    public String getResourceId() {
+        return relativePath;
+    }
+
+    /**
+     * @return the metadata that was populated in the constructor
+     */
+    @Override
+    public Metadata getMetadata() {
+        return metadata;
+    }
+
+    /**
+     * @return a new stream over the wrapped file
+     * @throws IOException if the stream can't be opened
+     */
+    @Override
+    public InputStream openInputStream() throws IOException {
+        //no need to include Metadata because we already set the
+        //same information in the initializer
+        return TikaInputStream.get(fullPath);
+    }
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSListCrawler.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSListCrawler.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSListCrawler.java 
(added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSListCrawler.java 
Mon Mar 23 16:09:10 2015
@@ -0,0 +1,80 @@
+package org.apache.tika.batch.fs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tika.batch.FileResource;
+import org.apache.tika.batch.FileResourceCrawler;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * Class that "crawls" a list of files.
+ * <p>
+ * Each line of the list file is resolved against the root directory.
+ * Entries that don't exist or that are directories are logged and skipped.
+ */
+public class FSListCrawler extends FileResourceCrawler {
+
+    private final BufferedReader reader;
+    private final File root;
+
+    /**
+     * @param fileQueue queue handed to the parent crawler
+     * @param numConsumers number of consumers, handed to the parent crawler
+     * @param root directory against which the list entries are resolved
+     * @param list file that contains one path per line
+     * @param encoding encoding of the list file
+     * @throws FileNotFoundException if the list file can't be opened
+     * @throws UnsupportedEncodingException if the encoding isn't supported
+     */
+    public FSListCrawler(ArrayBlockingQueue<FileResource> fileQueue,
+                         int numConsumers, File root, File list, String encoding)
+            throws FileNotFoundException, UnsupportedEncodingException {
+        super(fileQueue, numConsumers);
+        FileInputStream is = new FileInputStream(list);
+        BufferedReader r;
+        try {
+            r = new BufferedReader(new InputStreamReader(is, encoding));
+        } catch (UnsupportedEncodingException e) {
+            //don't leak the file handle if the reader can't be created
+            try {
+                is.close();
+            } catch (IOException ignored) {
+                //the encoding problem is the one worth reporting
+            }
+            throw e;
+        }
+        this.reader = r;
+        this.root = root;
+    }
+
+    /**
+     * Reads the list line by line and tries to add every existing,
+     * non-directory file.  Stops early if the crawler reports
+     * {@link FileResourceCrawler#STOP_NOW}, and always closes the
+     * list reader before returning.
+     *
+     * @throws InterruptedException if the current thread was interrupted
+     */
+    public void start() throws InterruptedException {
+        try {
+            String line = nextLine();
+            while (line != null) {
+                if (Thread.currentThread().isInterrupted()) {
+                    throw new InterruptedException("file adder interrupted");
+                }
+                File f = new File(root, line);
+                if (! f.exists()) {
+                    logger.warn("File doesn't exist:"+f.getAbsolutePath());
+                } else if (f.isDirectory()) {
+                    logger.warn("File is a directory:"+f.getAbsolutePath());
+                } else {
+                    int added = tryToAdd(new FSFileResource(root, f));
+                    if (added == FileResourceCrawler.STOP_NOW) {
+                        //same handling as the directory crawler:
+                        //once a limit is hit there is no point in reading on
+                        logger.debug("crawler has hit a limit: "+f.getAbsolutePath() + " : " + added);
+                        return;
+                    }
+                }
+                line = nextLine();
+            }
+        } finally {
+            closeReader();
+        }
+    }
+
+    /**
+     * @return the next line of the list or null at the end of the list
+     * @throws RuntimeException wrapping any IOException from the reader
+     */
+    private String nextLine() {
+        try {
+            return reader.readLine();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    //closes the list reader; a failure to close is logged, not thrown,
+    //so that it can't mask an exception from the crawl itself
+    private void closeReader() {
+        try {
+            reader.close();
+        } catch (IOException e) {
+            logger.warn("Couldn't close file list reader:"+e.getMessage());
+        }
+    }
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSOutputStreamFactory.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSOutputStreamFactory.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSOutputStreamFactory.java
 (added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSOutputStreamFactory.java
 Mon Mar 23 16:09:10 2015
@@ -0,0 +1,94 @@
+package org.apache.tika.batch.fs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
+import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.apache.tika.batch.OutputStreamFactory;
+import org.apache.tika.metadata.Metadata;
+
+/**
+ * Creates output streams for files underneath an output root directory,
+ * optionally wrapped in a compressing stream.
+ */
+public class FSOutputStreamFactory implements OutputStreamFactory {
+
+    public enum COMPRESSION {
+        NONE,
+        BZIP2,
+        GZIP,
+        ZIP
+    }
+
+    private final FSUtil.HANDLE_EXISTING handleExisting;
+    private final File outputRoot;
+    private final String suffix;
+    private final COMPRESSION compression;
+
+    /**
+     * @param outputRoot root directory for output; stored as an absolute file
+     * @param handleExisting what to do if the output file already exists
+     * @param compression compression to apply to the output stream
+     * @param suffix suffix handed to {@link FSUtil#getOutputFile}
+     */
+    public FSOutputStreamFactory(File outputRoot, FSUtil.HANDLE_EXISTING handleExisting,
+                                 COMPRESSION compression, String suffix) {
+        this.handleExisting = handleExisting;
+        this.outputRoot = outputRoot.getAbsoluteFile();
+        this.suffix = suffix;
+        this.compression = compression;
+    }
+
+    /**
+     * This tries to create a file based on the
+     * {@link org.apache.tika.batch.fs.FSUtil.HANDLE_EXISTING}
+     * value that was passed in during initialization.
+     * <p>
+     * If {@link #handleExisting} is set to "SKIP" and the output file
+     * already exists, this will return null.
+     * <p>
+     * If an output file can be found, this will try to mkdirs for that
+     * output file.  If mkdirs() fails, this will throw an IOException.
+     * <p>
+     * Finally, this will open an output stream for the appropriate output
+     * file and wrap it according to the configured compression.  If the
+     * wrapping fails, the underlying file stream is closed again.
+     *
+     * @param metadata must have a value set for {@link FSProperties#FS_REL_PATH};
+     *                 a missing value is not supported and will typically
+     *                 surface as a NullPointerException
+     * @return OutputStream or null if no output should be written
+     * @throws IOException if the parent directory or the stream can't be created
+     */
+    @Override
+    public OutputStream getOutputStream(Metadata metadata) throws IOException {
+        String initialRelativePath = metadata.get(FSProperties.FS_REL_PATH);
+        File outputFile = FSUtil.getOutputFile(outputRoot, initialRelativePath, handleExisting, suffix);
+        if (outputFile == null) {
+            return null;
+        }
+        if (! outputFile.getParentFile().isDirectory()) {
+            boolean success = outputFile.getParentFile().mkdirs();
+            //with multithreading, it is possible that the parent file was created between
+            //the test and the attempt to .mkdirs(); mkdirs() returns false if the dirs already exist
+            if (! success && ! outputFile.getParentFile().isDirectory()) {
+                throw new IOException("Couldn't create parent directory for:"+outputFile.getAbsolutePath());
+            }
+        }
+
+        OutputStream fileStream = new FileOutputStream(outputFile);
+        boolean wrapped = false;
+        try {
+            OutputStream os = fileStream;
+            if (compression == COMPRESSION.BZIP2){
+                os = new BZip2CompressorOutputStream(fileStream);
+            } else if (compression == COMPRESSION.GZIP) {
+                os = new GZIPOutputStream(fileStream);
+            } else if (compression == COMPRESSION.ZIP) {
+                //NOTE(review): a zip archive stream normally needs an entry to be
+                //opened before data is written; nothing here does that -- confirm
+                //that the consumers of this stream take care of it
+                os = new ZipArchiveOutputStream(fileStream);
+            }
+            wrapped = true;
+            return os;
+        } finally {
+            if (! wrapped) {
+                //the compressing stream couldn't be created (its constructor may
+                //already write a header); don't leak the open file handle
+                try {
+                    fileStream.close();
+                } catch (IOException ignored) {
+                    //the original failure is the one worth reporting
+                }
+            }
+        }
+    }
+}

Added: 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSProperties.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSProperties.java?rev=1668673&view=auto
==============================================================================
--- 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSProperties.java 
(added)
+++ 
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSProperties.java 
Mon Mar 23 16:09:10 2015
@@ -0,0 +1,12 @@
+package org.apache.tika.batch.fs;
+
+import org.apache.tika.metadata.Property;
+
+/**
+ * Metadata properties used by the file system (FS) classes of tika-batch.
+ */
+public class FSProperties {
+    private static final String TIKA_BATCH_FS_NAMESPACE = "tika_batch_fs";
+
+    //full key of the relative path property: namespace, colon, local name
+    private static final String REL_PATH_KEY = TIKA_BATCH_FS_NAMESPACE + ":relative_path";
+
+    /**
+     * File's relative path (including file name) from a given source root
+     */
+    public static final Property FS_REL_PATH = Property.internalText(REL_PATH_KEY);
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSUtil.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSUtil.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSUtil.java 
(added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/FSUtil.java 
Mon Mar 23 16:09:10 2015
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.batch.fs;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Utility class to handle some common issues when
+ * reading from and writing to a file system (FS).
+ */
+public class FSUtil {
+
+    /**
+     * Checks whether one file is a strict ancestor of another.  The check
+     * is purely textual: it compares the two absolute paths as returned by
+     * {@link File#getAbsolutePath()} and does not touch the file system.
+     *
+     * @param ancestor candidate ancestor
+     * @param child candidate descendant
+     * @return true if the child's absolute path lies underneath the
+     *         ancestor's absolute path; false if the two are the same
+     */
+    public static boolean checkThisIsAncestorOfThat(File ancestor, File child) {
+        String ancPath = ancestor.getAbsolutePath();
+        String childPath = child.getAbsolutePath();
+        if (childPath.length() <= ancPath.length() || ! childPath.startsWith(ancPath)) {
+            return false;
+        }
+        //a bare prefix match is not enough: "/a/b" is not an ancestor of "/a/bc".
+        //The prefix has to end at a path boundary; a file system root already
+        //ends with the separator
+        return ancPath.endsWith(File.separator)
+                || childPath.startsWith(File.separator, ancPath.length());
+    }
+
+    /**
+     * Same as {@link #checkThisIsAncestorOfThat(File, File)}, but also
+     * returns true if both files have the same absolute path.
+     *
+     * @param ancestor candidate ancestor
+     * @param child candidate descendant or same file
+     * @return true if child is the same as or underneath ancestor
+     */
+    public static boolean checkThisIsAncestorOfOrSameAsThat(File ancestor, File child) {
+        if (ancestor.getAbsoluteFile().equals(child.getAbsoluteFile())) {
+            return true;
+        }
+        return checkThisIsAncestorOfThat(ancestor, child);
+    }
+
+    public enum HANDLE_EXISTING {
+        OVERWRITE,
+        RENAME,
+        SKIP
+    }
+
+    private final static Pattern FILE_NAME_PATTERN =
+            Pattern.compile("\\A(.*?)(?:\\((\\d+)\\))?\\.([^\\.]+)\\Z");
+
+    /**
+     * Given an output root and an initial relative path,
+     * return the output file according to the HANDLE_EXISTING strategy
+     * <p>
+     * The candidate output file is the relative path underneath the output
+     * root, followed by "." and the suffix.  For example, given a file's
+     * relative path "dir1/dir2/fileA.docx", an output directory "output"
+     * and the suffix "json", the candidate is
+     * "output/dir1/dir2/fileA.docx.json".
+     * <p>
+     * If the candidate does not exist yet, it is returned as is.
+     * <p>
+     * If HANDLE_EXISTING is set to OVERWRITE, an existing candidate is
+     * returned, and writing to it will overwrite the existing file!!!
+     * <p>
+     * This will return null if handleExisting == HANDLE_EXISTING.SKIP and
+     * the candidate file already exists.
+     * <p>
+     * If HANDLE_EXISTING is set to RENAME, this will try to increment a
+     * counter at the end of the file name (fileA(1).docx.json,
+     * fileA(2).docx.json, ...) until there is a file name that doesn't
+     * exist.  If that fails, or if the file name has no extension,
+     * random UUIDs are tried as file names.
+     * <p>
+     * This will throw an IOException if HANDLE_EXISTING is set to
+     * RENAME, and a candidate output file cannot be found
+     * after trying to increment the file count up to 10000
+     * and then after trying 20,000 UUIDs.
+     *
+     * @param outputRoot directory root for output
+     * @param initialRelativePath initial relative path (including file name,
+     *                            which may be renamed)
+     * @param handleExisting what to do if the output file exists
+     * @param suffix suffix to add to files, can be null
+     * @return output file or null if no output file should be created
+     * @throws java.io.IOException if no unused file name could be found
+     */
+    public static File getOutputFile(File outputRoot, String initialRelativePath,
+                                     HANDLE_EXISTING handleExisting, String suffix) throws IOException {
+        String localSuffix = (suffix == null) ? "" : suffix;
+        File cand = new File(outputRoot, initialRelativePath+ "." +localSuffix);
+        if (cand.isFile()) {
+            if (handleExisting.equals(HANDLE_EXISTING.OVERWRITE)) {
+                return cand;
+            } else if (handleExisting.equals(HANDLE_EXISTING.SKIP)) {
+                return null;
+            }
+        }
+
+        //if we're here, either the candidate doesn't exist (both loops below
+        //are skipped and it is returned unchanged), or it exists and
+        //we must find a new name for it.
+
+        //groups for "testfile(1).txt":
+        //group(1) is "testfile"
+        //group(2) is 1
+        //group(3) is "txt"
+        //Note: group(2) can be null
+        int cnt = 0;
+        String fNameBase = null;
+        String fNameExt = "";
+        //this doesn't include the addition of the localSuffix
+        File candOnly = new File(outputRoot, initialRelativePath);
+        Matcher m = FILE_NAME_PATTERN.matcher(candOnly.getName());
+        if (m.find()) {
+            fNameBase = m.group(1);
+
+            if (m.group(2) != null) {
+                try {
+                    cnt = Integer.parseInt(m.group(2));
+                } catch (NumberFormatException ignored) {
+                    //more digits than an int can hold: start counting from 0
+                }
+            }
+            if (m.group(3) != null) {
+                fNameExt = m.group(3);
+            }
+        }
+
+        File outputParent = cand.getParentFile();
+        //renamed candidates keep the "." in front of the suffix,
+        //just like the initial candidate
+        while (fNameBase != null && cand.isFile() && ++cnt < 10000) {
+            String candFileName = fNameBase + "(" + cnt + ")." + fNameExt + "." + localSuffix;
+            cand = new File(outputParent, candFileName);
+        }
+        //reset count to 0 and try 20000 times
+        cnt = 0;
+        //there is no extension if the file name didn't match the pattern
+        String extPart = (fNameExt.length() == 0) ? "" : "." + fNameExt;
+        while (cand.isFile() && cnt++ < 20000) {
+            UUID uid = UUID.randomUUID();
+            cand = new File(outputParent, uid.toString() + extPart + "." + localSuffix);
+        }
+
+        if (cand.isFile()) {
+            throw new IOException("Couldn't find candidate output file after trying " +
+                    "very, very hard");
+        }
+        return cand;
+    }
+
+}
\ No newline at end of file


Reply via email to