Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,163 @@ +package org.apache.tika.batch.fs; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; + +import org.apache.log4j.Level; +import org.apache.tika.batch.BatchNoRestartError; +import org.apache.tika.batch.FileResource; +import org.apache.tika.batch.FileResourceConsumer; +import org.apache.tika.batch.OutputStreamFactory; +import org.apache.tika.batch.ParserFactory; +import org.apache.tika.config.TikaConfig; +import org.apache.tika.io.IOUtils; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.metadata.TikaCoreProperties; +import org.apache.tika.metadata.serialization.JsonMetadataList; +import org.apache.tika.parser.ParseContext; +import org.apache.tika.parser.Parser; +import org.apache.tika.parser.RecursiveParserWrapper; +import org.apache.tika.sax.ContentHandlerFactory; +import org.apache.tika.util.TikaExceptionFilter; +import org.xml.sax.helpers.DefaultHandler; + +/** + * Basic FileResourceConsumer that reads files from an input + * directory and writes content to the output directory. + * <p/> + * This tries to catch most of the common exceptions, log them and + * store them in the metadata list output. 
+ */ +public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer { + + + private final ParserFactory parserFactory; + private final ContentHandlerFactory contentHandlerFactory; + private final OutputStreamFactory fsOSFactory; + private final TikaConfig tikaConfig; + private String outputEncoding = "UTF-8"; + //TODO: parameterize this + private TikaExceptionFilter exceptionFilter = new TikaExceptionFilter(); + + + public RecursiveParserWrapperFSConsumer(ArrayBlockingQueue<FileResource> queue, + ParserFactory parserFactory, + ContentHandlerFactory contentHandlerFactory, + OutputStreamFactory fsOSFactory, TikaConfig tikaConfig) { + super(queue); + this.parserFactory = parserFactory; + this.contentHandlerFactory = contentHandlerFactory; + this.fsOSFactory = fsOSFactory; + this.tikaConfig = tikaConfig; + } + + @Override + public boolean processFileResource(FileResource fileResource) { + + Parser wrapped = parserFactory.getParser(tikaConfig); + RecursiveParserWrapper parser = new RecursiveParserWrapper(wrapped, contentHandlerFactory); + ParseContext context = new ParseContext(); + +// if (parseRecursively == true) { + context.set(Parser.class, parser); +// } + + //try to open outputstream first + OutputStream os = getOutputStream(fsOSFactory, fileResource); + + if (os == null) { + logger.debug("Skipping: " + fileResource.getMetadata().get(FSProperties.FS_REL_PATH)); + return false; + } + + //try to open the inputstream before the parse. + //if the parse hangs or throws a nasty exception, at least there will + //be a zero byte file there so that the batchrunner can skip that problematic + //file during the next run. 
+ InputStream is = getInputStream(fileResource); + if (is == null) { + IOUtils.closeQuietly(os); + return false; + } + + Throwable thrown = null; + List<Metadata> metadataList = null; + Metadata containerMetadata = fileResource.getMetadata(); + try { + parse(fileResource.getResourceId(), parser, is, new DefaultHandler(), + containerMetadata, context); + metadataList = parser.getMetadata(); + } catch (Throwable t) { + thrown = t; + metadataList = parser.getMetadata(); + if (metadataList == null) { + metadataList = new LinkedList<Metadata>(); + } + Metadata m = null; + if (metadataList.size() == 0) { + m = containerMetadata; + } else { + //take the top metadata item + m = metadataList.remove(0); + } + String stackTrace = exceptionFilter.getStackTrace(t); + m.add(TikaCoreProperties.TIKA_META_EXCEPTION_PREFIX+"runtime", stackTrace); + metadataList.add(0, m); + } finally { + IOUtils.closeQuietly(is); + } + + Writer writer = null; + + try { + writer = new OutputStreamWriter(os, getOutputEncoding()); + JsonMetadataList.toJson(metadataList, writer); + } catch (Exception e) { + logWithResourceId(Level.ERROR, "json_ex", + fileResource.getResourceId(), e); + } finally { + flushAndClose(writer); + } + + if (thrown != null) { + if (thrown instanceof Error) { + throw (Error) thrown; + } else { + return false; + } + } + + return true; + } + + public String getOutputEncoding() { + return outputEncoding; + } + + public void setOutputEncoding(String outputEncoding) { + this.outputEncoding = outputEncoding; + } +}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tika.batch.fs.builders; + +import java.io.File; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; + +import org.apache.tika.batch.ConsumersManager; +import org.apache.tika.batch.FileResource; +import org.apache.tika.batch.FileResourceConsumer; +import org.apache.tika.batch.OutputStreamFactory; +import org.apache.tika.batch.ParserFactory; +import org.apache.tika.batch.builders.AbstractConsumersBuilder; +import org.apache.tika.batch.builders.BatchProcessBuilder; +import org.apache.tika.batch.builders.IContentHandlerFactoryBuilder; +import org.apache.tika.batch.fs.BasicTikaFSConsumer; +import org.apache.tika.batch.fs.FSConsumersManager; +import org.apache.tika.batch.fs.FSOutputStreamFactory; +import org.apache.tika.batch.fs.FSUtil; +import org.apache.tika.batch.fs.RecursiveParserWrapperFSConsumer; +import org.apache.tika.config.TikaConfig; +import org.apache.tika.sax.ContentHandlerFactory; +import org.apache.tika.util.ClassLoaderUtil; +import org.apache.tika.util.PropsUtil; +import org.apache.tika.util.XMLDOMUtil; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +public class BasicTikaFSConsumersBuilder extends AbstractConsumersBuilder { + + @Override + public ConsumersManager build(Node node, Map<String, String> runtimeAttributes, + ArrayBlockingQueue<FileResource> queue) { + + //figure out if we're building a recursiveParserWrapper + boolean recursiveParserWrapper = false; + String recursiveParserWrapperString = runtimeAttributes.get("recursiveParserWrapper"); + if (recursiveParserWrapperString != null){ + recursiveParserWrapper = PropsUtil.getBoolean(recursiveParserWrapperString, recursiveParserWrapper); + } else { + Node recursiveParserWrapperNode = node.getAttributes().getNamedItem("recursiveParserWrapper"); + if (recursiveParserWrapperNode != null) { + recursiveParserWrapper = PropsUtil.getBoolean(recursiveParserWrapperNode.getNodeValue(), 
recursiveParserWrapper); + } + } + + //how long to let the consumersManager run on init() and shutdown() + Long consumersManagerMaxMillis = null; + String consumersManagerMaxMillisString = runtimeAttributes.get("consumersManagerMaxMillis"); + if (consumersManagerMaxMillisString != null){ + consumersManagerMaxMillis = PropsUtil.getLong(consumersManagerMaxMillisString, null); + } else { + Node consumersManagerMaxMillisNode = node.getAttributes().getNamedItem("consumersManagerMaxMillis"); + if (consumersManagerMaxMillis == null) { + consumersManagerMaxMillis = PropsUtil.getLong(consumersManagerMaxMillisNode.getNodeValue(), + null); + } + } + + TikaConfig config = null; + String tikaConfigPath = runtimeAttributes.get("c"); + + if( tikaConfigPath == null) { + Node tikaConfigNode = node.getAttributes().getNamedItem("tikaConfig"); + if (tikaConfigNode != null) { + tikaConfigPath = PropsUtil.getString(tikaConfigNode.getNodeValue(), null); + } + } + if (tikaConfigPath != null) { + try { + config = new TikaConfig(new File(tikaConfigPath)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + config = TikaConfig.getDefaultConfig(); + } + + List<FileResourceConsumer> consumers = new LinkedList<FileResourceConsumer>(); + int numConsumers = BatchProcessBuilder.getNumConsumers(runtimeAttributes); + + NodeList nodeList = node.getChildNodes(); + Node contentHandlerFactoryNode = null; + Node parserFactoryNode = null; + Node outputStreamFactoryNode = null; + + for (int i = 0; i < nodeList.getLength(); i++){ + Node child = nodeList.item(i); + String cn = child.getNodeName(); + if (cn.equals("parser")){ + parserFactoryNode = child; + } else if (cn.equals("contenthandler")) { + contentHandlerFactoryNode = child; + } else if (cn.equals("outputstream")) { + outputStreamFactoryNode = child; + } + } + + if (contentHandlerFactoryNode == null || parserFactoryNode == null + || outputStreamFactoryNode == null) { + throw new RuntimeException("You must specify a 
ContentHandlerFactory, "+ + "a ParserFactory and an OutputStreamFactory"); + } + ContentHandlerFactory contentHandlerFactory = getContentHandlerFactory(contentHandlerFactoryNode, runtimeAttributes); + ParserFactory parserFactory = getParserFactory(parserFactoryNode, runtimeAttributes); + OutputStreamFactory outputStreamFactory = getOutputStreamFactory(outputStreamFactoryNode, runtimeAttributes); + + if (recursiveParserWrapper) { + for (int i = 0; i < numConsumers; i++) { + FileResourceConsumer c = new RecursiveParserWrapperFSConsumer(queue, + parserFactory, contentHandlerFactory, outputStreamFactory, config); + consumers.add(c); + } + } else { + for (int i = 0; i < numConsumers; i++) { + FileResourceConsumer c = new BasicTikaFSConsumer(queue, + parserFactory, contentHandlerFactory, outputStreamFactory, config); + consumers.add(c); + } + } + ConsumersManager manager = new FSConsumersManager(consumers); + if (consumersManagerMaxMillis != null) { + manager.setConsumersManagerMaxMillis(consumersManagerMaxMillis); + } + return manager; + } + + + private ContentHandlerFactory getContentHandlerFactory(Node node, Map<String, String> runtimeAttributes) { + + Map<String, String> localAttrs = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes); + String className = localAttrs.get("builderClass"); + if (className == null) { + throw new RuntimeException("Must specify builderClass for contentHandler"); + } + IContentHandlerFactoryBuilder builder = ClassLoaderUtil.buildClass(IContentHandlerFactoryBuilder.class, className); + return builder.build(node, runtimeAttributes); + } + + private ParserFactory getParserFactory(Node node, Map<String, String> runtimeAttributes) { + //TODO: add ability to set TikaConfig file path + Map<String, String> localAttrs = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes); + String className = localAttrs.get("class"); + return ClassLoaderUtil.buildClass(ParserFactory.class, className); + } + + private OutputStreamFactory getOutputStreamFactory(Node node, 
Map<String, String> runtimeAttributes) { + Map<String, String> attrs = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes); + + File outputDir = PropsUtil.getFile(attrs.get("outputDir"), null); +/* FSUtil.HANDLE_EXISTING handleExisting = null; + String handleExistingString = attrs.get("handleExisting"); + if (handleExistingString == null) { + handleExistingException(); + } else if (handleExistingString.equals("overwrite")){ + handleExisting = FSUtil.HANDLE_EXISTING.OVERWRITE; + } else if (handleExistingString.equals("rename")) { + handleExisting = FSUtil.HANDLE_EXISTING.RENAME; + } else if (handleExistingString.equals("skip")) { + handleExisting = FSUtil.HANDLE_EXISTING.SKIP; + } else { + handleExistingException(); + } +*/ + String compressionString = attrs.get("compression"); + FSOutputStreamFactory.COMPRESSION compression = FSOutputStreamFactory.COMPRESSION.NONE; + if (compressionString == null) { + //do nothing + } else if (compressionString.contains("bz")) { + compression = FSOutputStreamFactory.COMPRESSION.BZIP2; + } else if (compressionString.contains("gz")) { + compression = FSOutputStreamFactory.COMPRESSION.GZIP; + } else if (compressionString.contains("zip")) { + compression = FSOutputStreamFactory.COMPRESSION.ZIP; + } + String suffix = attrs.get("outputSuffix"); + + //TODO: possibly open up the different handle existings in the future + //but for now, lock it down to require skip. Too dangerous otherwise + //if the driver restarts and this is set to overwrite... 
+ return new FSOutputStreamFactory(outputDir, FSUtil.HANDLE_EXISTING.SKIP, + compression, suffix); + } + +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/FSCrawlerBuilder.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/FSCrawlerBuilder.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/FSCrawlerBuilder.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/FSCrawlerBuilder.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,129 @@ +package org.apache.tika.batch.fs.builders; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +import java.io.File; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.regex.Pattern; + +import org.apache.tika.batch.FileResource; +import org.apache.tika.batch.FileResourceCrawler; +import org.apache.tika.batch.builders.BatchProcessBuilder; +import org.apache.tika.batch.builders.ICrawlerBuilder; +import org.apache.tika.batch.fs.FSDirectoryCrawler; +import org.apache.tika.batch.fs.FSDocumentSelector; +import org.apache.tika.extractor.DocumentSelector; +import org.apache.tika.util.PropsUtil; +import org.apache.tika.util.XMLDOMUtil; +import org.w3c.dom.Node; + +/** + * Builds either an FSDirectoryCrawler or an FSListCrawler. + */ +public class FSCrawlerBuilder implements ICrawlerBuilder { + + private final static String MAX_CONSEC_WAIT_MILLIS = "maxConsecWaitMillis"; + private final static String MAX_FILES_TO_ADD_ATTR = "maxFilesToAdd"; + private final static String MAX_FILES_TO_CONSIDER_ATTR = "maxFilesToConsider"; + + + private final static String CRAWL_ORDER = "crawlOrder"; + private final static String INPUT_DIR_ATTR = "inputDir"; + private final static String INPUT_START_DIR_ATTR = "startDir"; + private final static String MAX_FILE_SIZE_BYTES_ATTR = "maxFileSizeBytes"; + private final static String MIN_FILE_SIZE_BYTES_ATTR = "minFileSizeBytes"; + + + private final static String INCLUDE_FILE_PAT_ATTR = "includeFilePat"; + private final static String EXCLUDE_FILE_PAT_ATTR = "excludeFilePat"; + + @Override + public FileResourceCrawler build(Node node, Map<String, String> runtimeAttributes, + ArrayBlockingQueue<FileResource> queue) { + + Map<String, String> attributes = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes); + + int numConsumers = BatchProcessBuilder.getNumConsumers(runtimeAttributes); + File inputDir = PropsUtil.getFile(attributes.get(INPUT_DIR_ATTR), new File("input")); + FileResourceCrawler crawler = null; + if (attributes.containsKey("fileList")) { + String randomCrawlString = 
attributes.get(CRAWL_ORDER); + + if (randomCrawlString != null) { + //TODO: change to logger warn or throw RuntimeException? + System.err.println("randomCrawl attribute is ignored by FSListCrawler"); + } + File fileList = PropsUtil.getFile(attributes.get("fileList"), null); + String encoding = PropsUtil.getString(attributes.get("fileListEncoding"), "UTF-8"); + try { + crawler = new org.apache.tika.batch.fs.FSListCrawler(queue, numConsumers, inputDir, fileList, encoding); + } catch (java.io.FileNotFoundException e) { + throw new RuntimeException("fileList file not found for FSListCrawler: " + fileList.getAbsolutePath()); + } catch (java.io.UnsupportedEncodingException e) { + throw new RuntimeException("fileList encoding not supported: "+encoding); + } + } else { + FSDirectoryCrawler.CRAWL_ORDER crawlOrder = getCrawlOrder(attributes.get(CRAWL_ORDER)); + File startDir = PropsUtil.getFile(attributes.get(INPUT_START_DIR_ATTR), null); + if (startDir == null) { + crawler = new FSDirectoryCrawler(queue, numConsumers, inputDir, crawlOrder); + } else { + crawler = new FSDirectoryCrawler(queue, numConsumers, inputDir, startDir, crawlOrder); + } + } + + crawler.setMaxFilesToConsider(PropsUtil.getInt(attributes.get(MAX_FILES_TO_CONSIDER_ATTR), -1)); + crawler.setMaxFilesToAdd(PropsUtil.getInt(attributes.get(MAX_FILES_TO_ADD_ATTR), -1)); + + DocumentSelector selector = buildSelector(attributes); + if (selector != null) { + crawler.setDocumentSelector(selector); + } + + crawler.setMaxConsecWaitInMillis(PropsUtil.getLong(attributes.get(MAX_CONSEC_WAIT_MILLIS), 300000L));//5 minutes + return crawler; + } + + private FSDirectoryCrawler.CRAWL_ORDER getCrawlOrder(String s) { + if (s == null || s.trim().length() == 0 || s.equals("os")) { + return FSDirectoryCrawler.CRAWL_ORDER.OS_ORDER; + } else if (s.toLowerCase(Locale.ROOT).contains("rand")) { + return FSDirectoryCrawler.CRAWL_ORDER.RANDOM; + } else if (s.toLowerCase(Locale.ROOT).contains("sort")) { + return 
FSDirectoryCrawler.CRAWL_ORDER.SORTED; + } else { + return FSDirectoryCrawler.CRAWL_ORDER.OS_ORDER; + } + } + + private DocumentSelector buildSelector(Map<String, String> attributes) { + String includeString = attributes.get(INCLUDE_FILE_PAT_ATTR); + String excludeString = attributes.get(EXCLUDE_FILE_PAT_ATTR); + long maxFileSize = PropsUtil.getLong(attributes.get(MAX_FILE_SIZE_BYTES_ATTR), -1L); + long minFileSize = PropsUtil.getLong(attributes.get(MIN_FILE_SIZE_BYTES_ATTR), -1L); + Pattern includePat = (includeString != null && includeString.length() > 0) ? Pattern.compile(includeString) : null; + Pattern excludePat = (excludeString != null && excludeString.length() > 0) ? Pattern.compile(excludeString) : null; + + return new FSDocumentSelector(includePat, excludePat, minFileSize, maxFileSize); + } + + +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/strawman/StrawManTikaAppDriver.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/strawman/StrawManTikaAppDriver.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/strawman/StrawManTikaAppDriver.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/strawman/StrawManTikaAppDriver.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,249 @@ +package org.apache.tika.batch.fs.strawman; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.log4j.Logger; +import org.apache.tika.io.IOUtils; + +/** + * Simple single-threaded class that calls tika-app against every file in a directory. + * + * This is exceedingly robust. One file per process. + * + * However, you can use this to compare performance against tika-batch fs code. 
+ * + * + */ +public class StrawManTikaAppDriver implements Callable<Integer> { + + private static AtomicInteger threadCount = new AtomicInteger(0); + private final int totalThreads; + private final int threadNum; + private int rootLen = -1; + private File inputDir = null; + private File outputDir = null; + private String[] args = null; + private Logger logger = Logger.getLogger(StrawManTikaAppDriver.class); + + + public StrawManTikaAppDriver(File inputDir, File outputDir, int totalThreads, String[] args) { + rootLen = inputDir.getAbsolutePath().length()+1; + this.inputDir = inputDir; + this.outputDir = outputDir; + this.args = args; + threadNum = threadCount.getAndIncrement(); + this.totalThreads = totalThreads; + } + + + private int processDirectory(File inputDir) { + int processed = 0; + if (inputDir == null || inputDir.listFiles() == null) { + return processed; + } + for (File f : inputDir.listFiles()) { + List<File> childDirs = new ArrayList<File>(); + if (f.isDirectory()) { + childDirs.add(f); + } else { + processed += processFile(f); + } + for (File dir : childDirs) { + processed += processDirectory(dir); + + } + } + return processed; + } + + private int processFile(File f) { + if (totalThreads > 1) { + int hashCode = f.getAbsolutePath().hashCode(); + if (Math.abs(hashCode % totalThreads) != threadNum) { + return 0; + } + } + File outputFile = new File(outputDir, f.getAbsolutePath().substring(rootLen)+".txt"); + outputFile.getAbsoluteFile().getParentFile().mkdirs(); + if (! 
outputFile.getParentFile().exists()) { + logger.fatal("parent directory for "+ outputFile + " was not made!"); + throw new RuntimeException("couldn't make parent file for " + outputFile); + } + List<String> commandLine = new ArrayList<String>(); + for (String arg : args) { + commandLine.add(arg); + } + commandLine.add("-t"); + commandLine.add("\""+f.getAbsolutePath()+"\""); + ProcessBuilder builder = new ProcessBuilder(commandLine.toArray(new String[commandLine.size()])); + logger.info("about to process: "+f.getAbsolutePath()); + Process proc = null; + RedirectGobbler gobbler = null; + Thread gobblerThread = null; + try { + OutputStream os = new FileOutputStream(outputFile); + proc = builder.start(); + gobbler = new RedirectGobbler(proc.getInputStream(), os); + gobblerThread = new Thread(gobbler); + gobblerThread.start(); + } catch (IOException e) { + logger.error(e.getMessage()); + return 0; + } + + boolean finished = false; + long totalTime = 180000;//3 minutes + long pulse = 100; + for (int i = 0; i < totalTime; i += pulse) { + try { + Thread.currentThread().sleep(pulse); + } catch (InterruptedException e) { + //swallow + } + try { + int exit = proc.exitValue(); + finished = true; + break; + } catch (IllegalThreadStateException e) { + //swallow + } + } + if (!finished) { + logger.warn("Had to kill process working on: " + f.getAbsolutePath()); + proc.destroy(); + } + gobbler.close(); + gobblerThread.interrupt(); + return 1; + } + + + @Override + public Integer call() throws Exception { + long start = new Date().getTime(); + + int processed = processDirectory(inputDir); + double elapsedSecs = ((double)new Date().getTime()-(double)start)/(double)1000; + logger.info("Finished processing " + processed + " files in " + elapsedSecs + " seconds."); + return processed; + } + + private class RedirectGobbler implements Runnable { + private OutputStream redirectOs = null; + private InputStream redirectIs = null; + + private RedirectGobbler(InputStream is, OutputStream os) { 
+ this.redirectIs = is; + this.redirectOs = os; + } + + private void close() { + if (redirectOs != null) { + try { + redirectOs.flush(); + } catch (IOException e) { + logger.error("can't flush"); + } + try { + redirectIs.close(); + } catch (IOException e) { + logger.error("can't close input in redirect gobbler"); + } + try { + redirectOs.close(); + } catch (IOException e) { + logger.error("can't close output in redirect gobbler"); + } + } + } + + @Override + public void run() { + try { + IOUtils.copy(redirectIs, redirectOs); + } catch (IOException e) { + logger.error("IOException while gobbling"); + } + } + } + + public static String usage() { + StringBuilder sb = new StringBuilder(); + sb.append("Example usage:\n"); + sb.append("java -cp <CP> org.apache.batch.fs.strawman.StrawManTikaAppDriver "); + sb.append("<inputDir> <outputDir> <numThreads> "); + sb.append("java -jar tika-app-X.Xjar <...commandline arguments for tika-app>\n\n"); + return sb.toString(); + } + + public static void main(String[] args) { + long start = new Date().getTime(); + if (args.length < 6) { + System.err.println(StrawManTikaAppDriver.usage()); + } + File inputDir = new File(args[0]); + File outputDir = new File(args[1]); + int totalThreads = Integer.parseInt(args[2]); + + List<String> commandLine = new ArrayList<String>(); + commandLine.addAll(Arrays.asList(args).subList(3, args.length)); + totalThreads = (totalThreads < 1) ? 
1 : totalThreads; + ExecutorService ex = Executors.newFixedThreadPool(totalThreads); + ExecutorCompletionService<Integer> completionService = + new ExecutorCompletionService<Integer>(ex); + + for (int i = 0; i < totalThreads; i++) { + StrawManTikaAppDriver driver = + new StrawManTikaAppDriver(inputDir, outputDir, totalThreads, commandLine.toArray(new String[commandLine.size()])); + completionService.submit(driver); + } + + int totalFilesProcessed = 0; + for (int i = 0; i < totalThreads; i++) { + try { + Future<Integer> future = completionService.take(); + if (future != null) { + totalFilesProcessed += future.get(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + } + double elapsedSeconds = (double)(new Date().getTime()-start)/(double)1000; + System.out.println("Processed "+totalFilesProcessed + " in " + elapsedSeconds + " seconds"); + } +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/util/ClassLoaderUtil.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/util/ClassLoaderUtil.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/util/ClassLoaderUtil.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/util/ClassLoaderUtil.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,41 @@ +package org.apache.tika.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public class ClassLoaderUtil { + + @SuppressWarnings("unchecked") + public static <T> T buildClass(Class<T> iface, String className) { + + ClassLoader loader = ClassLoader.getSystemClassLoader(); + Class<?> clazz; + try { + clazz = loader.loadClass(className); + if (iface.isAssignableFrom(clazz)) { + return (T) clazz.newInstance(); + } + throw new IllegalArgumentException(iface.toString() + " is not assignable from " + className); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } catch (InstantiationException e) { + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + + } +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/util/DurationFormatUtils.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/util/DurationFormatUtils.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/util/DurationFormatUtils.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/util/DurationFormatUtils.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,66 @@ +package org.apache.tika.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Functionality and naming conventions (roughly) copied from org.apache.commons.lang3 + * so that we didn't have to add another dependency. + */ +public class DurationFormatUtils { + + public static String formatMillis(long duration) { + duration = Math.abs(duration); + StringBuilder sb = new StringBuilder(); + int secs = (int) (duration / 1000) % 60; + int mins = (int) ((duration / (1000 * 60)) % 60); + int hrs = (int) ((duration / (1000 * 60 * 60)) % 24); + int days = (int) ((duration / (1000 * 60 * 60 * 24)) % 7); + + //sb.append(millis + " milliseconds"); + addUnitString(sb, days, "day"); + addUnitString(sb, hrs, "hour"); + addUnitString(sb, mins, "minute"); + addUnitString(sb, secs, "second"); + if (duration < 1000) { + addUnitString(sb, duration, "millisecond"); + } + + return sb.toString(); + } + + private static void addUnitString(StringBuilder sb, long unit, String unitString) { + //only add unit if >= 1 + if (unit == 1) { + addComma(sb); + sb.append("1 "); + sb.append(unitString); + } else if (unit > 1) { + addComma(sb); + sb.append(unit); + sb.append(" "); + sb.append(unitString); + sb.append("s"); + } + } + + private static void addComma(StringBuilder sb) { + if (sb.length() > 0) { + sb.append(", "); + } + } +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/util/PropsUtil.java URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/util/PropsUtil.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/util/PropsUtil.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/util/PropsUtil.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,123 @@ +package org.apache.tika.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.File; +import java.util.Locale; + +/** + * Utility class to handle properties. If the value is null, + * or if there is a parser error, the defaultMissing value will be returned. + */ +public class PropsUtil { + + /** + * Parses v. If there is a problem, this returns defaultMissing. 
+ * + * @param v string to parse + * @param defaultMissing value to return if value is null or unparseable + * @return parsed value + */ + public static Boolean getBoolean(String v, Boolean defaultMissing) { + if (v == null || v.length() == 0) { + return defaultMissing; + } + if (v.toLowerCase(Locale.ROOT).equals("true")) { + return true; + } + if (v.toLowerCase(Locale.ROOT).equals("false")) { + return false; + } + return defaultMissing; + } + + /** + * Parses v. If there is a problem, this returns defaultMissing. + * + * @param v string to parse + * @param defaultMissing value to return if value is null or unparseable + * @return parsed value + */ + public static Integer getInt(String v, Integer defaultMissing) { + if (v == null || v.length() == 0) { + return defaultMissing; + } + try { + return Integer.parseInt(v); + } catch (NumberFormatException e) { + //NO OP + } + return defaultMissing; + } + + /** + * Parses v. If there is a problem, this returns defaultMissing. + * + * @param v string to parse + * @param defaultMissing value to return if value is null or unparseable + * @return parsed value + */ + public static Long getLong(String v, Long defaultMissing) { + if (v == null || v.length() == 0) { + return defaultMissing; + } + try { + return Long.parseLong(v); + } catch (NumberFormatException e) { + //swallow + } + return defaultMissing; + } + + + /** + * Parses v. If there is a problem, this returns defaultMissing. + * + * @param v string to parse + * @param defaultMissing value to return if value is null or unparseable + * @return parsed value + */ + public static File getFile(String v, File defaultMissing) { + if (v == null || v.length() == 0) { + return defaultMissing; + } + //trim initial and final " if they exist + if (v.startsWith("\"")) { + v = v.substring(1); + } + if (v.endsWith("\"")) { + v = v.substring(0, v.length()-1); + } + + return new File(v); + } + + /** + * Parses v. If v is null, this returns defaultMissing. 
+ * + * @param v string to parse + * @param defaultMissing value to return if value is null + * @return parsed value + */ + public static String getString(String v, String defaultMissing) { + if (v == null) { + return defaultMissing; + } + return v; + } +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/util/TikaExceptionFilter.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/util/TikaExceptionFilter.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/util/TikaExceptionFilter.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/util/TikaExceptionFilter.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,63 @@ +package org.apache.tika.util; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.PrintWriter; +import java.io.StringWriter; + +import org.apache.tika.exception.TikaException; + +/** + * Unwrap TikaExceptions and other wrappers that we might not care about + * in downstream analysis. This is similar to + * what tika-server does when returning stack traces. 
+ */ +public class TikaExceptionFilter { + + /** + * Unwrap TikaExceptions and other wrappers that users might not + * care about in downstream analysis. + * + * @param t throwable to filter + * @return filtered throwable + */ + public Throwable filter(Throwable t) { + if (t instanceof TikaException) { + Throwable cause = t.getCause(); + if (cause != null) { + return cause; + } + } + return t; + } + + /** + * This calls {@link #filter} and then prints the filtered + * <code>Throwable</code>to a <code>String</code>. + * + * @param t throwable + * @return a filtered version of the StackTrace + */ + public String getStackTrace(Throwable t) { + Throwable filtered = filter(t); + StringWriter stringWriter = new StringWriter(); + PrintWriter w = new PrintWriter(stringWriter); + filtered.printStackTrace(w); + stringWriter.flush(); + return stringWriter.toString(); + } +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/util/XMLDOMUtil.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/util/XMLDOMUtil.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/util/XMLDOMUtil.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/util/XMLDOMUtil.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,109 @@ +package org.apache.tika.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.HashMap; +import java.util.Map; + +import org.w3c.dom.NamedNodeMap; +import org.w3c.dom.Node; + +public class XMLDOMUtil { + + /** + * This grabs the attributes from a dom node and overwrites those values with those + * specified by the overwrite map. + * + * @param node node for building + * @param overwrite map of attributes to overwrite + * @return map of attributes + */ + public static Map<String, String> mapifyAttrs(Node node, Map<String, String> overwrite) { + Map<String, String> map = new HashMap<String, String>(); + NamedNodeMap nnMap = node.getAttributes(); + for (int i = 0; i < nnMap.getLength(); i++) { + Node attr = nnMap.item(i); + map.put(attr.getNodeName(), attr.getNodeValue()); + } + if (overwrite != null) { + for (Map.Entry<String, String> e : overwrite.entrySet()) { + map.put(e.getKey(), e.getValue()); + } + } + return map; + } + + + /** + * Get an int value. Try the runtime attributes first and then back off to + * the document element. Throw a RuntimeException if the attribute is not + * found or if the value is not parseable as an int. 
+ * + * @param attrName attribute name to find + * @param runtimeAttributes runtime attributes + * @param docElement correct element that should have specified attribute + * @return specified int value + */ + public static int getInt(String attrName, Map<String, String> runtimeAttributes, Node docElement) { + String stringValue = getStringValue(attrName, runtimeAttributes, docElement); + if (stringValue != null) { + try { + return Integer.parseInt(stringValue); + } catch (NumberFormatException e) { + //swallow + } + } + throw new RuntimeException("Need to specify a parseable int value in -- " + +attrName+" -- in commandline or in config file!"); + } + + + /** + * Get a long value. Try the runtime attributes first and then back off to + * the document element. Throw a RuntimeException if the attribute is not + * found or if the value is not parseable as a long. + * + * @param attrName attribute name to find + * @param runtimeAttributes runtime attributes + * @param docElement correct element that should have specified attribute + * @return specified long value + */ + public static long getLong(String attrName, Map<String, String> runtimeAttributes, Node docElement) { + String stringValue = getStringValue(attrName, runtimeAttributes, docElement); + if (stringValue != null) { + try { + return Long.parseLong(stringValue); + } catch (NumberFormatException e) { + //swallow + } + } + throw new RuntimeException("Need to specify a \"long\" value in -- " + +attrName+" -- in commandline or in config file!"); + } + + private static String getStringValue(String attrName, Map<String, String> runtimeAttributes, Node docElement) { + String stringValue = runtimeAttributes.get(attrName); + if (stringValue == null) { + Node staleNode = docElement.getAttributes().getNamedItem(attrName); + if (staleNode != null) { + stringValue = staleNode.getNodeValue(); + } + } + return stringValue; + } +} Added: tika/trunk/tika-batch/src/main/java/overview.html URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/overview.html?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/overview.html (added) +++ tika/trunk/tika-batch/src/main/java/overview.html Mon Mar 23 16:09:10 2015 @@ -0,0 +1,41 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<!DOCTYPE html> +<html> + <head lang="en"> + <meta charset="UTF-8"> + <title>Tika Batch Module</title> + </head> + <body> + + <h1>The Batch Module for Apache Tika</h1> + + <p> + The batch module is new to Tika in 1.8. The goal is to enable robust + batch processing, with extensibility and logging. + </p> + <p> + This module currently enables file system directory to directory processing. + To build out other interfaces, follow the example of BasicTikaFSConsumer and + extend FileResourceConsumer. 
+ </p> + <p> + <b>NOTE: This package is new and experimental and is subject to suddenly change in the next release.</b> + </p> + + </body> +</html> \ No newline at end of file Added: tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml (added) +++ tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml Mon Mar 23 16:09:10 2015 @@ -0,0 +1,121 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no" ?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<!-- NOTE: tika-batch is still an experimental feature. + The configuration file will likely change and be backward incompatible + with new versions of Tika. Please stay tuned. 
+ --> + +<tika-batch-config + maxAliveTimeSeconds="-1" + pauseOnEarlyTerminationMillis="10000" + timeoutThresholdMillis="300000" + timeoutCheckPulseMillis="1000" + maxQueueSize="10000" + numConsumers="5"> + + <!-- options to allow on the commandline --> + <commandline> + <option opt="c" longOpt="tika-config" hasArg="true" + description="TikaConfig file"/> + <option opt="bc" longOpt="batch-config" hasArg="true" + description="xml batch config file"/> + <!-- We needed sorted for testing. We added random for performance. + Where crawling a directory is slow, it might be beneficial to + go randomly so that the parsers are triggered earlier. The + default is operating system's choice ("os") which means whatever order + the os returns files in .listFiles(). --> + <option opt="crawlOrder" hasArg="true" + description="how does the crawler sort the directories and files: + (random|sorted|os)"/> + <option opt="numConsumers" hasArg="true" + description="number of fileConsumers threads"/> + <option opt="maxFileSizeBytes" hasArg="true" + description="maximum file size to process; do not process files larger than this"/> + <option opt="maxQueueSize" hasArg="true" + description="maximum queue size for FileResources"/> + <option opt="fileList" hasArg="true" + description="file that contains a list of files (relative to inputDir) to process"/> + <option opt="fileListEncoding" hasArg="true" + description="encoding for fileList"/> + <option opt="inputDir" hasArg="true" + description="root directory for the files to be processed"/> + <option opt="startDir" hasArg="true" + description="directory (under inputDir) at which to start crawling"/> + <option opt="outputDir" hasArg="true" + description="output directory for output"/> <!-- do we want to make this mandatory --> + <option opt="recursiveParserWrapper" + description="use the RecursiveParserWrapper or not (default = false)"/> + <option opt="handleExisting" hasArg="true" + description="if an output file already exists, do you want 
to: overwrite, rename or skip"/> + <option opt="basicHandlerType" hasArg="true" + description="what type of content handler: xml, text, html, body"/> + <option opt="outputSuffix" hasArg="true" + description="suffix to add to the end of the output file name"/> + <option opt="timeoutThresholdMillis" hasArg="true" + description="how long to wait before determining that a consumer is stale"/> + <option opt="includeFilePat" hasArg="true" + description="regex that specifies which files to process"/> + <option opt="excludeFilePat" hasArg="true" + description="regex that specifies which files to avoid processing"/> + </commandline> + + + <!-- can specify inputDir="input", but the default config should not include this --> + <!-- can also specify startDir="input/someDir" to specify which child directory + to start processing --> + <crawler builderClass="org.apache.tika.batch.fs.builders.FSCrawlerBuilder" + crawlOrder="random" + maxFilesToAdd="-1" + maxFilesToConsider="-1" + includeFilePat="" + excludeFilePat="" + maxFileSizeBytes="-1" + /> +<!-- + This is an example of a crawler that reads a list of files to be processed from a + file. This assumes that the files in the list are relative to inputDir. + <crawler builderClass="org.apache.tika.batch.fs.builders.FSCrawlerBuilder" + fileList="files.txt" + fileListEncoding="UTF-8" + maxFilesToAdd="-1" + maxFilesToConsider="-1" + includeFilePat="(?i)\.pdf$" + excludeFilePat="(?i)\.msg$" + maxFileSizeBytes="-1" + inputDir="input" + /> +--> + <consumers builderClass="org.apache.tika.batch.fs.builders.BasicTikaFSConsumersBuilder" + recursiveParserWrapper="false"> + <parser class="org.apache.tika.batch.AutoDetectParserFactory" parseRecursively="true"/> + <contenthandler builderClass="org.apache.tika.batch.builders.DefaultContentHandlerFactoryBuilder" + basicHandlerType="xml" writeLimit="-1"/> + + <!-- overwritePolicy: "skip" a file if output file exists, "rename" an output file, "overwrite" --> + <!-- can include e.g.
outputDir="output", but we don't want to include this in the default! --> + <outputstream class="FSOutputStreamFactory" encoding="UTF-8" outputSuffix="xml"/> + </consumers> + + <!-- reporter and interrupter are optional --> + <reporter builderClass="org.apache.tika.batch.builders.SimpleLogReporterBuilder" sleepMillis="1000" + reporterStaleThresholdMillis="60000"/> + <interrupter builderClass="org.apache.tika.batch.builders.InterrupterBuilder"/> +</tika-batch-config> \ No newline at end of file Added: tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/CommandLineParserBuilderTest.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/CommandLineParserBuilderTest.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/CommandLineParserBuilderTest.java (added) +++ tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/CommandLineParserBuilderTest.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,48 @@ +package org.apache.tika.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; + +import org.apache.commons.cli.Options; +import org.apache.tika.batch.builders.CommandLineParserBuilder; +import org.apache.tika.batch.fs.FSBatchTestBase; +import org.apache.tika.io.IOUtils; +import org.junit.Test; + + +public class CommandLineParserBuilderTest extends FSBatchTestBase { + + @Test + public void testBasic() throws Exception { + String configFile = this.getClass().getResource( + "/tika-batch-config-test.xml").getFile(); + InputStream is = null; + try { + is = new FileInputStream(new File(configFile)); + CommandLineParserBuilder builder = new CommandLineParserBuilder(); + Options options = builder.build(is); + //TODO: insert actual tests :) + } finally { + IOUtils.closeQuietly(is); + } + + } +} Added: tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchDriverTest.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchDriverTest.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchDriverTest.java (added) +++ tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchDriverTest.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,217 @@ +package org.apache.tika.batch.fs; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import static junit.framework.Assert.assertEquals; +import static junit.framework.TestCase.assertNotNull; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.io.FileUtils; +import org.apache.tika.batch.BatchProcessDriverCLI; +import org.apache.tika.io.IOUtils; +import org.junit.Test; + + +public class BatchDriverTest extends FSBatchTestBase { + + //for debugging, turn logging off/on via resources/log4j.properties for the driver + //and log4j_process.properties for the process. + + @Test(timeout = 15000) + public void oneHeavyHangTest() throws Exception { + //batch runner hits one heavy hang file, keep going + File outputDir = getNewOutputDir("daemon-"); + assertNotNull(outputDir.listFiles()); + //make sure output directory is empty! 
+ assertEquals(0, outputDir.listFiles().length); + + String[] args = getDefaultCommandLineArgsArr("one_heavy_hang", outputDir, null); + BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", args); + driver.execute(); + assertEquals(0, driver.getNumRestarts()); + assertFalse(driver.getUserInterrupted()); + assertEquals(5, outputDir.listFiles().length); + assertContains("first test file", + FileUtils.readFileToString(new File(outputDir, "test2_ok.xml.xml"), + IOUtils.UTF_8.toString())); + + + } + + @Test(timeout = 30000) + public void restartOnFullHangTest() throws Exception { + //batch runner hits more heavy hangs than threads; needs to restart + File outputDir = getNewOutputDir("daemon-"); + + //make sure output directory is empty! + assertEquals(0, outputDir.listFiles().length); + + String[] args = getDefaultCommandLineArgsArr("heavy_heavy_hangs", outputDir, null); + BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", args); + driver.execute(); + //could be one or two depending on timing + assertTrue(driver.getNumRestarts() > 0); + assertFalse(driver.getUserInterrupted()); + assertContains("first test file", + FileUtils.readFileToString(new File(outputDir, "test6_ok.xml.xml"), + IOUtils.UTF_8.toString())); + } + + @Test(timeout = 15000) + public void noRestartTest() throws Exception { + File outputDir = getNewOutputDir("daemon-"); + + //make sure output directory is empty! 
+ assertEquals(0, outputDir.listFiles().length); + + String[] args = getDefaultCommandLineArgsArr("no_restart", outputDir, null); + String[] mod = Arrays.copyOf(args, args.length + 2); + mod[args.length] = "-numConsumers"; + mod[args.length+1] = "1"; + + BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", mod); + driver.execute(); + assertEquals(0, driver.getNumRestarts()); + assertFalse(driver.getUserInterrupted()); + File[] files = outputDir.listFiles(); + assertEquals(2, files.length); + File test2 = new File(outputDir, "test2_norestart.xml.xml"); + assertTrue("test2_norestart.xml", test2.exists()); + File test3 = new File(outputDir, "test3_ok.xml.xml"); + assertFalse("test3_ok.xml", test3.exists()); + assertEquals(0, test3.length()); + } + + @Test(timeout = 15000) + public void restartOnOOMTest() throws Exception { + //batch runner hits more heavy hangs than threads; needs to restart + File outputDir = getNewOutputDir("daemon-"); + + //make sure output directory is empty! + assertEquals(0, outputDir.listFiles().length); + + String[] args = getDefaultCommandLineArgsArr("oom", outputDir, null); + BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", args); + driver.execute(); + assertEquals(1, driver.getNumRestarts()); + assertFalse(driver.getUserInterrupted()); + assertContains("first test file", + FileUtils.readFileToString(new File(outputDir, "test2_ok.xml.xml"), + IOUtils.UTF_8.toString())); + } + + @Test(timeout = 30000) + public void allHeavyHangsTestWithStarvedCrawler() throws Exception { + //this tests that if all consumers are hung and the crawler is + //waiting to add to the queue, there isn't deadlock. 
The BatchProcess should + //just shutdown, and the driver should restart + File outputDir = getNewOutputDir("allHeavyHangsStarvedCrawler-"); + Map<String, String> args = new HashMap<String,String>(); + args.put("-numConsumers", "2"); + args.put("-maxQueueSize", "2"); + String[] commandLine = getDefaultCommandLineArgsArr("heavy_heavy_hangs", outputDir, args); + BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", commandLine); + driver.execute(); + assertEquals(3, driver.getNumRestarts()); + assertFalse(driver.getUserInterrupted()); + assertContains("first test file", + FileUtils.readFileToString(new File(outputDir, "test6_ok.xml.xml"), + IOUtils.UTF_8.toString())); + } + + @Test(timeout = 30000) + public void maxRestarts() throws Exception { + //tests that maxRestarts works + //if -maxRestarts is not correctly removed from the commandline, + //FSBatchProcessCLI's cli parser will throw an Unrecognized option exception + + File outputDir = getNewOutputDir("allHeavyHangsStarvedCrawler-"); + Map<String, String> args = new HashMap<String,String>(); + args.put("-numConsumers", "1"); + args.put("-maxQueueSize", "10"); + args.put("-maxRestarts", "2"); + + String[] commandLine = getDefaultCommandLineArgsArr("max_restarts", outputDir, args); + + BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", commandLine); + driver.execute(); + assertEquals(2, driver.getNumRestarts()); + assertFalse(driver.getUserInterrupted()); + assertEquals(3, outputDir.listFiles().length); + } + + @Test(timeout = 30000) + public void maxRestartsBadParameter() throws Exception { + //tests that maxRestarts must be followed by an Integer + File outputDir = getNewOutputDir("allHeavyHangsStarvedCrawler-"); + Map<String, String> args = new HashMap<String,String>(); + args.put("-numConsumers", "1"); + args.put("-maxQueueSize", "10"); + args.put("-maxRestarts", "zebra"); + + String[] commandLine = getDefaultCommandLineArgsArr("max_restarts", outputDir, args); + 
boolean ex = false; + try { + BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", commandLine); + driver.execute(); + } catch (IllegalArgumentException e) { + ex = true; + } + assertTrue("IllegalArgumentException should have been thrown", ex); + } + + @Test(timeout = 30000) + public void testNoRestartIfProcessFails() throws Exception { + //tests that if something goes horribly wrong with FSBatchProcessCLI + //the driver will not restart it again and again + //this calls a bad xml file which should trigger a no restart exit. + File outputDir = getNewOutputDir("nostart-norestart-"); + Map<String, String> args = new HashMap<String,String>(); + args.put("-numConsumers", "1"); + args.put("-maxQueueSize", "10"); + + String[] commandLine = getDefaultCommandLineArgsArr("basic", outputDir, args); + BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-broken.xml", commandLine); + driver.execute(); + assertEquals(0, outputDir.listFiles().length); + assertEquals(0, driver.getNumRestarts()); + } + + @Test(timeout = 30000) + public void testNoRestartIfProcessFailsTake2() throws Exception { + File outputDir = getNewOutputDir("nostart-norestart-"); + Map<String, String> args = new HashMap<String,String>(); + args.put("-numConsumers", "1"); + args.put("-maxQueueSize", "10"); + args.put("-somethingOrOther", "I don't Know"); + + String[] commandLine = getDefaultCommandLineArgsArr("basic", outputDir, args); + BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", commandLine); + driver.execute(); + assertEquals(0, outputDir.listFiles().length); + assertEquals(0, driver.getNumRestarts()); + } + + +} Added: tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchProcessTest.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchProcessTest.java?rev=1668673&view=auto ============================================================================== --- 
tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchProcessTest.java (added) +++ tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchProcessTest.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,343 @@ +package org.apache.tika.batch.fs; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.tika.batch.BatchProcess;
import org.apache.tika.batch.BatchProcessDriverCLI;
import org.apache.tika.io.IOUtils;
import org.junit.Test;

/**
 * End-to-end tests for {@link BatchProcess}. Each test launches a batch runner
 * in a child process (built by {@code getNewBatchRunnerProcess}) and then
 * inspects the output directory, the child's exit value and its captured
 * stdout/stderr.
 */
public class BatchProcessTest extends FSBatchTestBase {

    @Test(timeout = 15000)
    public void oneHeavyHangTest() throws Exception {

        File outputDir = getNewOutputDir("one_heavy_hang-");

        Map<String, String> args = getDefaultArgs("one_heavy_hang", outputDir);
        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
        StreamStrings streamStrings = ex.execute();
        assertEquals(5, outputDir.listFiles().length);
        File hvyHang = new File(outputDir, "test0_heavy_hang.xml.xml");
        assertTrue(hvyHang.exists());
        assertEquals(0, hvyHang.length());
        assertNotContained(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString(),
                streamStrings.getErrString());
    }


    @Test(timeout = 15000)
    public void allHeavyHangsTest() throws Exception {
        //each of the three threads hits a heavy hang. The BatchProcess runs into
        //all timeouts and shuts down.
        File outputDir = getNewOutputDir("allHeavyHangs-");
        Map<String, String> args = getDefaultArgs("heavy_heavy_hangs", outputDir);
        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
        StreamStrings streamStrings = ex.execute();

        assertEquals(3, outputDir.listFiles().length);
        for (File hvyHang : outputDir.listFiles()) {
            assertTrue(hvyHang.exists());
            assertEquals("file length for " + hvyHang.getName() + " should be 0, but is: " + hvyHang.length(),
                    0, hvyHang.length());
        }
        assertContains(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString(),
                streamStrings.getErrString());
    }

    @Test(timeout = 30000)
    public void allHeavyHangsTestWithCrazyNumberConsumersTest() throws Exception {
        File outputDir = getNewOutputDir("allHeavyHangsCrazyNumberConsumers-");
        Map<String, String> args = getDefaultArgs("heavy_heavy_hangs", outputDir);
        args.put("numConsumers", "100");
        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
        StreamStrings streamStrings = ex.execute();
        assertEquals(7, outputDir.listFiles().length);

        for (int i = 0; i < 6; i++) {
            File hvyHang = new File(outputDir, "test" + i + "_heavy_hang.xml.xml");
            assertTrue(hvyHang.exists());
            assertEquals(0, hvyHang.length());
        }
        assertContains("This is tika-batch's first test file",
                FileUtils.readFileToString(new File(outputDir, "test6_ok.xml.xml"),
                        IOUtils.UTF_8.toString()));

        //the key is that the process realizes there are no more processable files
        //in the queue and does not ask for a restart!
        assertNotContained(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString(),
                streamStrings.getErrString());
    }

    @Test(timeout = 30000)
    public void allHeavyHangsTestWithStarvedCrawler() throws Exception {
        //this tests that if all consumers are hung and the crawler is
        //waiting to add to the queue, there isn't deadlock. The batchrunner should
        //shutdown and ask to be restarted.
        File outputDir = getNewOutputDir("allHeavyHangsStarvedCrawler-");
        Map<String, String> args = getDefaultArgs("heavy_heavy_hangs", outputDir);
        args.put("numConsumers", "2");
        args.put("maxQueueSize", "2");
        args.put("timeoutThresholdMillis", "100000000");//make sure that the batch process doesn't time out
        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
        StreamStrings streamStrings = ex.execute();
        assertEquals(2, outputDir.listFiles().length);

        for (int i = 0; i < 2; i++) {
            File hvyHang = new File(outputDir, "test" + i + "_heavy_hang.xml.xml");
            assertTrue(hvyHang.exists());
            assertEquals(0, hvyHang.length());
        }
        assertContains(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString(),
                streamStrings.getErrString());
        assertContains("Crawler timed out", streamStrings.getErrString());
    }

    @Test(timeout = 15000)
    public void outOfMemory() throws Exception {
        //the first consumer should sleep for 10 seconds
        //the second should be tied up in a heavy hang
        //the third one should hit the oom after processing test2_ok.xml
        //no consumers should process test2-4.txt!
        //i.e. the first consumer will finish in 10 seconds and
        //then otherwise would be looking for more, but the oom should prevent that
        File outputDir = getNewOutputDir("oom-");

        Map<String, String> args = getDefaultArgs("oom", outputDir);
        args.put("numConsumers", "3");
        args.put("timeoutThresholdMillis", "30000");

        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
        StreamStrings streamStrings = ex.execute();

        assertEquals(4, outputDir.listFiles().length);
        assertContains("This is tika-batch's first test file",
                FileUtils.readFileToString(new File(outputDir, "test2_ok.xml.xml"),
                        IOUtils.UTF_8.toString()));

        assertContains(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString(),
                streamStrings.getErrString());
    }



    @Test(timeout = 15000)
    public void noRestart() throws Exception {
        File outputDir = getNewOutputDir("no_restart");

        Map<String, String> args = getDefaultArgs("no_restart", outputDir);
        args.put("numConsumers", "1");

        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);

        StreamStrings streamStrings = ex.execute();
        File test2 = new File(outputDir, "test2_norestart.xml.xml");
        assertTrue("test2_norestart.xml", test2.exists());
        File test3 = new File(outputDir, "test3_ok.xml.xml");
        assertFalse("test3_ok.xml", test3.exists());
        assertEquals(0, test3.length());
        assertContains("exitStatus=" + BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE,
                streamStrings.getOutString());
        assertContains("causeForTermination='MAIN_LOOP_EXCEPTION_NO_RESTART'",
                streamStrings.getOutString());
    }

    /**
     * This tests to make sure that BatchProcess waits the appropriate
     * amount of time on an early termination before stopping.
     *
     * If this fails, then interruptible parsers (e.g. those with
     * nio channels) will be interrupted and there will be corrupted data.
     */
    @Test(timeout = 60000)
    public void testWaitAfterEarlyTermination() throws Exception {
        File outputDir = getNewOutputDir("wait_after_early_termination");

        Map<String, String> args = getDefaultArgs("wait_after_early_termination", outputDir);
        args.put("numConsumers", "1");
        args.put("maxAliveTimeSeconds", "5");//main process loop should stop after 5 seconds
        args.put("timeoutThresholdMillis", "300000");//effectively never
        args.put("pauseOnEarlyTerminationMillis", "20000");//let the parser have up to 20 seconds

        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);

        StreamStrings streamStrings = ex.execute();
        File[] files = outputDir.listFiles();
        assertEquals(1, files.length);
        assertContains("<p>some content</p>",
                FileUtils.readFileToString(new File(outputDir, "test0_sleep.xml.xml"),
                        IOUtils.UTF_8.toString()));

        assertContains("exitStatus=" + BatchProcessDriverCLI.PROCESS_RESTART_EXIT_CODE, streamStrings.getOutString());
        assertContains("causeForTermination='BATCH_PROCESS_ALIVE_TOO_LONG'",
                streamStrings.getOutString());
    }

    @Test(timeout = 60000)
    public void testTimeOutAfterBeingAskedToShutdown() throws Exception {
        File outputDir = getNewOutputDir("timeout_after_early_termination");

        Map<String, String> args = getDefaultArgs("timeout_after_early_termination", outputDir);
        args.put("numConsumers", "1");
        args.put("maxAliveTimeSeconds", "5");//main process loop should stop after 5 seconds
        args.put("timeoutThresholdMillis", "10000");
        args.put("pauseOnEarlyTerminationMillis", "20000");//let the parser have up to 20 seconds

        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
        StreamStrings streamStrings = ex.execute();
        File[] files = outputDir.listFiles();
        assertEquals(1, files.length);
        assertEquals(0, files[0].length());
        assertContains("exitStatus=" + BatchProcessDriverCLI.PROCESS_RESTART_EXIT_CODE, streamStrings.getOutString());
        assertContains("causeForTermination='BATCH_PROCESS_ALIVE_TOO_LONG'",
                streamStrings.getOutString());
    }

    @Test(timeout = 10000)
    public void testRedirectionOfStreams() throws Exception {
        //test redirection of system.err to system.out
        File outputDir = getNewOutputDir("noisy_parsers");

        Map<String, String> args = getDefaultArgs("noisy_parsers", outputDir);
        args.put("numConsumers", "1");
        args.put("maxAliveTimeSeconds", "20");//main process loop should stop after 20 seconds

        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
        StreamStrings streamStrings = ex.execute();
        File[] files = outputDir.listFiles();
        assertEquals(1, files.length);
        assertContains("System.out", streamStrings.getOutString());
        assertContains("System.err", streamStrings.getOutString());
        assertEquals(0, streamStrings.getErrString().length());

    }

    @Test(timeout = 10000)
    public void testConsumersManagerInitHang() throws Exception {
        File outputDir = getNewOutputDir("init_hang");

        Map<String, String> args = getDefaultArgs("noisy_parsers", outputDir);
        args.put("numConsumers", "1");
        args.put("hangOnInit", "true");
        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args, "/tika-batch-config-MockConsumersBuilder.xml");
        StreamStrings streamStrings = ex.execute();
        assertEquals(BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE, ex.getExitValue());
        assertContains("causeForTermination='CONSUMERS_MANAGER_DIDNT_INIT_IN_TIME_NO_RESTART'", streamStrings.getOutString());
    }

    @Test(timeout = 10000)
    public void testConsumersManagerShutdownHang() throws Exception {
        File outputDir = getNewOutputDir("shutdown_hang");

        Map<String, String> args = getDefaultArgs("noisy_parsers", outputDir);
        args.put("numConsumers", "1");
        args.put("hangOnShutdown", "true");

        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args, "/tika-batch-config-MockConsumersBuilder.xml");
        StreamStrings streamStrings = ex.execute();
        assertEquals(BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE, ex.getExitValue());
        assertContains("ConsumersManager did not shutdown within", streamStrings.getOutString());
    }

    /**
     * Runs one batch-runner child process with the given args and config,
     * capturing its stdout/stderr and exit value.
     */
    private class BatchProcessTestExecutor {
        private final Map<String, String> args;
        private final String configPath;
        // Integer.MIN_VALUE until execute() has observed the child's exit.
        private int exitValue = Integer.MIN_VALUE;

        public BatchProcessTestExecutor(Map<String, String> args) {
            this(args, "/tika-batch-config-test.xml");
        }

        public BatchProcessTestExecutor(Map<String, String> args, String configPath) {
            this.args = args;
            this.configPath = configPath;
        }

        /**
         * Starts the batch runner, blocks until it exits, and returns what it
         * wrote to stdout and stderr. The child process is always passed to
         * {@code destroyProcess} on the way out.
         *
         * @throws IOException if the child process cannot be started
         * @throws InterruptedException if the calling thread is interrupted
         *         while waiting for the child to exit
         */
        private StreamStrings execute() throws IOException, InterruptedException {
            Process p = null;
            try {
                ProcessBuilder b = getNewBatchRunnerProcess(configPath, args);
                p = b.start();
                StringStreamGobbler errorGobbler = new StringStreamGobbler(p.getErrorStream());
                StringStreamGobbler outGobbler = new StringStreamGobbler(p.getInputStream());
                Thread errorThread = new Thread(errorGobbler);
                Thread outThread = new Thread(outGobbler);
                errorThread.start();
                outThread.start();
                // Block until the child exits. Polling exitValue() in a loop burns a
                // CPU core and ignores interruption. waitFor() responds to interrupts,
                // so the finally below still runs if the test thread is cancelled.
                exitValue = p.waitFor();
                // NOTE(review): gobblers are stopped as soon as the child exits;
                // confirm StringStreamGobbler has drained any still-buffered output.
                errorGobbler.stopGobblingAndDie();
                outGobbler.stopGobblingAndDie();
                errorThread.interrupt();
                outThread.interrupt();
                return new StreamStrings(outGobbler.toString(), errorGobbler.toString());
            } finally {
                destroyProcess(p);
            }
        }

        private int getExitValue() {
            return exitValue;
        }

    }

    /** Immutable pair of the captured stdout and stderr of a child process. */
    private static class StreamStrings {
        private final String outString;
        private final String errString;

        private StreamStrings(String outString, String errString) {
            this.outString = outString;
            this.errString = errString;
        }

        private String getOutString() {
            return outString;
        }

        private String getErrString() {
            return errString;
        }

        @Override
        public String toString() {
            return "OUT>>" + outString + "<<\n" +
                    "ERR>>" + errString + "<<\n";
        }
    }
}
