SAMOA-47: Adding Code Review comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/76a37363 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/76a37363 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/76a37363 Branch: refs/heads/master Commit: 76a373639a18eab07cd2a8a52959ec5bb38e383f Parents: 533f12a Author: jayadeepj <[email protected]> Authored: Sat Nov 14 12:52:18 2015 +0530 Committer: Gianmarco De Francisci Morales <[email protected]> Committed: Mon Nov 30 13:20:36 2015 +0200 ---------------------------------------------------------------------- .../samoa/moa/streams/AvroFileStream.java | 251 +++++---- .../org/apache/samoa/instances/ArffLoader.java | 32 +- .../samoa/instances/AvroBinaryLoader.java | 161 +++--- .../apache/samoa/instances/AvroJsonLoader.java | 177 ++++--- .../org/apache/samoa/instances/AvroLoader.java | 504 ++++++++++--------- .../org/apache/samoa/instances/Instances.java | 55 +- .../java/org/apache/samoa/instances/Loader.java | 30 +- 7 files changed, 608 insertions(+), 602 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/76a37363/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java b/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java index 0a98acf..74b31dd 100644 --- a/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java +++ b/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java @@ -35,136 +35,133 @@ import com.github.javacliparser.FileOption; import com.github.javacliparser.IntOption; import com.github.javacliparser.StringOption; - /** - * InstanceStream implementation to handle Apache Avro Files. - * Handles both JSON & Binary encoded streams - * + * InstanceStream implementation to handle Apache Avro Files. Handles both JSON & Binary encoded streams + * * */ public class AvroFileStream extends FileStream { - private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory.getLogger(AvroFileStream.class); - - public FileOption avroFileOption = new FileOption("avroFile", 'f',"Avro File(s) to load.", null, null, false); - public IntOption classIndexOption = new IntOption("classIndex", 'c',"Class index of data. 0 for none or -1 for last attribute in file.",-1, -1, Integer.MAX_VALUE); - public StringOption encodingFormatOption = new StringOption("encodingFormatOption", 'e', "Encoding format for Avro Files. Can be JSON/AVRO", "BINARY"); - - /** Represents the last read Instance **/ - protected InstanceExample lastInstanceRead; - - /** Represents the binary input stream of avro data **/ - protected transient InputStream inputStream = null; - - /** The extension to be considered for the files **/ - private static final String AVRO_FILE_EXTENSION = "avro"; - - /* (non-Javadoc) - * @see org.apache.samoa.streams.FileStream#reset() - * Reset the BINARY encoded Avro Stream & Close the file source - */ - @Override - protected void reset() { - - try { - if (this.inputStream != null) - this.inputStream.close(); - - fileSource.reset(); - } catch (IOException ioException) { - logger.error(AVRO_STREAM_FAILED_RESTART_ERROR+" : {}",ioException); - throw new RuntimeException(AVRO_STREAM_FAILED_RESTART_ERROR, ioException); - } - - if (!getNextFileStream()) { - hitEndOfStream = true; - throw new RuntimeException(AVRO_STREAM_EMPTY_STREAM_ERROR); - } - } - - - /** - * Get next File Stream & set the class index read from the command line option - * @return - */ - protected boolean getNextFileStream() { - if (this.inputStream != null) - try { - this.inputStream.close(); - } catch (IOException ioException) { - logger.error(AVRO_STREAM_FAILED_READ_ERROR+" : {}",ioException); - throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException); - } - - this.inputStream = this.fileSource.getNextInputStream(); - - if (this.inputStream == null) - return false; - - this.instances = new Instances(this.inputStream, classIndexOption.getValue(),encodingFormatOption.getValue()); - - if (this.classIndexOption.getValue() < 0) { - this.instances.setClassIndex(this.instances.numAttributes() - 1); - } else if (this.classIndexOption.getValue() > 0) { - this.instances.setClassIndex(this.classIndexOption.getValue() - 1); - } - return true; - } - - - /* (non-Javadoc) - * @see org.apache.samoa.streams.FileStream#readNextInstanceFromFile() - * Read next Instance from File. Return false if unable to read next Instance - */ - @Override - protected boolean readNextInstanceFromFile() { - try { - if (this.instances.readInstance()) { - this.lastInstanceRead = new InstanceExample(this.instances.instance(0)); - this.instances.delete(); - return true; - } - if (this.inputStream != null) { - this.inputStream.close(); - this.inputStream = null; - } - return false; - } catch (IOException ioException) { - logger.error(AVRO_STREAM_FAILED_READ_ERROR+" : {}",ioException); - throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException); - } - - } - - @Override - public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { - super.prepareForUseImpl(monitor, repository); - String filePath = this.avroFileOption.getFile().getAbsolutePath(); - this.fileSource.init(filePath, AvroFileStream.AVRO_FILE_EXTENSION); - this.lastInstanceRead = null; - } - - - /* (non-Javadoc) - * @see org.apache.samoa.streams.FileStream#getLastInstanceRead() - * Return the last read Instance - */ - @Override - protected InstanceExample getLastInstanceRead() { - return this.lastInstanceRead; - } - - - @Override - public void getDescription(StringBuilder sb, int indent) { - throw new UnsupportedOperationException(AVRO_STREAM_UNSUPPORTED_METHOD); - } - - /** Error Messages to for all types of Avro File Streams */ - protected static final String AVRO_STREAM_FAILED_RESTART_ERROR = "Avro FileStream restart failed."; - protected static final String AVRO_STREAM_EMPTY_STREAM_ERROR = "Avro FileStream is empty."; - protected static final String AVRO_STREAM_FAILED_READ_ERROR = "Failed to read next instance from Avro File Stream."; - protected static final String AVRO_STREAM_UNSUPPORTED_METHOD = "This method is not supported for AvroFileStream yet."; - + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(AvroFileStream.class); + + public FileOption avroFileOption = new FileOption("avroFile", 'f', "Avro File(s) to load.", null, null, false); + public IntOption classIndexOption = new IntOption("classIndex", 'c', + "Class index of data. 0 for none or -1 for last attribute in file.", -1, -1, Integer.MAX_VALUE); + public StringOption encodingFormatOption = new StringOption("encodingFormatOption", 'e', + "Encoding format for Avro Files. Can be JSON/AVRO", "BINARY"); + + /** Represents the last read Instance **/ + protected InstanceExample lastInstanceRead; + + /** Represents the binary input stream of avro data **/ + protected transient InputStream inputStream = null; + + /** The extension to be considered for the files **/ + private static final String AVRO_FILE_EXTENSION = "avro"; + + /* (non-Javadoc) + * @see org.apache.samoa.streams.FileStream#reset() + * Reset the BINARY encoded Avro Stream & Close the file source + */ + @Override + protected void reset() { + + try { + if (this.inputStream != null) + this.inputStream.close(); + + fileSource.reset(); + } catch (IOException ioException) { + logger.error(AVRO_STREAM_FAILED_RESTART_ERROR + " : {}", ioException); + throw new RuntimeException(AVRO_STREAM_FAILED_RESTART_ERROR, ioException); + } + + if (!getNextFileStream()) { + hitEndOfStream = true; + throw new RuntimeException(AVRO_STREAM_EMPTY_STREAM_ERROR); + } + } + + /** + * Get next File Stream & set the class index read from the command line option + * + * @return + */ + protected boolean getNextFileStream() { + if (this.inputStream != null) + try { + this.inputStream.close(); + } catch (IOException ioException) { + logger.error(AVRO_STREAM_FAILED_READ_ERROR + " : {}", ioException); + throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException); + } + + this.inputStream = this.fileSource.getNextInputStream(); + + if (this.inputStream == null) + return false; + + this.instances = new Instances(this.inputStream, classIndexOption.getValue(), encodingFormatOption.getValue()); + + if (this.classIndexOption.getValue() < 0) { + this.instances.setClassIndex(this.instances.numAttributes() - 1); + } else if (this.classIndexOption.getValue() > 0) { + this.instances.setClassIndex(this.classIndexOption.getValue() - 1); + } + return true; + } + + /* (non-Javadoc) + * @see org.apache.samoa.streams.FileStream#readNextInstanceFromFile() + * Read next Instance from File. Return false if unable to read next Instance + */ + @Override + protected boolean readNextInstanceFromFile() { + try { + if (this.instances.readInstance()) { + this.lastInstanceRead = new InstanceExample(this.instances.instance(0)); + this.instances.delete(); + return true; + } + if (this.inputStream != null) { + this.inputStream.close(); + this.inputStream = null; + } + return false; + } catch (IOException ioException) { + logger.error(AVRO_STREAM_FAILED_READ_ERROR + " : {}", ioException); + throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException); + } + + } + + @Override + public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { + super.prepareForUseImpl(monitor, repository); + String filePath = this.avroFileOption.getFile().getAbsolutePath(); + this.fileSource.init(filePath, AvroFileStream.AVRO_FILE_EXTENSION); + this.lastInstanceRead = null; + } + + /* (non-Javadoc) + * @see org.apache.samoa.streams.FileStream#getLastInstanceRead() + * Return the last read Instance + */ + @Override + protected InstanceExample getLastInstanceRead() { + return this.lastInstanceRead; + } + + @Override + public void getDescription(StringBuilder sb, int indent) { + throw new UnsupportedOperationException(AVRO_STREAM_UNSUPPORTED_METHOD); + } + + /** Error Messages to for all types of Avro File Streams */ + protected static final String AVRO_STREAM_FAILED_RESTART_ERROR = "Avro FileStream restart failed."; + protected static final String AVRO_STREAM_EMPTY_STREAM_ERROR = "Avro FileStream is empty."; + protected static final String AVRO_STREAM_FAILED_READ_ERROR = "Failed to read next instance from Avro File Stream."; + protected static final String AVRO_STREAM_UNSUPPORTED_METHOD = "This method is not supported for AvroFileStream yet."; + } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/76a37363/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java index 325d1b8..a25dc62 100644 --- a/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java +++ b/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java @@ -86,7 +86,7 @@ public class ArffLoader implements Loader { while (numAttribute == 0 && streamTokenizer.ttype != StreamTokenizer.TT_EOF) { // For each line while (streamTokenizer.ttype != StreamTokenizer.TT_EOL - && streamTokenizer.ttype != StreamTokenizer.TT_EOF) { + && streamTokenizer.ttype != StreamTokenizer.TT_EOF) { // For each item if (streamTokenizer.ttype == StreamTokenizer.TT_NUMBER) { // System.out.println(streamTokenizer.nval + "Num "); @@ -95,7 +95,7 @@ public class ArffLoader implements Loader { } else if (streamTokenizer.sval != null && ( streamTokenizer.ttype == StreamTokenizer.TT_WORD - || streamTokenizer.ttype == 34 || streamTokenizer.ttype == 39)) { + || streamTokenizer.ttype == 34 || streamTokenizer.ttype == 39)) { // System.out.println(streamTokenizer.sval + "Str"); boolean isNumeric = attributes.get(numAttribute).isNumeric(); double value; @@ -158,7 +158,7 @@ public class ArffLoader implements Loader { streamTokenizer.nextToken(); // Remove the '{' char // For each line while (streamTokenizer.ttype != StreamTokenizer.TT_EOL - && streamTokenizer.ttype != StreamTokenizer.TT_EOF) { + && streamTokenizer.ttype != StreamTokenizer.TT_EOF) { while (streamTokenizer.ttype != '}') { // For each item // streamTokenizer.nextToken(); @@ -176,7 +176,7 @@ public class ArffLoader implements Loader { if (streamTokenizer.ttype == StreamTokenizer.TT_NUMBER) { // System.out.print(streamTokenizer.nval + " "); this.setSparseValue(instance, indexValues, attributeValues, numAttribute, - streamTokenizer.nval, true); + streamTokenizer.nval, true); // numAttribute++; } else if (streamTokenizer.sval != null && ( @@ -185,12 +185,12 @@ public class ArffLoader implements Loader { // System.out.print(streamTokenizer.sval + "-"); if (attributes.get(numAttribute).isNumeric()) { this.setSparseValue(instance, indexValues, attributeValues, numAttribute, - Double.valueOf(streamTokenizer.sval).doubleValue(), true); + Double.valueOf(streamTokenizer.sval).doubleValue(), true); } else { this.setSparseValue(instance, indexValues, attributeValues, numAttribute, - this.instanceInformation - .attribute(numAttribute).indexOfValue(streamTokenizer.sval), - false); + this.instanceInformation + .attribute(numAttribute).indexOfValue(streamTokenizer.sval), + false); } } streamTokenizer.nextToken(); @@ -211,14 +211,14 @@ public class ArffLoader implements Loader { arrayAttributeValues[i] = attributeValues.get(i).doubleValue(); } instance.addSparseValues(arrayIndexValues, arrayAttributeValues, - this.instanceInformation.numAttributes()); + this.instanceInformation.numAttributes()); return instance; } private void setSparseValue(Instance instance, List<Integer> indexValues, - List<Double> attributeValues, - int numAttribute, double value, boolean isNumber) { + List<Double> attributeValues, + int numAttribute, double value, boolean isNumber) { double valueAttribute; if (isNumber && this.instanceInformation.attribute(numAttribute).isNominal) { valueAttribute = @@ -246,7 +246,7 @@ public class ArffLoader implements Loader { streamTokenizer.nextToken(); // Remove the '{' char // For each line while (streamTokenizer.ttype != StreamTokenizer.TT_EOL - && streamTokenizer.ttype != StreamTokenizer.TT_EOF) { + && streamTokenizer.ttype != StreamTokenizer.TT_EOF) { while (streamTokenizer.ttype != '}') { // For each item // streamTokenizer.nextToken(); @@ -267,11 +267,11 @@ public class ArffLoader implements Loader { // "/"+this.instanceInformation.attribute(numAttribute).indexOfValue(streamTokenizer.sval)+" "); if (attributes.get(numAttribute).isNumeric()) { this.setValue(instance, numAttribute, - Double.valueOf(streamTokenizer.sval).doubleValue(), true); + Double.valueOf(streamTokenizer.sval).doubleValue(), true); } else { this.setValue(instance, numAttribute, - this.instanceInformation.attribute(numAttribute) - .indexOfValue(streamTokenizer.sval), false); + this.instanceInformation.attribute(numAttribute) + .indexOfValue(streamTokenizer.sval), false); // numAttribute++; } } @@ -395,6 +395,6 @@ public class ArffLoader implements Loader { @Override public Instance readInstance() { - return readInstance(this.reader); + return readInstance(this.reader); } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/76a37363/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java index ad07f62..5c57aa1 100644 --- a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java +++ b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java @@ -20,7 +20,6 @@ package org.apache.samoa.instances; * #L% */ - import java.io.IOException; import java.io.InputStream; @@ -32,88 +31,86 @@ import org.slf4j.LoggerFactory; /** * Load Data from Binary Avro Stream and parse to corresponding Dense & Parse Instances - * - * */ public class AvroBinaryLoader extends AvroLoader { - private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory.getLogger(AvroBinaryLoader.class); - - /** Avro Binary reader for an input stream **/ - protected DataFileStream<GenericRecord> dataFileStream = null; - - public AvroBinaryLoader(InputStream inputStream,int classAttribute) { - super(classAttribute); - initializeSchema(inputStream); - } - - /* (non-Javadoc) - * @see org.apache.samoa.instances.AvroLoader#initializeSchema(java.io.InputStream) - */ - @Override - public void initializeSchema(InputStream inputStream) - { - try { - this.datumReader = new GenericDatumReader<GenericRecord>(); - this.dataFileStream = new DataFileStream<GenericRecord>(inputStream, datumReader); - this.schema = dataFileStream.getSchema(); - - this.instanceInformation = getHeader(); - this.isSparseData = isSparseData(); - - if (classAttribute < 0) { - this.instanceInformation.setClassIndex(this.instanceInformation.numAttributes() - 1); - } else if (classAttribute > 0) { - this.instanceInformation.setClassIndex(classAttribute - 1); - } - - } catch (IOException ioException) { - logger.error(AVRO_LOADER_SCHEMA_READ_ERROR+" : {}",ioException); - throw new RuntimeException(AVRO_LOADER_SCHEMA_READ_ERROR+" : " + ioException); - } - } - - /* (non-Javadoc) - * @see org.apache.samoa.instances.AvroLoader#readInstance() - */ - @Override - public Instance readInstance() { - - GenericRecord record = null; - - try{ - if (dataFileStream.hasNext()) { - record =(GenericRecord) dataFileStream.next(); - } - } catch (Exception ioException) { - logger.error(AVRO_LOADER_INSTANCE_READ_ERROR+" : {}",ioException); - throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR+" : " + ioException); - } - - if(record==null) - { - closeReader(); - return null; - } - - if(isSparseData) - return readInstanceSparse(record); - - return readInstanceDense(record); - } - - /** - * Close the Avro Data Stream - */ - private void closeReader() - { - if(dataFileStream !=null) - try { - dataFileStream.close(); - } catch (IOException ioException) { - logger.error(AVRO_LOADER_INSTANCE_READ_ERROR+" : {}",ioException); - throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR+" : " + ioException); - } - } + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(AvroBinaryLoader.class); + + /** Avro Binary reader for an input stream **/ + protected DataFileStream<GenericRecord> dataFileStream = null; + + public AvroBinaryLoader(InputStream inputStream, int classAttribute) { + super(classAttribute); + initializeSchema(inputStream); + } + + /* (non-Javadoc) + * @see org.apache.samoa.instances.AvroLoader#initializeSchema(java.io.InputStream) + */ + @Override + public void initializeSchema(InputStream inputStream) + { + try { + this.datumReader = new GenericDatumReader<GenericRecord>(); + this.dataFileStream = new DataFileStream<GenericRecord>(inputStream, datumReader); + this.schema = dataFileStream.getSchema(); + + this.instanceInformation = getHeader(); + this.isSparseData = isSparseData(); + + if (classAttribute < 0) { + this.instanceInformation.setClassIndex(this.instanceInformation.numAttributes() - 1); + } else if (classAttribute > 0) { + this.instanceInformation.setClassIndex(classAttribute - 1); + } + + } catch (IOException ioException) { + logger.error(AVRO_LOADER_SCHEMA_READ_ERROR + " : {}", ioException); + throw new RuntimeException(AVRO_LOADER_SCHEMA_READ_ERROR + " : " + ioException); + } + } + + /* (non-Javadoc) + * @see org.apache.samoa.instances.AvroLoader#readInstance() + */ + @Override + public Instance readInstance() { + + GenericRecord record = null; + + try { + if (dataFileStream.hasNext()) { + record = (GenericRecord) dataFileStream.next(); + } + } catch (Exception ioException) { + logger.error(AVRO_LOADER_INSTANCE_READ_ERROR + " : {}", ioException); + throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR + " : " + ioException); + } + + if (record == null) + { + closeReader(); + return null; + } + + if (isSparseData) + return readInstanceSparse(record); + + return readInstanceDense(record); + } + + /** + * Close the Avro Data Stream + */ + private void closeReader() + { + if (dataFileStream != null) + try { + dataFileStream.close(); + } catch (IOException ioException) { + logger.error(AVRO_LOADER_INSTANCE_READ_ERROR + " : {}", ioException); + throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR + " : " + ioException); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/76a37363/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java index 8a345da..b765405 100644 --- a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java +++ b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java @@ -20,7 +20,6 @@ package org.apache.samoa.instances; * #L% */ - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -37,96 +36,94 @@ import org.slf4j.LoggerFactory; /** * Load Data from JSON Avro Stream and parse to corresponding Dense & Parse Instances - * - * */ public class AvroJsonLoader extends AvroLoader { - private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory.getLogger(AvroJsonLoader.class); - - /** The Character reader for JSON read */ - protected Reader reader = null; - - public AvroJsonLoader(InputStream inputStream, int classAttribute) { - super(classAttribute); - initializeSchema(inputStream); - } - - /* (non-Javadoc) - * @see org.apache.samoa.instances.AvroLoader#initializeSchema(java.io.InputStream) - */ - @Override - public void initializeSchema(InputStream inputStream) - { - String schemaString = null; - try { - this.reader = new BufferedReader(new InputStreamReader(inputStream)); - schemaString = ((BufferedReader)this.reader).readLine(); - this.schema = new Schema.Parser().parse(schemaString); - this.datumReader = new GenericDatumReader<GenericRecord>(schema); - this.instanceInformation = getHeader(); - this.isSparseData = isSparseData(); - - if (classAttribute < 0) { - this.instanceInformation.setClassIndex(this.instanceInformation.numAttributes() - 1); - } else if (classAttribute > 0) { - this.instanceInformation.setClassIndex(classAttribute - 1); - } - - } catch (IOException ioException) { - logger.error(AVRO_LOADER_SCHEMA_READ_ERROR+" : {}",ioException); - throw new RuntimeException(AVRO_LOADER_SCHEMA_READ_ERROR+" : " + ioException); - } - } - - /* (non-Javadoc) - * @see org.apache.samoa.instances.AvroLoader#readInstance() - */ - @Override - public Instance readInstance() { - - String line = null; - Decoder decoder = null; - GenericRecord record = null; - - try{ - while ((line = ((BufferedReader)reader).readLine()) != null) { - if(line==null || line.trim().length()<=0) - continue; - - decoder = DecoderFactory.get().jsonDecoder(schema, line); - record = datumReader.read(null, decoder); - break; - } - } catch (IOException ioException) { - logger.error(AVRO_LOADER_INSTANCE_READ_ERROR+" : {}",ioException); - throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR+" : " + ioException); - } - - if(record==null) - { - closeReader(); - return null; - } - - if(isSparseData) - return readInstanceSparse(record); - - return readInstanceDense(record); - } - - /** - * Close the Avro Data Stream - */ - private void closeReader() - { - if(reader !=null) - try { - reader.close(); - } catch (IOException ioException) { - logger.error(AVRO_LOADER_INSTANCE_READ_ERROR+" : {}",ioException); - throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR+" : " + ioException); - } - } + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(AvroJsonLoader.class); + + /** The Character reader for JSON read */ + protected Reader reader = null; + + public AvroJsonLoader(InputStream inputStream, int classAttribute) { + super(classAttribute); + initializeSchema(inputStream); + } + + /* (non-Javadoc) + * @see org.apache.samoa.instances.AvroLoader#initializeSchema(java.io.InputStream) + */ + @Override + public void initializeSchema(InputStream inputStream) + { + String schemaString = null; + try { + this.reader = new BufferedReader(new InputStreamReader(inputStream)); + schemaString = ((BufferedReader) this.reader).readLine(); + this.schema = new Schema.Parser().parse(schemaString); + this.datumReader = new GenericDatumReader<GenericRecord>(schema); + this.instanceInformation = getHeader(); + this.isSparseData = isSparseData(); + + if (classAttribute < 0) { + this.instanceInformation.setClassIndex(this.instanceInformation.numAttributes() - 1); + } else if (classAttribute > 0) { + this.instanceInformation.setClassIndex(classAttribute - 1); + } + + } catch (IOException ioException) { + logger.error(AVRO_LOADER_SCHEMA_READ_ERROR + " : {}", ioException); + throw new RuntimeException(AVRO_LOADER_SCHEMA_READ_ERROR + " : " + ioException); + } + } + + /* (non-Javadoc) + * @see org.apache.samoa.instances.AvroLoader#readInstance() + */ + @Override + public Instance readInstance() { + + String line = null; + Decoder decoder = null; + GenericRecord record = null; + + try { + while ((line = ((BufferedReader) reader).readLine()) != null) { + if (line == null || line.trim().length() <= 0) + continue; + + decoder = DecoderFactory.get().jsonDecoder(schema, line); + record = datumReader.read(null, decoder); + break; + } + } catch (IOException ioException) { + logger.error(AVRO_LOADER_INSTANCE_READ_ERROR + " : {}", ioException); + throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR + " : " + ioException); + } + + if (record == null) + { + closeReader(); + return null; + } + + if (isSparseData) + return readInstanceSparse(record); + + return readInstanceDense(record); + } + + /** + * Close the Avro Data Stream + */ + private void closeReader() + { + if (reader != null) + try { + reader.close(); + } catch (IOException ioException) { + logger.error(AVRO_LOADER_INSTANCE_READ_ERROR + " : {}", ioException); + throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR + " : " + ioException); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/76a37363/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java index 33db437..0547a5c 100644 --- a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java +++ b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java @@ -20,7 +20,6 @@ package org.apache.samoa.instances; * #L% */ - import java.io.InputStream; import java.util.ArrayList; import java.util.List; @@ -32,254 +31,267 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; /** - * Load Data from Avro Stream and parse to corresponding Dense & Parse Instances - * Abstract Class: Subclass this class for different types of Avro Encodings - * + * Load Data from Avro Stream and parse to corresponding Dense & Parse Instances Abstract Class: Subclass this class for + * different types of Avro Encodings * */ public abstract class AvroLoader implements Loader { - private static final long serialVersionUID = 1L; - - /** Representation of the Avro Schema for the Instances being read. Built from the first line in the data */ - protected Schema schema = null; - - /** Meta-data of the Instance */ - protected InstanceInformation instanceInformation; - - /** List of attributes in the data as read from the schema */ - protected List<Attribute> attributes; - - /** This variable is to check if the data stored is Sparse or Dense */ - protected boolean isSparseData; - - protected int classAttribute; + private static final long serialVersionUID = 1L; - /** Datum Reader for Avro Data*/ - public DatumReader<GenericRecord> datumReader = null; - - public AvroLoader(int classAttribute) { - this.classAttribute = classAttribute; - this.isSparseData = false; - } - - /** Intialize Avro Schema, Meta Data, InstanceInformation from Input Avro Stream */ - public abstract void initializeSchema(InputStream inputStream); - - /** Read a single SAMOA Instance from Input Avro Stream */ - public abstract Instance readInstance(); - - /** - * Method to read Dense Instances from Avro File - * @return Instance - */ - protected Instance readInstanceDense(GenericRecord record) - { - Instance instance = new DenseInstance(this.instanceInformation.numAttributes() + 1); - int numAttribute = 0; - - for (Attribute attribute : attributes) { - Object value = record.get(attribute.name); - - boolean isNumeric = attributes.get(numAttribute).isNumeric(); - boolean isNominal = attributes.get(numAttribute).isNominal(); - - if(isNumeric) - { - if(value instanceof Double) - this.setDenseValue(instance, numAttribute, (double)value); - else if (value instanceof Long) - this.setDenseValue(instance, numAttribute, (long)value); - else if (value instanceof Integer) - this.setDenseValue(instance, numAttribute, (int)value); - else - throw new RuntimeException("Invalid data type in the Avro data for Numeric Type : "+attribute.name); - } - else if(isNominal) - { - double valueAttribute; - - if (!(value instanceof EnumSymbol)) - throw new RuntimeException("Invalid data type in the Avro data for Nominal Type : "+attribute.name); - - EnumSymbol enumSymbolalue = (EnumSymbol)value; - - String stringValue = enumSymbolalue.toString(); - - if (("?".equals(stringValue))||(stringValue==null)) { - valueAttribute = Double.NaN; - } else { - valueAttribute = this.instanceInformation.attribute(numAttribute).indexOfValue(stringValue); - } - - this.setDenseValue(instance, numAttribute, valueAttribute); - } - numAttribute++; - } - - return (numAttribute > 0) ? instance : null; - - } - - - /** - * Sets a Dense Value in the corresponding attribute index - * @param instance is the Instance where values will be set - * @param numAttribute is the index of the attribute - * @param valueAttribute is the value of the attribute for this Instance - */ - - private void setDenseValue(Instance instance, int numAttribute, double valueAttribute) { - - if (this.instanceInformation.classIndex() == numAttribute) - instance.setClassValue(valueAttribute); - else - instance.setValue(numAttribute, valueAttribute); - } - - /** - * Method to read Sparse Instances from Avro File - * @return Instance - */ - protected Instance readInstanceSparse(GenericRecord record) { - - Instance instance = new SparseInstance(1.0, null); - int numAttribute = -1; - ArrayList<Double> attributeValues = new ArrayList<Double>(); - List<Integer> indexValues = new ArrayList<Integer>(); - - for (Attribute attribute : attributes) { - numAttribute++; - Object value = record.get(attribute.name); - - boolean isNumeric = attributes.get(numAttribute).isNumeric(); - boolean isNominal = attributes.get(numAttribute).isNominal(); - - /** If value is empty/null iterate to the next attribute.**/ - if(value==null) - continue; - - if(isNumeric) - { - if(value instanceof Double) - this.setSparseValue(instance, indexValues, attributeValues, numAttribute, (double)value); - else if (value instanceof Long) - this.setSparseValue(instance,indexValues, attributeValues, numAttribute, (long)value); - else if (value instanceof Integer) - this.setSparseValue(instance,indexValues, attributeValues, numAttribute, (int)value); - else - throw new RuntimeException(AVRO_LOADER_INVALID_TYPE_ERROR+" : "+attribute.name); - } - else if(isNominal) - { - double valueAttribute; - - if (!(value instanceof EnumSymbol)) - throw new RuntimeException(AVRO_LOADER_INVALID_TYPE_ERROR+" : "+attribute.name); - - EnumSymbol enumSymbolalue = (EnumSymbol)value; - - String stringValue = enumSymbolalue.toString(); - - if (("?".equals(stringValue))||(stringValue==null)) { - valueAttribute = Double.NaN; - } else { - valueAttribute = this.instanceInformation.attribute(numAttribute).indexOfValue(stringValue); - } - - this.setSparseValue(instance, indexValues, attributeValues, numAttribute, valueAttribute); - } - } - - int[] arrayIndexValues = new int[attributeValues.size()]; - double[] arrayAttributeValues = new double[attributeValues.size()]; - - for (int i = 0; i < arrayIndexValues.length; i++) { - arrayIndexValues[i] = indexValues.get(i).intValue(); - arrayAttributeValues[i] = attributeValues.get(i).doubleValue(); - } - - instance.addSparseValues(arrayIndexValues, arrayAttributeValues,this.instanceInformation.numAttributes()); - return instance; - - } - - /** - * Sets a Sparse Value in the corresponding attribute index - * @param instance is the Instance where values will be set - * @param indexValues is the list of Index values - * @param attributeValues is the list of Attribute values - * @param numAttribute is the index of the attribute - * @param valueAttribute is the value of the attribute for this Instance - */ - private void setSparseValue(Instance instance, List<Integer> indexValues, List<Double> attributeValues, int numAttribute, double valueAttribute) { - - if (this.instanceInformation.classIndex() == numAttribute) { - instance.setClassValue(valueAttribute); - } else { - indexValues.add(numAttribute); - attributeValues.add(valueAttribute); - } - } - - /** - * Builds the Meta Data of from the Avro Schema - * @return - */ - protected InstanceInformation getHeader() { - - String relation = schema.getName(); - attributes = new ArrayList<Attribute>(); - - /** By Definition, the returned list is in the order of their positions. **/ - List<Schema.Field> fields = schema.getFields(); - - for (Field field : fields) { - Schema attributeSchema = field.schema(); - - /** Currently SAMOA supports only NOMINAL & Numeric Types.**/ - if(attributeSchema.getType()==Schema.Type.ENUM) - { - List<String> attributeLabels = attributeSchema.getEnumSymbols(); - attributes.add(new Attribute(field.name(), attributeLabels)); - } - else - attributes.add(new Attribute(field.name())); - } - return new InstanceInformation(relation, attributes); - } - - /** - * Identifies if the dataset is is Sparse or Dense - * @return boolean - */ - protected boolean isSparseData() - { - List<Schema.Field> fields = schema.getFields(); - for (Field field : fields) { - Schema attributeSchema = field.schema(); - - /** If even one attribute has a null union (nullable attribute) consider it as sparse data**/ - if(attributeSchema.getType()==Schema.Type.UNION) - { - List<Schema> unionTypes = attributeSchema.getTypes(); - for (Schema unionSchema : unionTypes) { - if(unionSchema.getType()==Schema.Type.NULL) - return true; - } - } - - } - return false; - } - - @Override - public InstanceInformation getStructure() { - return this.instanceInformation; - } - - /** Error Messages to for all types of Avro Loaders */ - protected static final String AVRO_LOADER_INVALID_TYPE_ERROR = "Invalid data type in the Avro data"; - protected static final String AVRO_LOADER_SCHEMA_READ_ERROR = "Exception while reading the schema from Avro File"; - protected static final String AVRO_LOADER_INSTANCE_READ_ERROR = "Exception while reading the Instance from Avro File."; + /** Representation of the Avro Schema for the Instances being read. Built from the first line in the data */ + protected Schema schema = null; + + /** Meta-data of the Instance */ + protected InstanceInformation instanceInformation; + + /** List of attributes in the data as read from the schema */ + protected List<Attribute> attributes; + + /** This variable is to check if the data stored is Sparse or Dense */ + protected boolean isSparseData; + + protected int classAttribute; + + /** Datum Reader for Avro Data */ + public DatumReader<GenericRecord> datumReader = null; + + public AvroLoader(int classAttribute) { + this.classAttribute = classAttribute; + this.isSparseData = false; + } + + /** Intialize Avro Schema, Meta Data, InstanceInformation from Input Avro Stream */ + public abstract void initializeSchema(InputStream inputStream); + + /** Read a single SAMOA Instance from Input Avro Stream */ + public abstract Instance readInstance(); + + /** + * Method to read Dense Instances from Avro File + * + * @return Instance + */ + protected Instance readInstanceDense(GenericRecord record) + { + Instance instance = new DenseInstance(this.instanceInformation.numAttributes() + 1); + int numAttribute = 0; + + for (Attribute attribute : attributes) { + Object value = record.get(attribute.name); + + boolean isNumeric = attributes.get(numAttribute).isNumeric(); + boolean isNominal = attributes.get(numAttribute).isNominal(); + + if (isNumeric) + { + if (value instanceof Double) + this.setDenseValue(instance, numAttribute, (double) value); + else if (value instanceof Long) + this.setDenseValue(instance, numAttribute, (long) value); + else if (value instanceof Integer) + this.setDenseValue(instance, numAttribute, (int) value); + else + throw new RuntimeException("Invalid data type in the Avro data for Numeric Type : " + attribute.name); + } + else if (isNominal) + { + double valueAttribute; + + if (!(value instanceof EnumSymbol)) + throw new RuntimeException("Invalid data type in the Avro data for Nominal Type : " + attribute.name); + + EnumSymbol enumSymbolalue = (EnumSymbol) value; + + String stringValue = enumSymbolalue.toString(); + + if (("?".equals(stringValue)) || (stringValue == null)) { + valueAttribute = Double.NaN; + } else { + valueAttribute = this.instanceInformation.attribute(numAttribute).indexOfValue(stringValue); + } + + this.setDenseValue(instance, numAttribute, valueAttribute); + } + numAttribute++; + } + + return (numAttribute > 0) ? instance : null; + + } + + /** + * Sets a Dense Value in the corresponding attribute index + * + * @param instance + * is the Instance where values will be set + * @param numAttribute + * is the index of the attribute + * @param valueAttribute + * is the value of the attribute for this Instance + */ + + private void setDenseValue(Instance instance, int numAttribute, double valueAttribute) { + + if (this.instanceInformation.classIndex() == numAttribute) + instance.setClassValue(valueAttribute); + else + instance.setValue(numAttribute, valueAttribute); + } + + /** + * Method to read Sparse Instances from Avro File + * + * @return Instance + */ + protected Instance readInstanceSparse(GenericRecord record) { + + Instance instance = new SparseInstance(1.0, null); + int numAttribute = -1; + ArrayList<Double> attributeValues = new ArrayList<Double>(); + List<Integer> indexValues = new ArrayList<Integer>(); + + for (Attribute attribute : attributes) { + numAttribute++; + Object value = record.get(attribute.name); + + boolean isNumeric = attributes.get(numAttribute).isNumeric(); + boolean isNominal = attributes.get(numAttribute).isNominal(); + + /** If value is empty/null iterate to the next attribute. **/ + if (value == null) + continue; + + if (isNumeric) + { + if (value instanceof Double) + this.setSparseValue(instance, indexValues, attributeValues, numAttribute, (double) value); + else if (value instanceof Long) + this.setSparseValue(instance, indexValues, attributeValues, numAttribute, (long) value); + else if (value instanceof Integer) + this.setSparseValue(instance, indexValues, attributeValues, numAttribute, (int) value); + else + throw new RuntimeException(AVRO_LOADER_INVALID_TYPE_ERROR + " : " + attribute.name); + } + else if (isNominal) + { + double valueAttribute; + + if (!(value instanceof EnumSymbol)) + throw new RuntimeException(AVRO_LOADER_INVALID_TYPE_ERROR + " : " + attribute.name); + + EnumSymbol enumSymbolalue = (EnumSymbol) value; + + String stringValue = enumSymbolalue.toString(); + + if (("?".equals(stringValue)) || (stringValue == null)) { + valueAttribute = Double.NaN; + } else { + valueAttribute = this.instanceInformation.attribute(numAttribute).indexOfValue(stringValue); + } + + this.setSparseValue(instance, indexValues, attributeValues, numAttribute, valueAttribute); + } + } + + int[] arrayIndexValues = new int[attributeValues.size()]; + double[] arrayAttributeValues = new double[attributeValues.size()]; + + for (int i = 0; i < arrayIndexValues.length; i++) { + arrayIndexValues[i] = indexValues.get(i).intValue(); + arrayAttributeValues[i] = attributeValues.get(i).doubleValue(); + } + + instance.addSparseValues(arrayIndexValues, arrayAttributeValues, this.instanceInformation.numAttributes()); + return instance; + + } + + /** + * Sets a Sparse Value in the corresponding attribute index + * + * @param instance + * is the Instance where values will be set + * @param indexValues + * is the list of Index values + * @param attributeValues + * is the list of Attribute values + * @param numAttribute + * is the index of the attribute + * @param valueAttribute + * is the value of the attribute for this Instance + */ + private void setSparseValue(Instance instance, List<Integer> indexValues, List<Double> attributeValues, + int numAttribute, double valueAttribute) { + + if (this.instanceInformation.classIndex() == numAttribute) { + instance.setClassValue(valueAttribute); + } else { + indexValues.add(numAttribute); + attributeValues.add(valueAttribute); + } + } + + /** + * Builds the Meta Data of from the Avro Schema + * + * @return + */ + protected InstanceInformation getHeader() { + + String relation = schema.getName(); + attributes = new ArrayList<Attribute>(); + + /** By Definition, the returned list is in the order of their positions. **/ + List<Schema.Field> fields = schema.getFields(); + + for (Field field : fields) { + Schema attributeSchema = field.schema(); + + /** Currently SAMOA supports only NOMINAL & Numeric Types. **/ + if (attributeSchema.getType() == Schema.Type.ENUM) + { + List<String> attributeLabels = attributeSchema.getEnumSymbols(); + attributes.add(new Attribute(field.name(), attributeLabels)); + } + else + attributes.add(new Attribute(field.name())); + } + return new InstanceInformation(relation, attributes); + } + + /** + * Identifies if the dataset is is Sparse or Dense + * + * @return boolean + */ + protected boolean isSparseData() + { + List<Schema.Field> fields = schema.getFields(); + for (Field field : fields) { + Schema attributeSchema = field.schema(); + + /** If even one attribute has a null union (nullable attribute) consider it as sparse data **/ + if (attributeSchema.getType() == Schema.Type.UNION) + { + List<Schema> unionTypes = attributeSchema.getTypes(); + for (Schema unionSchema : unionTypes) { + if (unionSchema.getType() == Schema.Type.NULL) + return true; + } + } + + } + return false; + } + + @Override + public InstanceInformation getStructure() { + return this.instanceInformation; + } + + /** Error Messages to for all types of Avro Loaders */ + protected static final String AVRO_LOADER_INVALID_TYPE_ERROR = "Invalid data type in the Avro data"; + protected static final String AVRO_LOADER_SCHEMA_READ_ERROR = "Exception while reading the schema from Avro File"; + protected static final String AVRO_LOADER_INSTANCE_READ_ERROR = "Exception while reading the Instance from Avro File."; } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/76a37363/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java b/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java index 707d7f2..f7fb0d3 100644 --- a/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java +++ b/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java @@ -46,11 +46,12 @@ public class Instances implements Serializable { * The instances. */ protected List<Instance> instances; - + transient protected Loader loader; - - protected static enum AVRO_ENCODING_FORMAT{JSON,BINARY} + protected static enum AVRO_ENCODING_FORMAT { + JSON, BINARY + } protected int classAttribute; @@ -73,24 +74,24 @@ public class Instances implements Serializable { } public Instances(Reader reader, int size, int classAttribute) { - this.classAttribute = classAttribute; - loader = new ArffLoader(reader, 0, classAttribute); - this.instanceInformation = loader.getStructure(); - this.instances = new ArrayList<>(); + this.classAttribute = classAttribute; + loader = new ArffLoader(reader, 0, classAttribute); + this.instanceInformation = loader.getStructure(); + this.instances = new ArrayList<>(); } public Instances(InputStream inputStream, int classAttribute, String encodingFormat) { - this.classAttribute = classAttribute; + this.classAttribute = classAttribute; - if(encodingFormat.equalsIgnoreCase(AVRO_ENCODING_FORMAT.BINARY.toString())) - loader = new AvroBinaryLoader(inputStream, classAttribute); - else - loader = new AvroJsonLoader(inputStream, classAttribute); + if (encodingFormat.equalsIgnoreCase(AVRO_ENCODING_FORMAT.BINARY.toString())) + loader = new AvroBinaryLoader(inputStream, classAttribute); + else + loader = new AvroJsonLoader(inputStream, classAttribute); - this.instanceInformation = loader.getStructure(); - this.instances = new ArrayList<>(); + this.instanceInformation = loader.getStructure(); + this.instances = new ArrayList<>(); } - + public Instances(Instances chunk, int capacity) { this(chunk); } @@ -191,22 +192,22 @@ public class Instances implements Serializable { public boolean readInstance(Reader fileReader) { - if (loader == null) { - loader = new ArffLoader(fileReader, 0, this.classAttribute); - } - return readInstance() ; + if (loader == null) { + loader = new ArffLoader(fileReader, 0, this.classAttribute); + } + return readInstance(); } public boolean readInstance() { - Instance inst = loader.readInstance(); - if (inst != null) { - inst.setDataset(this); - add(inst); - return true; - } else { - return false; - } + Instance inst = loader.readInstance(); + if (inst != null) { + inst.setDataset(this); + add(inst); + return true; + } else { + return false; + } } public void delete() { http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/76a37363/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java index f806bf5..7e04fbb 100644 --- a/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java +++ b/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java @@ -20,26 +20,28 @@ package org.apache.samoa.instances; * #L% */ - import java.io.Serializable; /** * Loads Instances from streams of different types of Input Formats e.g ARFF & AVRO + * * @author jayadeepj */ -public interface Loader extends Serializable{ - - /** - * Fetch the Meta-data from the data - * @return InstanceInformation - */ - public InstanceInformation getStructure(); - - /** - * Read a single instance from the Stream - * @return Instance - */ - public Instance readInstance(); +public interface Loader extends Serializable { + + /** + * Fetch the Meta-data from the data + * + * @return InstanceInformation + */ + public InstanceInformation getStructure(); + + /** + * Read a single instance from the Stream + * + * @return Instance + */ + public Instance readInstance(); }
