Revision: 19649 http://sourceforge.net/p/gate/code/19649 Author: johann_p Date: 2016-10-07 18:47:01 +0000 (Fri, 07 Oct 2016) Log Message: ----------- Make it so that the option -o for gcp-direct mode is not required any more. If missing, the documents will not be saved. This is useful if the pipeline contains a processing resource (PR) that takes care of the saving or produces other kinds of output.
Modified Paths: -------------- gcp/trunk/doc/gcp-guide.pdf gcp/trunk/doc/install-and-run.tex gcp/trunk/doc/introduction.tex gcp/trunk/src/gate/cloud/batch/BatchRunner.java Modified: gcp/trunk/doc/gcp-guide.pdf =================================================================== (Binary files differ) Modified: gcp/trunk/doc/install-and-run.tex =================================================================== --- gcp/trunk/doc/install-and-run.tex 2016-10-07 15:42:09 UTC (rev 19648) +++ gcp/trunk/doc/install-and-run.tex 2016-10-07 18:47:01 UTC (rev 19649) @@ -145,8 +145,14 @@ backup and temporary file name patterns and source control metadata -- see \url{http://ant.apache.org/manual/dirtasks.html#defaultexcludes} for details). -\item[-o] the directory in which to place the output files. Each input file +\item[-o] (optional) the directory in which to place the output files. Each input file will generate an output file with the same name in the output directory. + If this option is missing, and the option \texttt{-b} is missing as well, + the documents are not saved! +\item[-b] (optional) if this option is specified it can be used to specify + a batch file. In that case, the options \texttt{-x, -i, -o, -r, -I} are + not required and ignored if specified and the corresponding information is + taken from the batch configuration file instead. \item[-r] (optional) path to the report file for this batch -- if omitted GCP will use \verb!report.xml! in the current directory. \item[-ci] the input files are all gzip-compressed Modified: gcp/trunk/doc/introduction.tex =================================================================== --- gcp/trunk/doc/introduction.tex 2016-10-07 15:42:09 UTC (rev 19648) +++ gcp/trunk/doc/introduction.tex 2016-10-07 18:47:01 UTC (rev 19649) @@ -134,6 +134,10 @@ This section summarises the main changes between releases of GCP +%% New after 2.6: +%% option -o for gcp-direct is not required any more, if missing, documents +%% are not saved. 
+ \subsection{2.6 (June 2016)} This is a minor bugfix release, the main change is that GCP now depends on GATE Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2016-10-07 15:42:09 UTC (rev 19648) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2016-10-07 18:47:01 UTC (rev 19649) @@ -1,14 +1,14 @@ /* - * BatchRunner.java - * Copyright (c) 2007-2011, The University of Sheffield. - * - * This file is part of GCP (see http://gate.ac.uk/), and is free - * software, licenced under the GNU Affero General Public License, - * Version 3, November 2007. - * - * - * $Id$ - */ +* BatchRunner.java +* Copyright (c) 2007-2011, The University of Sheffield. +* +* This file is part of GCP (see http://gate.ac.uk/), and is free +* software, licenced under the GNU Affero General Public License, +* Version 3, November 2007. +* +* +* $Id$ +*/ package gate.cloud.batch; import gate.CorpusController; @@ -58,405 +58,405 @@ import com.sun.jna.Platform; /** - * This class is a Batch Runner, i.e. it manages the execution of a batch job, - * specified by a {@link Batch} object. - */ +* This class is a Batch Runner, i.e. it manages the execution of a batch job, +* specified by a {@link Batch} object. +*/ public class BatchRunner { - private static final Logger log = Logger.getLogger(BatchRunner.class); - //private static final long LOOP_WAIT = 5 * 60 * 1000; - private static final long LOOP_WAIT = 10 * 1000; +private static final Logger log = Logger.getLogger(BatchRunner.class); +//private static final long LOOP_WAIT = 5 * 60 * 1000; +private static final long LOOP_WAIT = 10 * 1000; - /** - * This class manages the execution of a batch job. It also exposes a - * {@link BatchJobData} interface that provides information about the - * execution progress. 
- */ - private class BatchHandler implements BatchJobData { - /** - * The document processor that runs the actual jobs. - */ - private DocumentProcessor processor; - - /** - * The batch being run. - */ - private Batch batch; - - private int totalDocs; - private int successDocs; - private int errorDocs; - private JobState state; - private String id; - - /** - * The moment when the execution of this batch started. - */ - private long startTime; - - /** - * The sum of {@link ProcessResult#getOriginalFileSize()} values for all - * processed documents. - */ - private long totalBytes; - - /** - * The sum of {@link ProcessResult#getDocumentLength()} values for all - * processed documents. - */ - private long totalChars; - - /** - * The results queue for this batch. - */ - private BlockingQueue<ProcessResult> resultQueue; - /** - * The report file for this batch. - */ - private File reportFile; - /** - * Writer to write the report file. - */ - private XMLStreamWriter reportWriter; - /** - * Thread that pushes jobs into the DocumentProcessor for this batch. - */ - private Thread jobPusher; +/** +* This class manages the execution of a batch job. It also exposes a +* {@link BatchJobData} interface that provides information about the +* execution progress. +*/ +private class BatchHandler implements BatchJobData { +/** +* The document processor that runs the actual jobs. +*/ +private DocumentProcessor processor; - private BatchHandler(final Batch batch) throws GateException, IOException { - successDocs = 0; - errorDocs = 0; - totalBytes = 0; - totalChars = 0; - this.batch = batch; - id = batch.getBatchId(); - } +/** +* The batch being run. +*/ +private Batch batch; - public void start() throws IOException, XMLStreamException, - ResourceInstantiationException { - reportWriter = batch.getReportWriter(); - // any existing report file has now been processed, so we know - // the correct number of unprocessed document IDs - totalDocs = batch.getUnprocessedDocumentIDs() == null ? 
-1 : batch.getUnprocessedDocumentIDs().length; - startTime = System.currentTimeMillis(); - setState(JobState.RUNNING); - resultQueue = new LinkedBlockingQueue<ProcessResult>(); - if(totalDocs != 0) { - final InputHandler inputHandler = batch.getInputHandler(); - processor = new PooledDocumentProcessor(executor.getCorePoolSize()); - processor.setController(batch.getGateApplication()); - processor.setExecutor(executor); - processor.setInputHandler(inputHandler); - processor.setOutputHandlers(batch.getOutputs()); - processor.setResultQueue(resultQueue); - processor.init(); - log.info("Duplication finished"); - System.gc(); - log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M"); - log.info("Used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); - duplicationFinishedTime = System.currentTimeMillis(); - log.info("Duplication time (seconds): "+(duplicationFinishedTime-loadingFinishedTime)/1000.0); - jobPusher = new Thread(new Runnable() { - public void run() { - if(batch.getDocumentIDs() == null && inputHandler instanceof StreamingInputHandler) { - ((StreamingInputHandler)inputHandler).startBatch(batch); - processor.processStreaming(); - if(Thread.interrupted()) { return; } - } else { - for(DocumentID id : batch.getUnprocessedDocumentIDs()) { - processor.processDocument(id); - if(Thread.interrupted()) { return; } - } - } - // shut down the executor and wait for it to terminate - executor.shutdown(); - while(!executor.isTerminated()) { - try { - executor.awaitTermination(60L, TimeUnit.SECONDS); - } catch(InterruptedException e) { - // just re-interrupt ourselves and give up - Thread.currentThread().interrupt(); - } - } +private int totalDocs; +private int successDocs; +private int errorDocs; +private JobState state; +private String id; - // now we know the batch is finished - resultQueue.add(new EndOfBatchResult()); - } - }, "Batch \"" + getBatchId() + "\"-job-pusher"); - jobPusher.start(); +/** +* The moment when the execution of this batch 
started. +*/ +private long startTime; + +/** +* The sum of {@link ProcessResult#getOriginalFileSize()} values for all +* processed documents. +*/ +private long totalBytes; + +/** +* The sum of {@link ProcessResult#getDocumentLength()} values for all +* processed documents. +*/ +private long totalChars; + +/** +* The results queue for this batch. +*/ +private BlockingQueue<ProcessResult> resultQueue; +/** +* The report file for this batch. +*/ +private File reportFile; +/** +* Writer to write the report file. +*/ +private XMLStreamWriter reportWriter; +/** +* Thread that pushes jobs into the DocumentProcessor for this batch. +*/ +private Thread jobPusher; + +private BatchHandler(final Batch batch) throws GateException, IOException { +successDocs = 0; +errorDocs = 0; +totalBytes = 0; +totalChars = 0; +this.batch = batch; +id = batch.getBatchId(); +} + +public void start() throws IOException, XMLStreamException, +ResourceInstantiationException { +reportWriter = batch.getReportWriter(); +// any existing report file has now been processed, so we know +// the correct number of unprocessed document IDs +totalDocs = batch.getUnprocessedDocumentIDs() == null ? 
-1 : batch.getUnprocessedDocumentIDs().length; +startTime = System.currentTimeMillis(); +setState(JobState.RUNNING); +resultQueue = new LinkedBlockingQueue<ProcessResult>(); +if(totalDocs != 0) { +final InputHandler inputHandler = batch.getInputHandler(); +processor = new PooledDocumentProcessor(executor.getCorePoolSize()); +processor.setController(batch.getGateApplication()); +processor.setExecutor(executor); +processor.setInputHandler(inputHandler); +processor.setOutputHandlers(batch.getOutputs()); +processor.setResultQueue(resultQueue); +processor.init(); +log.info("Duplication finished"); +System.gc(); +log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M"); +log.info("Used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); +duplicationFinishedTime = System.currentTimeMillis(); +log.info("Duplication time (seconds): "+(duplicationFinishedTime-loadingFinishedTime)/1000.0); +jobPusher = new Thread(new Runnable() { + public void run() { + if(batch.getDocumentIDs() == null && inputHandler instanceof StreamingInputHandler) { + ((StreamingInputHandler)inputHandler).startBatch(batch); + processor.processStreaming(); + if(Thread.interrupted()) { return; } + } else { + for(DocumentID id : batch.getUnprocessedDocumentIDs()) { + processor.processDocument(id); + if(Thread.interrupted()) { return; } } } - - /* - * (non-Javadoc) - * - * @see gate.sam.batch.BatchJobData#getErrorDocumentCount() - */ - public int getErrorDocumentCount() { - return errorDocs; + // shut down the executor and wait for it to terminate + executor.shutdown(); + while(!executor.isTerminated()) { + try { + executor.awaitTermination(60L, TimeUnit.SECONDS); + } catch(InterruptedException e) { + // just re-interrupt ourselves and give up + Thread.currentThread().interrupt(); + } } - /* - * (non-Javadoc) - * - * @see gate.sam.batch.BatchJobData#getProcessedDocumentCount() - */ - public int getProcessedDocumentCount() { - return errorDocs + successDocs; - } + // now we know the 
batch is finished + resultQueue.add(new EndOfBatchResult()); + } +}, "Batch \"" + getBatchId() + "\"-job-pusher"); +jobPusher.start(); +} +} - /* - * (non-Javadoc) - * - * @see gate.sam.batch.BatchJobData#getRemainingDocumentCount() - */ - public int getRemainingDocumentCount() { - return (totalDocs < 0) ? -1 : totalDocs - errorDocs - successDocs; - } +/* +* (non-Javadoc) +* +* @see gate.sam.batch.BatchJobData#getErrorDocumentCount() +*/ +public int getErrorDocumentCount() { +return errorDocs; +} - /* - * (non-Javadoc) - * - * @see gate.sam.batch.BatchJobData#getSuccessDocumentCount() - */ - public int getSuccessDocumentCount() { - return successDocs; - } +/* +* (non-Javadoc) +* +* @see gate.sam.batch.BatchJobData#getProcessedDocumentCount() +*/ +public int getProcessedDocumentCount() { +return errorDocs + successDocs; +} - /* - * (non-Javadoc) - * - * @see gate.sam.batch.BatchJobData#getTotalDocumentCount() - */ - public int getTotalDocumentCount() { - return totalDocs; - } +/* +* (non-Javadoc) +* +* @see gate.sam.batch.BatchJobData#getRemainingDocumentCount() +*/ +public int getRemainingDocumentCount() { +return (totalDocs < 0) ? 
-1 : totalDocs - errorDocs - successDocs; +} - public long getTotalDocumentLength() { - return totalChars; - } +/* +* (non-Javadoc) +* +* @see gate.sam.batch.BatchJobData#getSuccessDocumentCount() +*/ +public int getSuccessDocumentCount() { +return successDocs; +} - public long getTotalFileSize() { - return totalBytes; - } +/* +* (non-Javadoc) +* +* @see gate.sam.batch.BatchJobData#getTotalDocumentCount() +*/ +public int getTotalDocumentCount() { +return totalDocs; +} - /* - * (non-Javadoc) - * - * @see gate.sam.batch.BatchJobData#isFinished() - */ - public JobState getState() { - return state; - } +public long getTotalDocumentLength() { +return totalChars; +} - private void setState(JobState state) { - this.state = state; - } +public long getTotalFileSize() { +return totalBytes; +} - private void setErrorDocumentCount(int newCount) { - errorDocs = newCount; - } +/* +* (non-Javadoc) +* +* @see gate.sam.batch.BatchJobData#isFinished() +*/ +public JobState getState() { +return state; +} - private void setSuccessDocumentCount(int newCount) { - successDocs = newCount; - } +private void setState(JobState state) { +this.state = state; +} - /* (non-Javadoc) - * @see gate.sam.batch.BatchJobData#getStartTime() - */ - public long getStartTime() { - return startTime; - } +private void setErrorDocumentCount(int newCount) { +errorDocs = newCount; +} - /* (non-Javadoc) - * @see gate.sam.batch.BatchJobData#getBatchId() - */ - public String getBatchId() { - return id; - } - - - } - /** - * A SynchronousQueue in which offer delegates to put. ThreadPoolExecutor uses - * offer to run a new task. Using put instead means that when all the threads - * in the pool are occupied, execute will wait for one of them to become free, - * rather than failing to submit the task. - */ - public static class AlwaysBlockingSynchronousQueue - extends - SynchronousQueue<Runnable> { - /** - * Yes, I know this technically breaks the contract of BlockingQueue, but it - * works for this case. 
- */ - public boolean offer(Runnable task) { - try { - put(task); - } catch(InterruptedException e) { - return false; +private void setSuccessDocumentCount(int newCount) { +successDocs = newCount; +} + +/* (non-Javadoc) +* @see gate.sam.batch.BatchJobData#getStartTime() +*/ +public long getStartTime() { +return startTime; +} + +/* (non-Javadoc) +* @see gate.sam.batch.BatchJobData#getBatchId() +*/ +public String getBatchId() { +return id; +} + + +} +/** +* A SynchronousQueue in which offer delegates to put. ThreadPoolExecutor uses +* offer to run a new task. Using put instead means that when all the threads +* in the pool are occupied, execute will wait for one of them to become free, +* rather than failing to submit the task. +*/ +public static class AlwaysBlockingSynchronousQueue + extends + SynchronousQueue<Runnable> { +/** +* Yes, I know this technically breaks the contract of BlockingQueue, but it +* works for this case. +*/ +public boolean offer(Runnable task) { +try { +put(task); +} catch(InterruptedException e) { +return false; +} +return true; +} +} +/** +* A thread that runs continuously while the batch runner is active. Its role +* is to monitor the running jobs, collect process results, save the report +* files for each running batch, and shutdown the batch runner and/or Java +* process when all the batches have completed (if requested via the +* {@link BatchRunner#shutdownWhenFinished(boolean)} and +* {@link BatchRunner#exitWhenFinished(boolean)} methods). 
+*/ +private class JobMonitor implements Runnable { +public void run() { +boolean finished = false; +while(!finished) { +long startTime = System.currentTimeMillis(); +try { + boolean jobsStillRunning = false; + BatchHandler job = runningJob; + if(job.getState() == JobState.RUNNING) { + List<ProcessResult> results = new ArrayList<ProcessResult>(); + int resultsCount = job.resultQueue.drainTo(results); + boolean finishedBatch = false; + try { + for(ProcessResult result : results) { + if(result.getReturnCode() == ReturnCode.END_OF_BATCH) { + finishedBatch = true; + } else { + long fileSize = result.getOriginalFileSize(); + long docLength = result.getDocumentLength(); + if(fileSize > 0) job.totalBytes += fileSize; + if(docLength > 0) job.totalChars += docLength; + + job.reportWriter.writeCharacters("\n"); + Tools.writeResultToXml(result, job.reportWriter); + switch(result.getReturnCode()){ + case SUCCESS: + job.successDocs++; + break; + case FAIL: + job.errorDocs++; + break; + } + } } - return true; + job.reportWriter.flush(); + if(finishedBatch) { + job.setState(JobState.FINISHED); + //close the <documents> element + job.reportWriter.writeCharacters("\n"); + job.reportWriter.writeEndElement(); + //write the whole batch report element + Tools.writeBatchResultToXml(job, job.reportWriter); + job.reportWriter.close(); + // this will be null if no documents needed to be processed + if(job.processor != null) job.processor.dispose(); + } else { + jobsStillRunning = true; + } + } catch(XMLStreamException e) { + log.error("Can't write to report file for batch " + job.getBatchId() + + ", shutting down batch", e); + job.jobPusher.interrupt(); + job.setState(JobState.ERROR); } } - /** - * A thread that runs continuously while the batch runner is active. 
Its role - * is to monitor the running jobs, collect process results, save the report - * files for each running batch, and shutdown the batch runner and/or Java - * process when all the batches have completed (if requested via the - * {@link BatchRunner#shutdownWhenFinished(boolean)} and - * {@link BatchRunner#exitWhenFinished(boolean)} methods). - */ - private class JobMonitor implements Runnable { - public void run() { - boolean finished = false; - while(!finished) { - long startTime = System.currentTimeMillis(); - try { - boolean jobsStillRunning = false; - BatchHandler job = runningJob; - if(job.getState() == JobState.RUNNING) { - List<ProcessResult> results = new ArrayList<ProcessResult>(); - int resultsCount = job.resultQueue.drainTo(results); - boolean finishedBatch = false; - try { - for(ProcessResult result : results) { - if(result.getReturnCode() == ReturnCode.END_OF_BATCH) { - finishedBatch = true; - } else { - long fileSize = result.getOriginalFileSize(); - long docLength = result.getDocumentLength(); - if(fileSize > 0) job.totalBytes += fileSize; - if(docLength > 0) job.totalChars += docLength; - - job.reportWriter.writeCharacters("\n"); - Tools.writeResultToXml(result, job.reportWriter); - switch(result.getReturnCode()){ - case SUCCESS: - job.successDocs++; - break; - case FAIL: - job.errorDocs++; - break; - } - } - } - job.reportWriter.flush(); - if(finishedBatch) { - job.setState(JobState.FINISHED); - //close the <documents> element - job.reportWriter.writeCharacters("\n"); - job.reportWriter.writeEndElement(); - //write the whole batch report element - Tools.writeBatchResultToXml(job, job.reportWriter); - job.reportWriter.close(); - // this will be null if no documents needed to be processed - if(job.processor != null) job.processor.dispose(); - } else { - jobsStillRunning = true; - } - } catch(XMLStreamException e) { - log.error("Can't write to report file for batch " + job.getBatchId() - + ", shutting down batch", e); - 
job.jobPusher.interrupt(); - job.setState(JobState.ERROR); - } - } - // if all jobs finished and we should shutdown, then let's shutdown - if(!jobsStillRunning) { - shutdown(); - finished = true; - if(exitWhenFinished) { - System.exit(0); - } - } - long remainingSleepTime = LOOP_WAIT - - (System.currentTimeMillis() - startTime); - if(!finished && remainingSleepTime > 0) - Thread.sleep(remainingSleepTime); - } catch(InterruptedException e) { - // re-interrupt - Thread.currentThread().interrupt(); - finished = true; - } - } + // if all jobs finished and we should shutdown, then let's shutdown + if(!jobsStillRunning) { + shutdown(); + finished = true; + if(exitWhenFinished) { + System.exit(0); } } + long remainingSleepTime = LOOP_WAIT + - (System.currentTimeMillis() - startTime); + if(!finished && remainingSleepTime > 0) + Thread.sleep(remainingSleepTime); +} catch(InterruptedException e) { + // re-interrupt + Thread.currentThread().interrupt(); + finished = true; +} +} +} +} - /** - * Creates a new BatchRunner, with a given number of threads. - * - * @param numThreads - */ - public BatchRunner(int numThreads) { - // start the executors pool - // create the executor - // This is similar to an Executors.newFixedThreadPool, but instead - // of an unbounded queue for tasks, we block the caller when they - // try and execute a task and there are no threads available to run - // it. - executor = new ThreadPoolExecutor(numThreads, numThreads, 0L, - TimeUnit.MILLISECONDS, new AlwaysBlockingSynchronousQueue()); - } +/** +* Creates a new BatchRunner, with a given number of threads. +* +* @param numThreads +*/ +public BatchRunner(int numThreads) { +// start the executors pool +// create the executor +// This is similar to an Executors.newFixedThreadPool, but instead +// of an unbounded queue for tasks, we block the caller when they +// try and execute a task and there are no threads available to run +// it. 
+executor = new ThreadPoolExecutor(numThreads, numThreads, 0L, + TimeUnit.MILLISECONDS, new AlwaysBlockingSynchronousQueue()); +} - public void exitWhenFinished(boolean flag) { - synchronized(this) { - this.exitWhenFinished = flag; - } - } +public void exitWhenFinished(boolean flag) { +synchronized(this) { +this.exitWhenFinished = flag; +} +} - /** - * Stops this batch runner in an orderly fashion. - */ - public void shutdown() { - long processingFinishedTime = System.currentTimeMillis(); - log.info("Processing finished"); - System.gc(); - log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M"); - log.info("Used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); - // if we did not need to process anything then the duplicationFinishedTime will not have - // been set and be 0. In that case, set it to the loadingFinishedTime - if(duplicationFinishedTime==0) duplicationFinishedTime = loadingFinishedTime; - log.info("Processing time (seconds): "+(processingFinishedTime-duplicationFinishedTime)/1000.0); - log.info("Total time (seconds): "+(processingFinishedTime-startTime)/1000.0); - } +/** +* Stops this batch runner in an orderly fashion. +*/ +public void shutdown() { +long processingFinishedTime = System.currentTimeMillis(); +log.info("Processing finished"); +System.gc(); +log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M"); +log.info("Used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); +// if we did not need to process anything then the duplicationFinishedTime will not have +// been set and be 0. In that case, set it to the loadingFinishedTime +if(duplicationFinishedTime==0) duplicationFinishedTime = loadingFinishedTime; +log.info("Processing time (seconds): "+(processingFinishedTime-duplicationFinishedTime)/1000.0); +log.info("Total time (seconds): "+(processingFinishedTime-startTime)/1000.0); +} - /** - * Stores data about the currently running batch jobs. 
- */ - private BatchHandler runningJob; - /** - * Executor used to run the tasks. - */ - private ThreadPoolExecutor executor; - /** - * Thread to monitor jobs. - */ - private Thread monitorThread; - /** - * A flag used to signal that the batch runner should exit the Java process - * when all currently running batches have completed. - */ - private boolean exitWhenFinished = true; +/** +* Stores data about the currently running batch jobs. +*/ +private BatchHandler runningJob; +/** +* Executor used to run the tasks. +*/ +private ThreadPoolExecutor executor; +/** +* Thread to monitor jobs. +*/ +private Thread monitorThread; +/** +* A flag used to signal that the batch runner should exit the Java process +* when all currently running batches have completed. +*/ +private boolean exitWhenFinished = true; - /** - * Starts executing the batch task specified by the provided parameter. - * - * @param batch - * a {@link Batch} object describing a batch job. - * @throws IllegalArgumentException - * if there are problems with the provided batch specification (e.g. - * the new batch has the same ID as a previous batch). - * @throws IOException - * @throws GateException - * @throws XMLStreamException - */ - public void runBatch(Batch batch) throws IllegalArgumentException, - GateException, IOException, XMLStreamException { - synchronized(this) { - // record the new batch - String batchId = batch.getBatchId(); - runningJob = new BatchHandler(batch); - // register the batch with JMX +/** +* Starts executing the batch task specified by the provided parameter. +* +* @param batch +* a {@link Batch} object describing a batch job. +* @throws IllegalArgumentException +* if there are problems with the provided batch specification (e.g. +* the new batch has the same ID as a previous batch). 
+* @throws IOException +* @throws GateException +* @throws XMLStreamException +*/ +public void runBatch(Batch batch) throws IllegalArgumentException, + GateException, IOException, XMLStreamException { +synchronized(this) { +// record the new batch +String batchId = batch.getBatchId(); +runningJob = new BatchHandler(batch); +// register the batch with JMX try { StandardMBean batchMBean = new StandardMBean(runningJob, BatchJobData.class); Hashtable<String, String> props = new Hashtable<String, String>(); @@ -511,7 +511,7 @@ options.addOption("b","batchFile",true,"Batch file (required, replaces -i, -o, -x, -r, -I)"); options.addOption("i","inputDirectory",true,"Input directory (required, unless -b given)"); options.addOption("f","outputFormat",true,"Output format, optional, one of 'xml' or 'finf', default is 'finf'"); - options.addOption("o","outputDirectory",true,"Output directory (requried, unless -b given)"); + options.addOption("o","outputDirectory",true,"Output directory (not output if missing)"); options.addOption("x","executePipeline",true,"Pipeline/application file to execute (required, unless -b given)"); options.addOption("r","reportFile",true,"Report file (optional, default: report.xml"); options.addOption("t","numberThreads",true,"Number of threads to use (required)"); @@ -552,7 +552,7 @@ System.exit(0); } if(!line.hasOption('t') || - (!line.hasOption('b') && (!line.hasOption('i') || !line.hasOption('o') || !line.hasOption('x')))) { + (!line.hasOption('b') && (!line.hasOption('i') || !line.hasOption('x')))) { log.error("Required argument missing!"); HelpFormatter helpFormatter = new HelpFormatter(); helpFormatter.printHelp("gcp-direct.sh [options]", options); @@ -714,39 +714,42 @@ inputHandler.init(); // log.info("Have input handler: "+inputHandler); aBatch.setInputHandler(inputHandler); - // set the output Handler - String outputHandlerClassName = null; - if(outFormat.equals("finf")) { - outputHandlerClassName = 
"gate.cloud.io.file.FastInfosetOutputHandler"; - } else if(outFormat.equals("xml")) { - outputHandlerClassName = "gate.cloud.io.file.GATEStandOffFileOutputHandler"; - } - configData = new HashMap<String, String>(); - configData.put(IOConstants.PARAM_DOCUMENT_ROOT, line.getOptionValue('o')); - String outExt = ".finf"; - if(outFormat.equals("xml")) { - outExt = ".xml"; - } - if(line.hasOption("co")) { - configData.put(IOConstants.PARAM_COMPRESSION,"gzip"); - outExt = outExt + ".gz"; - } else { - configData.put(IOConstants.PARAM_COMPRESSION,"none"); - } - configData.put(IOConstants.PARAM_FILE_EXTENSION,outExt); - configData.put(IOConstants.PARAM_ENCODING, "UTF-8"); - configData.put(IOConstants.PARAM_REPLACE_EXTENSION, "true"); - Class<? extends OutputHandler> ouputHandlerClass = - Class.forName(outputHandlerClassName, true, Gate.getClassLoader()) + // set the output Handler, but only if the option -o has been specified, + // otherwise, do not set an output handler (run for side-effects only) + List<OutputHandler> outHandlers = new ArrayList<OutputHandler>(); + if(line.hasOption('o')) { + String outputHandlerClassName = null; + if(outFormat.equals("finf")) { + outputHandlerClassName = "gate.cloud.io.file.FastInfosetOutputHandler"; + } else if(outFormat.equals("xml")) { + outputHandlerClassName = "gate.cloud.io.file.GATEStandOffFileOutputHandler"; + } + configData = new HashMap<String, String>(); + configData.put(IOConstants.PARAM_DOCUMENT_ROOT, line.getOptionValue('o')); + String outExt = ".finf"; + if(outFormat.equals("xml")) { + outExt = ".xml"; + } + if(line.hasOption("co")) { + configData.put(IOConstants.PARAM_COMPRESSION,"gzip"); + outExt = outExt + ".gz"; + } else { + configData.put(IOConstants.PARAM_COMPRESSION,"none"); + } + configData.put(IOConstants.PARAM_FILE_EXTENSION,outExt); + configData.put(IOConstants.PARAM_ENCODING, "UTF-8"); + configData.put(IOConstants.PARAM_REPLACE_EXTENSION, "true"); + Class<? 
extends OutputHandler> ouputHandlerClass = + Class.forName(outputHandlerClassName, true, Gate.getClassLoader()) .asSubclass(OutputHandler.class); - OutputHandler outHandler = ouputHandlerClass.newInstance(); - outHandler.config(configData); - List<AnnotationSetDefinition> asDefs = new ArrayList<AnnotationSetDefinition>(); - outHandler.setAnnSetDefinitions(asDefs); - outHandler.init(); - // log.info("Have output handler: "+outHandler); - List<OutputHandler> outHandlers = new ArrayList<OutputHandler>(); - outHandlers.add(outHandler); + OutputHandler outHandler = ouputHandlerClass.newInstance(); + outHandler.config(configData); + List<AnnotationSetDefinition> asDefs = new ArrayList<AnnotationSetDefinition>(); + outHandler.setAnnSetDefinitions(asDefs); + outHandler.init(); + // log.info("Have output handler: "+outHandler); + outHandlers.add(outHandler); + } // if option -o is given aBatch.setOutputHandlers(outHandlers); String enumeratorClassName = "gate.cloud.io.file.FileDocumentEnumerator"; Class<? extends DocumentEnumerator> enumeratorClass = This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. ------------------------------------------------------------------------------ Check out the vibrant tech community on one of the world's most engaging tech sites, SlashDot.org! http://sdm.link/slashdot _______________________________________________ GATE-cvs mailing list GATE-cvs@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/gate-cvs