Revision: 19679
http://sourceforge.net/p/gate/code/19679
Author: johann_p
Date: 2016-10-14 16:40:39 +0000 (Fri, 14 Oct 2016)
Log Message:
-----------
Add support for Snappy compression/decompression and
make the options available from the command line / gcp-direct.sh
a bit more flexible.
Modified Paths:
--------------
gcp/trunk/src/gate/cloud/batch/BatchRunner.java
gcp/trunk/src/gate/cloud/io/IOConstants.java
gcp/trunk/src/gate/cloud/io/file/AbstractFileOutputHandler.java
gcp/trunk/src/gate/cloud/io/file/FileInputHandler.java
Added Paths:
-----------
gcp/trunk/lib/snappy-java-1.1.2.6.jar
gcp/trunk/src/gate/cloud/util/SnappyURLStreamHandler.java
Added: gcp/trunk/lib/snappy-java-1.1.2.6.jar
===================================================================
(Binary files differ)
Index: gcp/trunk/lib/snappy-java-1.1.2.6.jar
===================================================================
--- gcp/trunk/lib/snappy-java-1.1.2.6.jar 2016-10-14 12:03:14 UTC (rev
19678)
+++ gcp/trunk/lib/snappy-java-1.1.2.6.jar 2016-10-14 16:40:39 UTC (rev
19679)
Property changes on: gcp/trunk/lib/snappy-java-1.1.2.6.jar
___________________________________________________________________
Added: svn:mime-type
## -0,0 +1 ##
+application/octet-stream
\ No newline at end of property
Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java
===================================================================
--- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2016-10-14 12:03:14 UTC
(rev 19678)
+++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2016-10-14 16:40:39 UTC
(rev 19679)
@@ -56,6 +56,17 @@
import org.apache.log4j.Logger;
import com.sun.jna.Platform;
+import static gate.cloud.io.IOConstants.PARAM_COMPRESSION;
+import static gate.cloud.io.IOConstants.PARAM_DOCUMENT_ROOT;
+import static gate.cloud.io.IOConstants.PARAM_ENCODING;
+import static gate.cloud.io.IOConstants.PARAM_FILE_EXTENSION;
+import static gate.cloud.io.IOConstants.PARAM_REPLACE_EXTENSION;
+import static gate.cloud.io.IOConstants.VALUE_COMPRESSION_GZIP;
+import static gate.cloud.io.IOConstants.VALUE_COMPRESSION_NONE;
+import static gate.cloud.io.IOConstants.VALUE_COMPRESSION_SNAPPY;
+import gate.cloud.io.file.JSONOutputHandler;
+import static
gate.cloud.io.file.JSONOutputHandler.PARAM_ANNOTATION_TYPE_PROPERTY;
+import static gate.cloud.io.file.JSONOutputHandler.PARAM_GROUP_ENTITIES_BY;
/**
* This class is a Batch Runner, i.e. it manages the execution of a batch job,
@@ -510,14 +521,16 @@
// session files here?
options.addOption("b","batchFile",true,"Batch file (required, replaces -i,
-o, -x, -r, -I)");
options.addOption("i","inputDirectory",true,"Input directory (required,
unless -b given)");
- options.addOption("f","outputFormat",true,"Output format, optional, one of
'xml' or 'finf', default is 'finf'");
+ options.addOption("f","outputFormat",true,"Output format, optional, one of
'xml', 'finf', 'ser', 'json', default is 'finf'");
options.addOption("o","outputDirectory",true,"Output directory (not output
if missing)");
options.addOption("x","executePipeline",true,"Pipeline/application file to
execute (required, unless -b given)");
options.addOption("r","reportFile",true,"Report file (optional, default:
report.xml");
options.addOption("t","numberThreads",true,"Number of threads to use
(required)");
options.addOption("I","batchId",true,"Batch ID (optional, default: GCP");
- options.addOption("ci","compressedInput",false,"Input files are
gzip-compressed");
- options.addOption("co","compressedOutput",false,"Output files are
gzip-compressed");
+ options.addOption("ci","compressedInput",false,"Input files are
gzip-compressed (.gz)");
+ options.addOption("co","compressedOutput",false,"Output files are
gzip-compressed (.gz)");
+ options.addOption("so","snappyOutput",false,"Output files are
snappy-compressed (.snappy)");
+ options.addOption("si","snappyInput",false,"Input files are
snappy-compressed (.snappy)");
options.addOption("h","help",false,"Print this help information");
BasicParser parser = new BasicParser();
@@ -563,8 +576,9 @@
}
if(line.hasOption('f')) {
outFormat = line.getOptionValue('f');
- if(!outFormat.equals("xml") && !outFormat.equals("finf")) {
- log.error("Output format (option 'f') must be either 'xml' or
'finf'");
+ if(!outFormat.equals("xml") && !outFormat.equals("finf") &&
+ !outFormat.equals("ser") && !outFormat.equals("json")) {
+ log.error("Output format (option 'f') must be either 'json', 'ser',
xml' or 'finf'");
System.exit(1);
}
} // if we have option 'f', otherwise use the preset default
@@ -698,14 +712,16 @@
// set the input Handler
String inputHandlerClassName = "gate.cloud.io.file.FileInputHandler";
Map<String,String> configData = new HashMap<String, String>();
- configData.put(IOConstants.PARAM_DOCUMENT_ROOT,
line.getOptionValue('i'));
+ configData.put(PARAM_DOCUMENT_ROOT, line.getOptionValue('i'));
if(line.hasOption("ci")) {
- configData.put(IOConstants.PARAM_COMPRESSION,"gzip");
- } else {
- configData.put(IOConstants.PARAM_COMPRESSION,"none");
+ configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_GZIP);
+ } else if(line.hasOption("si")) {
+ configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_SNAPPY);
+ } else {
+ configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_NONE);
}
- configData.put(IOConstants.PARAM_ENCODING, "UTF-8");
- configData.put(IOConstants.PARAM_FILE_EXTENSION,"");
+ configData.put(PARAM_ENCODING, "UTF-8");
+ configData.put(PARAM_FILE_EXTENSION,"");
Class<? extends InputHandler> inputHandlerClass =
Class.forName(inputHandlerClassName, true,
Gate.getClassLoader())
.asSubclass(InputHandler.class);
@@ -719,26 +735,38 @@
List<OutputHandler> outHandlers = new ArrayList<OutputHandler>();
if(line.hasOption('o')) {
String outputHandlerClassName = null;
+ configData = new HashMap<String, String>();
+ String outExt = ".finf";
if(outFormat.equals("finf")) {
outputHandlerClassName =
"gate.cloud.io.file.FastInfosetOutputHandler";
} else if(outFormat.equals("xml")) {
+ outExt = ".xml";
outputHandlerClassName =
"gate.cloud.io.file.GATEStandOffFileOutputHandler";
- }
- configData = new HashMap<String, String>();
+ } else if(outFormat.equals("ser")) {
+ outExt = ".ser";
+ outputHandlerClassName =
"gate.cloud.io.file.SerializedObjectOutputHandler";
+ } else if(outFormat.equals("json")) {
+ outExt = ".json";
+ outputHandlerClassName = "gate.cloud.io.file.JSONOutputHandler";
+ configData.put(PARAM_GROUP_ENTITIES_BY, "set");
+ configData.put(PARAM_ANNOTATION_TYPE_PROPERTY, "annType");
+ } else {
+ // cannot get here, option contents is checked earlier...
+ }
configData.put(IOConstants.PARAM_DOCUMENT_ROOT,
line.getOptionValue('o'));
- String outExt = ".finf";
- if(outFormat.equals("xml")) {
- outExt = ".xml";
- }
+
if(line.hasOption("co")) {
- configData.put(IOConstants.PARAM_COMPRESSION,"gzip");
+ configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_GZIP);
outExt = outExt + ".gz";
+ } else if(line.hasOption("so")) {
+ configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_SNAPPY);
+ outExt = outExt + ".snappy";
} else {
- configData.put(IOConstants.PARAM_COMPRESSION,"none");
+ configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_NONE);
}
- configData.put(IOConstants.PARAM_FILE_EXTENSION,outExt);
- configData.put(IOConstants.PARAM_ENCODING, "UTF-8");
- configData.put(IOConstants.PARAM_REPLACE_EXTENSION, "true");
+ configData.put(PARAM_FILE_EXTENSION,outExt);
+ configData.put(PARAM_ENCODING, "UTF-8");
+ configData.put(PARAM_REPLACE_EXTENSION, "true");
Class<? extends OutputHandler> ouputHandlerClass =
Class.forName(outputHandlerClassName, true, Gate.getClassLoader())
.asSubclass(OutputHandler.class);
@@ -756,7 +784,7 @@
Class.forName(enumeratorClassName, true, Gate.getClassLoader())
.asSubclass(DocumentEnumerator.class);
configData = new HashMap<String, String>();
- configData.put(IOConstants.PARAM_DOCUMENT_ROOT,
line.getOptionValue('i'));
+ configData.put(PARAM_DOCUMENT_ROOT, line.getOptionValue('i'));
List<DocumentID> docIds = new LinkedList<DocumentID>();
DocumentEnumerator enumerator = enumeratorClass.newInstance();
enumerator.config(configData);
Modified: gcp/trunk/src/gate/cloud/io/IOConstants.java
===================================================================
--- gcp/trunk/src/gate/cloud/io/IOConstants.java 2016-10-14 12:03:14 UTC
(rev 19678)
+++ gcp/trunk/src/gate/cloud/io/IOConstants.java 2016-10-14 16:40:39 UTC
(rev 19679)
@@ -95,6 +95,12 @@
public static final String VALUE_COMPRESSION_GZIP = "gzip";
/**
+ * The files are Snappy-compressed (and have the extension ".snappy"
+ * appended to the normal extension specific to their mime type).
+ */
+ public static final String VALUE_COMPRESSION_SNAPPY = "snappy";
+
+ /**
* The location of an ARC file.
*/
@Deprecated
Modified: gcp/trunk/src/gate/cloud/io/file/AbstractFileOutputHandler.java
===================================================================
--- gcp/trunk/src/gate/cloud/io/file/AbstractFileOutputHandler.java
2016-10-14 12:03:14 UTC (rev 19678)
+++ gcp/trunk/src/gate/cloud/io/file/AbstractFileOutputHandler.java
2016-10-14 16:40:39 UTC (rev 19679)
@@ -26,6 +26,7 @@
import java.util.zip.GZIPOutputStream;
import static gate.cloud.io.IOConstants.*;
+import org.xerial.snappy.SnappyOutputStream;
public abstract class AbstractFileOutputHandler extends AbstractOutputHandler {
/**
@@ -127,6 +128,8 @@
if(compression != null) {
if(compression.equals(VALUE_COMPRESSION_GZIP)) {
os = new GZIPOutputStream(os);
+ } else if(compression.equals(VALUE_COMPRESSION_SNAPPY)) {
+ os = new SnappyOutputStream(os);
}
}
return new BufferedOutputStream(os);
Modified: gcp/trunk/src/gate/cloud/io/file/FileInputHandler.java
===================================================================
--- gcp/trunk/src/gate/cloud/io/file/FileInputHandler.java 2016-10-14
12:03:14 UTC (rev 19678)
+++ gcp/trunk/src/gate/cloud/io/file/FileInputHandler.java 2016-10-14
16:40:39 UTC (rev 19679)
@@ -17,11 +17,11 @@
import gate.FeatureMap;
import gate.Gate;
import gate.cloud.batch.DocumentID;
-import gate.cloud.batch.PooledDocumentProcessor;
import gate.cloud.io.DocumentData;
import gate.cloud.io.IOConstants;
import gate.cloud.io.InputHandler;
import gate.cloud.util.GZIPURLStreamHandler;
+import gate.cloud.util.SnappyURLStreamHandler;
import gate.util.GateException;
import java.io.File;
@@ -101,6 +101,8 @@
URL docUrl = docFile.toURI().toURL();
if(compression.equals(VALUE_COMPRESSION_GZIP)){
docUrl = new URL(docUrl, "", new GZIPURLStreamHandler(docUrl));
+ } else if(compression.equals(VALUE_COMPRESSION_SNAPPY)) {
+ docUrl = new URL(docUrl, "", new SnappyURLStreamHandler(docUrl));
}
params.put(Document.DOCUMENT_URL_PARAMETER_NAME, docUrl);
Copied: gcp/trunk/src/gate/cloud/util/SnappyURLStreamHandler.java (from rev
19673, gcp/trunk/src/gate/cloud/util/GZIPURLStreamHandler.java)
===================================================================
--- gcp/trunk/src/gate/cloud/util/SnappyURLStreamHandler.java
(rev 0)
+++ gcp/trunk/src/gate/cloud/util/SnappyURLStreamHandler.java 2016-10-14
16:40:39 UTC (rev 19679)
@@ -0,0 +1,86 @@
+/*
+ * SnappyURLStreamHandler.java
+ * Copyright (c) 2016, The University of Sheffield.
+ *
+ * This file is part of GCP (see http://gate.ac.uk/), and is free
+ * software, licenced under the GNU Affero General Public License,
+ * Version 3, November 2007.
+ *
+ */
+package gate.cloud.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLStreamHandler;
+import org.xerial.snappy.SnappyInputStream;
+
+/**
+ * A URL stream handler that can be used to read data compressed in Snappy format.
+ * This implementation is not feature complete - it only has the functionality
+ * required to open GATE documents from snappy-compressed files!
+ */
+public class SnappyURLStreamHandler extends URLStreamHandler {
+
+ /**
+ * Minimal URLConnection over the enclosing handler's originalURL; only
+ * connect() and getInputStream() are overridden, the latter adding Snappy decoding.
+ */
+ public class SnappyURLConnection extends URLConnection{
+ public SnappyURLConnection() throws IOException {
+ super(originalURL);
+ }
+
+ /**
+ * Connection to originalURL; assigned by connect() and unset before that.
+ */
+ protected URLConnection originalConnection;
+
+
+ /* (non-Javadoc) Opens originalConnection once; no-op when already connected.
+ * @see java.net.URLConnection#connect()
+ */
+ @Override
+ public void connect() throws IOException {
+ if(!connected){
+ this.originalConnection = originalURL.openConnection();
+ connected = true;
+ }
+ }
+
+
+ /* (non-Javadoc) Connects if needed; returns a new SnappyInputStream per call.
+ * @see java.net.URLConnection#getInputStream()
+ */
+ @Override
+ public InputStream getInputStream() throws IOException {
+ if(!connected) connect();
+ return new SnappyInputStream(originalConnection.getInputStream());
+ }
+
+ }
+
+
+ /* (non-Javadoc) The argument u is not used; the connection reads originalURL.
+ * @see java.net.URLStreamHandler#openConnection(java.net.URL)
+ */
+ @Override
+ protected URLConnection openConnection(URL u) throws IOException {
+ return new SnappyURLConnection();
+ }
+
+
+ /**
+ * The URL we are wrapping; assigned in the constructor, read by connections.
+ */
+ protected URL originalURL;
+
+
+ public SnappyURLStreamHandler(URL wrappedUrl) {
+ super();
+ this.originalURL = wrappedUrl;
+ }
+
+
+}
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, SlashDot.org! http://sdm.link/slashdot
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs