Revision: 18830
http://sourceforge.net/p/gate/code/18830
Author: markagreenwood
Date: 2015-07-27 15:50:21 +0000 (Mon, 27 Jul 2015)
Log Message:
-----------
initial code drop of support for streaming documents from a CSV file into GCP
-- it should work but so far it hasn't been tested at all, that comes next
Added Paths:
-----------
gate/trunk/plugins/Format_CSV/gcp/
gate/trunk/plugins/Format_CSV/gcp/.classpath
gate/trunk/plugins/Format_CSV/gcp/.project
gate/trunk/plugins/Format_CSV/gcp/README
gate/trunk/plugins/Format_CSV/gcp/build.xml
gate/trunk/plugins/Format_CSV/gcp/src/
gate/trunk/plugins/Format_CSV/gcp/src/gate/
gate/trunk/plugins/Format_CSV/gcp/src/gate/cloud/
gate/trunk/plugins/Format_CSV/gcp/src/gate/cloud/io/
gate/trunk/plugins/Format_CSV/gcp/src/gate/cloud/io/csv/
gate/trunk/plugins/Format_CSV/gcp/src/gate/cloud/io/csv/CSVStreamingInputHandler.java
Added: gate/trunk/plugins/Format_CSV/gcp/.classpath
===================================================================
--- gate/trunk/plugins/Format_CSV/gcp/.classpath
(rev 0)
+++ gate/trunk/plugins/Format_CSV/gcp/.classpath 2015-07-27 15:50:21 UTC
(rev 18830)
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" path="src"/>
+ <classpathentry kind="con"
path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/gcp"/>
+ <classpathentry kind="lib"
path="/home/mark/gate-top/externals/gate/plugins/Format_CSV/lib/opencsv-2.3.jar"/>
+ <classpathentry kind="output" path="classes"/>
+</classpath>
Added: gate/trunk/plugins/Format_CSV/gcp/.project
===================================================================
--- gate/trunk/plugins/Format_CSV/gcp/.project (rev 0)
+++ gate/trunk/plugins/Format_CSV/gcp/.project 2015-07-27 15:50:21 UTC (rev
18830)
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>CSV4GCP</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
Added: gate/trunk/plugins/Format_CSV/gcp/README
===================================================================
--- gate/trunk/plugins/Format_CSV/gcp/README (rev 0)
+++ gate/trunk/plugins/Format_CSV/gcp/README 2015-07-27 15:50:21 UTC (rev
18830)
@@ -0,0 +1,7 @@
+The code in this folder adds support for streaming documents from CSV
+files. It was compiled against version 2.6-SNAPSHOT of GCP.
+
+To recompile this code you need to specify the location of a GCP distribution
+(not an SVN checkout). For example
+
+ant -Dgcp.home=/home/mark/gcp-2.6-SNAPSHOT/
Added: gate/trunk/plugins/Format_CSV/gcp/build.xml
===================================================================
--- gate/trunk/plugins/Format_CSV/gcp/build.xml (rev 0)
+++ gate/trunk/plugins/Format_CSV/gcp/build.xml 2015-07-27 15:50:21 UTC (rev
18830)
@@ -0,0 +1,58 @@
+<project name="CSV4GCP" basedir="." default="jar">
+ <!-- Prevent Ant from warning about includeantruntime not being set -->
+ <property name="build.sysclasspath" value="ignore" />
+
+ <property file="build.properties" />
+
+ <fail unless="gcp.home">"gcp.home" property must be set before CSV
support for GCP can be compiled</fail>
+
+ <property name="gcp.lib" location="${gcp.home}/lib" />
+ <property name="src.dir" location="src" />
+ <property name="classes.dir" location="classes" />
+ <property name="jar.location" location="csv4gcp.jar" />
+ <property name="lib" location="../lib" />
+
+ <!-- Path to compile - includes gcp/lib/*.jar -->
+ <path id="compile.classpath">
+ <fileset dir="${lib}">
+ <include name="**/*.jar" />
+ </fileset>
+ <fileset dir="${gcp.lib}">
+ <include name="**/*.jar" />
+ <include name="**/*.zip" />
+ </fileset>
+ </path>
+
+ <!-- create build directory structure -->
+ <target name="prepare">
+ <mkdir dir="${classes.dir}" />
+ </target>
+
+ <target name="resources" depends="prepare">
+ <!-- <copy todir="${classes.dir}/gate/resources"
includeEmptyDirs="true">
+ <fileset dir="${src.dir}/gate/resources" />
+ </copy> -->
+ </target>
+
+ <!-- compile the source -->
+ <target name="compile" depends="prepare, resources">
+ <javac classpathref="compile.classpath" srcdir="${src.dir}"
destdir="${classes.dir}" debug="true" debuglevel="lines,source" source="1.5"
target="1.5">
+ </javac>
+ </target>
+
+ <!-- create the JAR file -->
+ <target name="jar" depends="compile">
+ <jar destfile="${jar.location}" update="false"
basedir="${classes.dir}" />
+ </target>
+
+ <!-- remove the generated .class files -->
+ <target name="clean.classes">
+ <delete dir="${classes.dir}" />
+ </target>
+
+ <!-- Clean up - remove .class and .jar files -->
+ <target name="clean" depends="clean.classes">
+ <delete file="${jar.location}" />
+ </target>
+
+</project>
Added:
gate/trunk/plugins/Format_CSV/gcp/src/gate/cloud/io/csv/CSVStreamingInputHandler.java
===================================================================
---
gate/trunk/plugins/Format_CSV/gcp/src/gate/cloud/io/csv/CSVStreamingInputHandler.java
(rev 0)
+++
gate/trunk/plugins/Format_CSV/gcp/src/gate/cloud/io/csv/CSVStreamingInputHandler.java
2015-07-27 15:50:21 UTC (rev 18830)
@@ -0,0 +1,291 @@
+/*
+ * CSVStreamingInputHandler.java
+ *
+ * Copyright (c) 2015, The University of Sheffield. See the file COPYRIGHT.txt
+ * in the software or at http://gate.ac.uk/gate/COPYRIGHT.txt
+ *
+ * This file is part of GATE (see http://gate.ac.uk/), and is free software,
+ * licenced under the GNU Library General Public License, Version 2, June 1991
+ * (in the distribution as file licence.html, and also available at
+ * http://gate.ac.uk/gate/licence.html).
+ *
+ * Mark A. Greenwood, 27/07/2015
+ */
+
+package gate.cloud.io.csv;
+
+import static gate.cloud.io.IOConstants.PARAM_BATCH_FILE_LOCATION;
+import static gate.cloud.io.IOConstants.PARAM_ENCODING;
+import static gate.cloud.io.IOConstants.PARAM_SOURCE_FILE_LOCATION;
+import static gate.cloud.io.IOConstants.VALUE_COMPRESSION_GZIP;
+import gate.Document;
+import gate.Factory;
+import gate.FeatureMap;
+import gate.GateConstants;
+import gate.cloud.batch.Batch;
+import gate.cloud.batch.DocumentID;
+import gate.cloud.io.DocumentData;
+import gate.cloud.io.IOConstants;
+import gate.cloud.io.StreamingInputHandler;
+import gate.util.GateException;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.ProcessBuilder.Redirect;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+import au.com.bytecode.opencsv.CSVReader;
+
+public class CSVStreamingInputHandler implements StreamingInputHandler {
+
+ /**
+ * Configuration key whose value's first character is used as the CSV
+ * separator.
+ */
+ public static final String PARAM_SEPARATOR_CHARACTER = "separator";
+
+ /**
+ * Configuration key intended for the CSV quote character.
+ */
+ public static final String PARAM_QUOTE_CHARACTER = "quote";
+
+ /**
+ * Configuration key whose value (parsed with Boolean.parseBoolean) says
+ * whether the first row of the file holds column labels.
+ */
+ public static final String PARAM_LABELLED_COLUMNS = "labelledColumns";
+
+ /**
+ * Configuration key giving the zero-based index of the column that holds the
+ * document content.
+ */
+ public static final String PARAM_COLUMN = "column";
+
+ private static Logger logger = Logger
+ .getLogger(CSVStreamingInputHandler.class);
+
+ /**
+ * Document IDs that are already complete after a previous run of this batch.
+ * Assigned in startBatch from Batch.getCompletedDocuments().
+ */
+ protected Set<String> completedDocuments;
+
+ /**
+ * Base directory of the batch. Derived in config() from the parent of the
+ * batch file location, and left unset when that parameter is absent.
+ */
+ protected File batchDir;
+
+ /**
+ * The source CSV file from which documents will be streamed.
+ */
+ protected File srcFile;
+
+ /**
+ * Reader over the (possibly decompressed) source file; created in init().
+ */
+ protected CSVReader csvReader;
+
+ /**
+ * Character encoding name handed to the InputStreamReader that wraps the
+ * input stream.
+ */
+ protected String encoding;
+
+ /**
+ * Separator character passed to the CSVReader constructor.
+ */
+ protected char separatorChar;
+
+ /**
+ * Quote character passed to the CSVReader constructor.
+ */
+ protected char quoteChar;
+
+ /**
+ * Running counter appended to the source file name to build document IDs.
+ * Reset to 0 in init() and incremented for every row that has content.
+ */
+ protected long idCounter;
+
+ /**
+ * Zero-based index of the column whose value becomes the document content.
+ */
+ protected int column;
+
+ /**
+ * Column labels read from the first row when colLabels is true, otherwise
+ * null. Used as feature names for the non-content columns.
+ */
+ protected String[] features;
+
+ /**
+ * Whether the first row of the file holds column labels rather than data.
+ */
+ protected boolean colLabels;
+
+ /**
+ * Compression applied to the input file. This can be
+ * {@link IOConstants#VALUE_COMPRESSION_GZIP} in which case the file will be
+ * unpacked using Java's native GZIP support. Any other value is assumed to be
+ * a command line to an external command that can accept an additional
+ * parameter giving the path to the file and produce the uncompressed data on
+ * its standard output, e.g. "lzop -dc" for .lzo compression.
+ *
+ * NOTE(review): config() does not populate this field, and init() feeds the
+ * file to the external command on standard input rather than appending the
+ * path as an argument - confirm the intended usage.
+ */
+ protected String compression;
+
+ /**
+ * External decompression process, if applicable.
+ */
+ protected Process decompressProcess = null;
+
+ /**
+  * Reads the handler settings from the supplied configuration map.
+  *
+  * The source file location is mandatory. A relative location is resolved
+  * against the directory containing the batch file, when one is given. The
+  * separator, quote and column parameters are mandatory as well; only the
+  * first character of the separator and quote values is used.
+  *
+  * @param configData configuration parameters keyed by parameter name
+  * @throws IllegalArgumentException if a required parameter is missing or
+  *           empty, if the source file does not exist or is not a regular
+  *           file, or if the column index is negative or not a number
+  */
+ @Override
+ public void config(Map<String, String> configData) throws IOException,
+     GateException {
+
+   String srcFileStr = configData.get(PARAM_SOURCE_FILE_LOCATION);
+   if(srcFileStr == null) {
+     throw new IllegalArgumentException("Parameter " +
+         PARAM_SOURCE_FILE_LOCATION + " is required");
+   }
+
+   String batchFileStr = configData.get(PARAM_BATCH_FILE_LOCATION);
+   if(batchFileStr != null) {
+     batchDir = new File(batchFileStr).getParentFile();
+   }
+
+   srcFile = new File(srcFileStr);
+   if(!srcFile.isAbsolute()) {
+     // a null batchDir simply resolves against the working directory
+     srcFile = new File(batchDir, srcFileStr);
+   }
+   if(!srcFile.exists()) {
+     throw new IllegalArgumentException("File \"" + srcFile +
+         "\", provided as value for required parameter \"" +
+         PARAM_SOURCE_FILE_LOCATION + "\", does not exist!");
+   }
+   if(!srcFile.isFile()) {
+     throw new IllegalArgumentException("File \"" + srcFile +
+         "\", provided as value for required parameter \"" +
+         PARAM_SOURCE_FILE_LOCATION + "\", is not a file!");
+   }
+
+   encoding = configData.get(PARAM_ENCODING);
+   separatorChar = requiredChar(configData, PARAM_SEPARATOR_CHARACTER);
+   // the quote character has its own parameter; it used to be read from the
+   // separator parameter by mistake
+   quoteChar = requiredChar(configData, PARAM_QUOTE_CHARACTER);
+   colLabels = Boolean.parseBoolean(configData.get(PARAM_LABELLED_COLUMNS));
+
+   String columnStr = configData.get(PARAM_COLUMN);
+   if(columnStr == null) {
+     throw new IllegalArgumentException("Parameter " + PARAM_COLUMN +
+         " is required");
+   }
+   // NumberFormatException is itself an IllegalArgumentException
+   column = Integer.parseInt(columnStr);
+   if(column < 0) {
+     throw new IllegalArgumentException("Parameter " + PARAM_COLUMN +
+         " must not be negative, but was " + column);
+   }
+ }
+
+ /**
+  * Returns the first character of a mandatory configuration value.
+  *
+  * @param configData configuration parameters keyed by parameter name
+  * @param name name of the parameter to read
+  * @return the first character of the parameter's value
+  * @throws IllegalArgumentException if the parameter is missing or empty
+  */
+ private static char requiredChar(Map<String, String> configData,
+     String name) {
+   String value = configData.get(name);
+   if(value == null || value.length() == 0) {
+     throw new IllegalArgumentException("Parameter " + name +
+         " is required and must not be empty");
+   }
+   return value.charAt(0);
+ }
+
+ /**
+  * Opens the source file, applying decompression according to the
+  * compression field, and prepares the CSV reader.
+  *
+  * With no compression value the file is read as-is. The value "any" lets
+  * Commons Compress detect the format from the stream, falling back to
+  * uncompressed data. Any other value is first tried as a Commons Compress
+  * format name. If it is not recognised, it is run as an external command
+  * line that receives the file on standard input and whose standard output
+  * is read.
+  *
+  * When column labels are enabled the first row is consumed here and kept as
+  * the feature names. The ID counter is reset to zero.
+  *
+  * NOTE(review): no visible code assigns the compression field before this
+  * method runs - confirm how it is meant to be configured.
+  *
+  * @throws IOException if the file cannot be opened or read
+  * @throws GateException if the compressor fails for a non-I/O reason
+  */
+ @SuppressWarnings("resource")
+ @Override
+ public void init() throws IOException, GateException {
+   InputStream inputStream = null;
+   if(compression == null) {
+     inputStream = new FileInputStream(srcFile);
+   } else if("any".equals(compression)) {
+     inputStream = new BufferedInputStream(new FileInputStream(srcFile));
+     try {
+       inputStream =
+           new CompressorStreamFactory()
+               .createCompressorInputStream(inputStream);
+     } catch(CompressorException e) {
+       if(e.getCause() != null) {
+         // do not leak the file handle when giving up
+         IOUtils.closeQuietly(inputStream);
+         if(e.getCause() instanceof IOException) {
+           throw (IOException)e.getCause();
+         } else {
+           throw new GateException(e.getCause());
+         }
+       } else {
+         // unrecognised signature, assume uncompressed
+         logger
+             .info("Failed to detect compression format, assuming no compression");
+       }
+     }
+   } else {
+     // compare by value: configuration strings are not guaranteed to be the
+     // same object as the constant
+     if(VALUE_COMPRESSION_GZIP.equals(compression)) {
+       compression = CompressorStreamFactory.GZIP;
+     }
+     inputStream = new BufferedInputStream(new FileInputStream(srcFile));
+     try {
+       inputStream =
+           new CompressorStreamFactory().createCompressorInputStream(
+               compression, inputStream);
+     } catch(CompressorException e) {
+       if(e.getCause() != null) {
+         // do not leak the file handle when giving up
+         IOUtils.closeQuietly(inputStream);
+         if(e.getCause() instanceof IOException) {
+           throw (IOException)e.getCause();
+         } else {
+           throw new GateException(e.getCause());
+         }
+       } else {
+         // unrecognised compressor name
+         logger
+             .info("Unrecognised compression format, assuming external compressor");
+         IOUtils.closeQuietly(inputStream);
+         // treat compression value as a command line
+         ProcessBuilder pb =
+             new ProcessBuilder(compression.trim().split("\\s+"));
+         pb.directory(batchDir);
+         pb.redirectError(Redirect.INHERIT);
+         pb.redirectOutput(Redirect.PIPE);
+         pb.redirectInput(srcFile);
+         decompressProcess = pb.start();
+         inputStream = decompressProcess.getInputStream();
+       }
+     }
+   }
+
+   csvReader =
+       new CSVReader(new InputStreamReader(inputStream, encoding),
+           separatorChar, quoteChar);
+
+   // the header row, when present, supplies the feature names
+   features = (colLabels ? csvReader.readNext() : null);
+
+   idCounter = 0;
+
+ }
+
+ /**
+  * Random access to documents is not available for this handler; documents
+  * can only be obtained one after another through nextDocument().
+  *
+  * @param id ignored
+  * @return never returns normally
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public DocumentData getInputDocument(DocumentID id) throws IOException,
+     GateException {
+   final String message =
+       "CSVStreamingInputHandler can only operate in streaming mode";
+   throw new UnsupportedOperationException(message);
+ }
+
+ @Override
+ public void startBatch(Batch b) {
+ completedDocuments = b.getCompletedDocuments();
+ if(completedDocuments != null && completedDocuments.size() > 0) {
+ logger.info("Restarting failed batch - " + completedDocuments.size() +
+ " documents already processed");
+ }
+ }
+
+ /**
+  * Reads rows from the CSV file until one yields a document to process.
+  *
+  * Rows that are too short to contain the content column, or whose content
+  * column is blank, are skipped without consuming an ID. Every remaining row
+  * receives the ID "&lt;source file name&gt;:&lt;counter&gt;". Rows whose ID
+  * is already in the completed set are skipped. A row whose document cannot
+  * be created is logged and skipped.
+  *
+  * When column labels are in use, every other column present in the row is
+  * copied into the document features under its label.
+  *
+  * @return the next document together with its ID, or null once the file is
+  *         exhausted
+  * @throws IOException if reading from the CSV file fails
+  */
+ @Override
+ public DocumentData nextDocument() throws IOException, GateException {
+
+   // get the next line from the CSV file
+   String[] nextLine;
+
+   while((nextLine = csvReader.readNext()) != null) {
+
+     // skip the line if there are less columns than we need to get to the
+     // content
+     if(column >= nextLine.length) continue;
+
+     // skip the line if the column with the content is empty
+     if(nextLine[column].trim().equals("")) continue;
+
+     String id = srcFile.getName() + ":" + idCounter++;
+
+     // the completed set may be null (no batch started yet, or the batch
+     // reported none), in which case nothing is skipped
+     if(completedDocuments != null && completedDocuments.contains(id)) {
+       continue;
+     }
+
+     DocumentID docId = new DocumentID(id);
+
+     FeatureMap docFeatures = Factory.newFeatureMap();
+     docFeatures.put(GateConstants.THROWEX_FORMAT_PROPERTY_NAME, Boolean.TRUE);
+
+     if(colLabels) {
+       // copy all the features from the row into a FeatureMap using the
+       // labels from the first line; the row may be shorter than the header
+       for(int i = 0; i < features.length; ++i) {
+         if(i != column && i < nextLine.length) {
+           docFeatures.put(features[i], nextLine[i]);
+         }
+       }
+     }
+
+     FeatureMap docParams = Factory.newFeatureMap();
+     docParams.put(Document.DOCUMENT_STRING_CONTENT_PARAMETER_NAME,
+         nextLine[column]);
+
+     try {
+       Document gateDoc =
+           (Document)Factory.createResource("gate.corpora.DocumentImpl",
+               docParams, docFeatures, id);
+       return new DocumentData(gateDoc, docId);
+     } catch(Exception e) {
+       // best effort: report the broken row and carry on with the next one
+       logger.warn("Error encountered while parsing object with ID " + id +
+           " - skipped", e);
+     }
+
+   }
+
+   return null;
+ }
+
+ /**
+  * Closes the CSV reader, if one was opened, and then waits for the external
+  * decompression process, if any, to terminate. The wait happens even when
+  * closing the reader fails.
+  *
+  * @throws IOException if closing the CSV reader fails
+  */
+ @Override
+ public void close() throws IOException, GateException {
+   try {
+     // init() may have failed or never been called
+     if(csvReader != null) {
+       csvReader.close();
+     }
+   } finally {
+     if(decompressProcess != null) {
+       try {
+         decompressProcess.waitFor();
+       } catch(InterruptedException e) {
+         // preserve the interrupt for callers further up the stack
+         Thread.currentThread().interrupt();
+       }
+     }
+   }
+ }
+}
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs