SAMOA-47: Integrate Avro Streams with SAMOA
Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/74979782 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/74979782 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/74979782 Branch: refs/heads/master Commit: 74979782fabfa04b7701ae4c83b91bc38402c351 Parents: b84e8ac Author: jayadeepj <[email protected]> Authored: Fri Oct 30 14:57:06 2015 +0530 Committer: Gianmarco De Francisci Morales <[email protected]> Committed: Mon Nov 30 13:20:35 2015 +0200 ---------------------------------------------------------------------- pom.xml | 1 + .../samoa/moa/streams/AvroFileStream.java | 171 +++++++++++ samoa-instances/pom.xml | 7 + .../org/apache/samoa/instances/ArffLoader.java | 8 +- .../samoa/instances/AvroBinaryLoader.java | 120 ++++++++ .../apache/samoa/instances/AvroJsonLoader.java | 133 +++++++++ .../org/apache/samoa/instances/AvroLoader.java | 286 +++++++++++++++++++ .../org/apache/samoa/instances/Instances.java | 54 ++-- .../java/org/apache/samoa/instances/Loader.java | 45 +++ 9 files changed, 806 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 41fc5bd..71b131f 100644 --- a/pom.xml +++ b/pom.xml @@ -144,6 +144,7 @@ <storm.version>0.9.4</storm.version> <!-- storm 0.8.2 loads zookeeper classes with hardcoded names from 3.3 version--> <zookeeper.storm.version>3.4.6</zookeeper.storm.version> + <avro.version>1.7.7</avro.version> </properties> <dependencies> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java b/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java new file mode 100644 index 0000000..e684687 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java @@ -0,0 +1,171 @@ +package org.apache.samoa.moa.streams; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.samoa.instances.Instances; +import org.apache.samoa.moa.core.InstanceExample; +import org.apache.samoa.moa.core.ObjectRepository; +import org.apache.samoa.moa.tasks.TaskMonitor; +import org.apache.samoa.streams.FileStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.FileOption; +import com.github.javacliparser.IntOption; +import com.github.javacliparser.StringOption; + + +/** + * InstanceStream implementation to handle Apache Avro Files. + * Handles both JSON & Binary encoded streams + * + * @author jayadeepj + * + */ +public class AvroFileStream extends FileStream { + + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(AvroFileStream.class); + + public FileOption avroFileOption = new FileOption("avroFile", 'f',"Avro File(s) to load.", null, null, false); + public IntOption classIndexOption = new IntOption("classIndex", 'c',"Class index of data. 0 for none or -1 for last attribute in file.",-1, -1, Integer.MAX_VALUE); + public StringOption encodingFormatOption = new StringOption("encodingFormatOption", 'e', "Encoding format for Avro Files. Can be JSON/AVRO", "BINARY"); + + /** Represents the last read Instance **/ + protected InstanceExample lastInstanceRead; + + /** Represents the binary input stream of avro data **/ + protected transient InputStream inputStream = null; + + /** The extension to be considered for the files **/ + private static final String AVRO_FILE_EXTENSION = "avro"; + + /* (non-Javadoc) + * @see org.apache.samoa.streams.FileStream#reset() + * Reset the BINARY encoded Avro Stream & Close the file source + */ + @Override + protected void reset() { + + try { + if (this.inputStream != null) + this.inputStream.close(); + + fileSource.reset(); + } catch (IOException ioException) { + logger.error(AVRO_STREAM_FAILED_RESTART_ERROR+" : {}",ioException); + throw new RuntimeException(AVRO_STREAM_FAILED_RESTART_ERROR, ioException); + } + + if (!getNextFileStream()) { + hitEndOfStream = true; + throw new RuntimeException(AVRO_STREAM_EMPTY_STREAM_ERROR); + } + } + + + /** + * Get next File Stream & set the class index read from the command line option + * @return + */ + protected boolean getNextFileStream() { + if (this.inputStream != null) + try { + this.inputStream.close(); + } catch (IOException ioException) { + logger.error(AVRO_STREAM_FAILED_READ_ERROR+" : {}",ioException); + throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException); + } + + this.inputStream = this.fileSource.getNextInputStream(); + + if (this.inputStream == null) + return false; + + this.instances = new Instances(this.inputStream, classIndexOption.getValue(),encodingFormatOption.getValue()); + + if (this.classIndexOption.getValue() < 0) { + this.instances.setClassIndex(this.instances.numAttributes() - 1); + } else if (this.classIndexOption.getValue() > 0) { + this.instances.setClassIndex(this.classIndexOption.getValue() - 1); + } + return true; + } + + + /* (non-Javadoc) + * @see org.apache.samoa.streams.FileStream#readNextInstanceFromFile() + * Read next Instance from File. Return false if unable to read next Instance + */ + @Override + protected boolean readNextInstanceFromFile() { + try { + if (this.instances.readInstance()) { + this.lastInstanceRead = new InstanceExample(this.instances.instance(0)); + this.instances.delete(); + return true; + } + if (this.inputStream != null) { + this.inputStream.close(); + this.inputStream = null; + } + return false; + } catch (IOException ioException) { + logger.error(AVRO_STREAM_FAILED_READ_ERROR+" : {}",ioException); + throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException); + } + + } + + @Override + public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { + super.prepareForUseImpl(monitor, repository); + String filePath = this.avroFileOption.getFile().getAbsolutePath(); + this.fileSource.init(filePath, AvroFileStream.AVRO_FILE_EXTENSION); + this.lastInstanceRead = null; + } + + + /* (non-Javadoc) + * @see org.apache.samoa.streams.FileStream#getLastInstanceRead() + * Return the last read Instance + */ + @Override + protected InstanceExample getLastInstanceRead() { + return this.lastInstanceRead; + } + + + @Override + public void getDescription(StringBuilder sb, int indent) { + throw new UnsupportedOperationException(AVRO_STREAM_UNSUPPORTED_METHOD); + } + + /** Error Messages to for all types of Avro File Streams */ + protected static final String AVRO_STREAM_FAILED_RESTART_ERROR = "Avro FileStream restart failed."; + protected static final String AVRO_STREAM_EMPTY_STREAM_ERROR = "Avro FileStream is empty."; + protected static final String AVRO_STREAM_FAILED_READ_ERROR = "Failed to read next instance from Avro File Stream."; + protected static final String AVRO_STREAM_UNSUPPORTED_METHOD = "This method is not supported for AvroFileStream yet."; + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/samoa-instances/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-instances/pom.xml b/samoa-instances/pom.xml index ed24597..64ffcd8 100644 --- a/samoa-instances/pom.xml +++ b/samoa-instances/pom.xml @@ -34,4 +34,11 @@ <artifactId>samoa</artifactId> <version>0.4.0-incubating-SNAPSHOT</version> </parent> + <dependencies> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>${avro.version}</version> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java index 3d314f0..325d1b8 100644 --- a/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java +++ b/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java @@ -23,7 +23,6 @@ package org.apache.samoa.instances; import java.io.BufferedReader; import java.io.IOException; import java.io.Reader; -import java.io.Serializable; import java.io.StreamTokenizer; import java.util.ArrayList; import java.util.List; @@ -33,7 +32,7 @@ import java.util.logging.Logger; /** * @author abifet */ -public class ArffLoader implements Serializable { +public class ArffLoader implements Loader { protected InstanceInformation instanceInformation; @@ -393,4 +392,9 @@ public class ArffLoader implements Serializable { this.instanceInformation.setClassIndex(classAttribute - 1); } } + + @Override + public Instance readInstance() { + return readInstance(this.reader); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java new file mode 100644 index 0000000..c3e32dc --- /dev/null +++ b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java @@ -0,0 +1,120 @@ +package org.apache.samoa.instances; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Load Data from Binary Avro Stream and parse to corresponding Dense & Parse Instances + * + * @author jayadeepj + * + */ +public class AvroBinaryLoader extends AvroLoader { + + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(AvroBinaryLoader.class); + + /** Avro Binary reader for an input stream **/ + protected DataFileStream<GenericRecord> dataFileStream = null; + + public AvroBinaryLoader(InputStream inputStream,int classAttribute) { + super(classAttribute); + initializeSchema(inputStream); + } + + /* (non-Javadoc) + * @see org.apache.samoa.instances.AvroLoader#initializeSchema(java.io.InputStream) + */ + @Override + public void initializeSchema(InputStream inputStream) + { + try { + this.datumReader = new GenericDatumReader<GenericRecord>(); + this.dataFileStream = new DataFileStream<GenericRecord>(inputStream, datumReader); + this.schema = dataFileStream.getSchema(); + + this.instanceInformation = getHeader(); + this.isSparseData = isSparseData(); + + if (classAttribute < 0) { + this.instanceInformation.setClassIndex(this.instanceInformation.numAttributes() - 1); + } else if (classAttribute > 0) { + this.instanceInformation.setClassIndex(classAttribute - 1); + } + + } catch (IOException ioException) { + logger.error(AVRO_LOADER_SCHEMA_READ_ERROR+" : {}",ioException); + throw new RuntimeException(AVRO_LOADER_SCHEMA_READ_ERROR+" : " + ioException); + } + } + + /* (non-Javadoc) + * @see org.apache.samoa.instances.AvroLoader#readInstance() + */ + @Override + public Instance readInstance() { + + GenericRecord record = null; + + try{ + if (dataFileStream.hasNext()) { + record =(GenericRecord) dataFileStream.next(); + } + } catch (Exception ioException) { + logger.error(AVRO_LOADER_INSTANCE_READ_ERROR+" : {}",ioException); + throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR+" : " + ioException); + } + + if(record==null) + { + //closeReader(); + return null; + } + + if(isSparseData) + return readInstanceSparse(record); + + return readInstanceDense(record); + } + + /** + * Close the Avro Data Stream + */ + private void closeReader() + { + if(dataFileStream !=null) + try { + dataFileStream.close(); + } catch (IOException ioException) { + logger.error(AVRO_LOADER_INSTANCE_READ_ERROR+" : {}",ioException); + throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR+" : " + ioException); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java new file mode 100644 index 0000000..827b507 --- /dev/null +++ b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java @@ -0,0 +1,133 @@ +package org.apache.samoa.instances; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Load Data from JSON Avro Stream and parse to corresponding Dense & Parse Instances + * + * @author jayadeepj + * + */ +public class AvroJsonLoader extends AvroLoader { + + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(AvroJsonLoader.class); + + /** The Character reader for JSON read */ + protected Reader reader = null; + + public AvroJsonLoader(InputStream inputStream, int classAttribute) { + super(classAttribute); + initializeSchema(inputStream); + } + + /* (non-Javadoc) + * @see org.apache.samoa.instances.AvroLoader#initializeSchema(java.io.InputStream) + */ + @Override + public void initializeSchema(InputStream inputStream) + { + String schemaString = null; + try { + this.reader = new BufferedReader(new InputStreamReader(inputStream)); + schemaString = ((BufferedReader)this.reader).readLine(); + this.schema = new Schema.Parser().parse(schemaString); + this.datumReader = new GenericDatumReader<GenericRecord>(schema); + this.instanceInformation = getHeader(); + this.isSparseData = isSparseData(); + + if (classAttribute < 0) { + this.instanceInformation.setClassIndex(this.instanceInformation.numAttributes() - 1); + } else if (classAttribute > 0) { + this.instanceInformation.setClassIndex(classAttribute - 1); + } + + } catch (IOException ioException) { + logger.error(AVRO_LOADER_SCHEMA_READ_ERROR+" : {}",ioException); + throw new RuntimeException(AVRO_LOADER_SCHEMA_READ_ERROR+" : " + ioException); + } + } + + /* (non-Javadoc) + * @see org.apache.samoa.instances.AvroLoader#readInstance() + */ + @Override + public Instance readInstance() { + + String line = null; + Decoder decoder = null; + GenericRecord record = null; + + try{ + while ((line = ((BufferedReader)reader).readLine()) != null) { + if(line==null || line.trim().length()<=0) + continue; + + decoder = DecoderFactory.get().jsonDecoder(schema, line); + record = datumReader.read(null, decoder); + break; + } + } catch (IOException ioException) { + logger.error(AVRO_LOADER_INSTANCE_READ_ERROR+" : {}",ioException); + throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR+" : " + ioException); + } + + if(record==null) + { + closeReader(); + return null; + } + + if(isSparseData) + return readInstanceSparse(record); + + return readInstanceDense(record); + } + + /** + * Close the Avro Data Stream + */ + private void closeReader() + { + if(reader !=null) + try { + reader.close(); + } catch (IOException ioException) { + logger.error(AVRO_LOADER_INSTANCE_READ_ERROR+" : {}",ioException); + throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR+" : " + ioException); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java new file mode 100644 index 0000000..09f410f --- /dev/null +++ b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java @@ -0,0 +1,286 @@ +package org.apache.samoa.instances; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.generic.GenericData.EnumSymbol; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; + +/** + * Load Data from Avro Stream and parse to corresponding Dense & Parse Instances + * Abstract Class: Subclass this class for different types of Avro Encodings + * + * @author jayadeepj + * + */ +public abstract class AvroLoader implements Loader { + + private static final long serialVersionUID = 1L; + + /** Representation of the Avro Schema for the Instances being read. Built from the first line in the data */ + protected Schema schema = null; + + /** Meta-data of the Instance */ + protected InstanceInformation instanceInformation; + + /** List of attributes in the data as read from the schema */ + protected List<Attribute> attributes; + + /** This variable is to check if the data stored is Sparse or Dense */ + protected boolean isSparseData; + + protected int classAttribute; + + /** Datum Reader for Avro Data*/ + public DatumReader<GenericRecord> datumReader = null; + + public AvroLoader(int classAttribute) { + this.classAttribute = classAttribute; + this.isSparseData = false; + } + + /** Intialize Avro Schema, Meta Data, InstanceInformation from Input Avro Stream */ + public abstract void initializeSchema(InputStream inputStream); + + /** Read a single SAMOA Instance from Input Avro Stream */ + public abstract Instance readInstance(); + + /** + * Method to read Dense Instances from Avro File + * @return Instance + */ + protected Instance readInstanceDense(GenericRecord record) + { + Instance instance = new DenseInstance(this.instanceInformation.numAttributes() + 1); + int numAttribute = 0; + + for (Attribute attribute : attributes) { + Object value = record.get(attribute.name); + + boolean isNumeric = attributes.get(numAttribute).isNumeric(); + boolean isNominal = attributes.get(numAttribute).isNominal(); + + if(isNumeric) + { + if(value instanceof Double) + this.setDenseValue(instance, numAttribute, (double)value); + else if (value instanceof Long) + this.setDenseValue(instance, numAttribute, (long)value); + else if (value instanceof Integer) + this.setDenseValue(instance, numAttribute, (int)value); + else + throw new RuntimeException("Invalid data type in the Avro data for Numeric Type : "+attribute.name); + } + else if(isNominal) + { + double valueAttribute; + + if (!(value instanceof EnumSymbol)) + throw new RuntimeException("Invalid data type in the Avro data for Nominal Type : "+attribute.name); + + EnumSymbol enumSymbolalue = (EnumSymbol)value; + + String stringValue = enumSymbolalue.toString(); + + if (("?".equals(stringValue))||(stringValue==null)) { + valueAttribute = Double.NaN; + } else { + valueAttribute = this.instanceInformation.attribute(numAttribute).indexOfValue(stringValue); + } + + this.setDenseValue(instance, numAttribute, valueAttribute); + } + numAttribute++; + } + + return (numAttribute > 0) ? instance : null; + + } + + + /** + * Sets a Dense Value in the corresponding attribute index + * @param instance is the Instance where values will be set + * @param numAttribute is the index of the attribute + * @param valueAttribute is the value of the attribute for this Instance + */ + + private void setDenseValue(Instance instance, int numAttribute, double valueAttribute) { + + if (this.instanceInformation.classIndex() == numAttribute) + instance.setClassValue(valueAttribute); + else + instance.setValue(numAttribute, valueAttribute); + } + + /** + * Method to read Sparse Instances from Avro File + * @return Instance + */ + protected Instance readInstanceSparse(GenericRecord record) { + + Instance instance = new SparseInstance(1.0, null); + int numAttribute = -1; + ArrayList<Double> attributeValues = new ArrayList<Double>(); + List<Integer> indexValues = new ArrayList<Integer>(); + + for (Attribute attribute : attributes) { + numAttribute++; + Object value = record.get(attribute.name); + + boolean isNumeric = attributes.get(numAttribute).isNumeric(); + boolean isNominal = attributes.get(numAttribute).isNominal(); + + /** If value is empty/null iterate to the next attribute.**/ + if(value==null) + continue; + + if(isNumeric) + { + if(value instanceof Double) + this.setSparseValue(instance, indexValues, attributeValues, numAttribute, (double)value); + else if (value instanceof Long) + this.setSparseValue(instance,indexValues, attributeValues, numAttribute, (long)value); + else if (value instanceof Integer) + this.setSparseValue(instance,indexValues, attributeValues, numAttribute, (int)value); + else + throw new RuntimeException(AVRO_LOADER_INVALID_TYPE_ERROR+" : "+attribute.name); + } + else if(isNominal) + { + double valueAttribute; + + if (!(value instanceof EnumSymbol)) + throw new RuntimeException(AVRO_LOADER_INVALID_TYPE_ERROR+" : "+attribute.name); + + EnumSymbol enumSymbolalue = (EnumSymbol)value; + + String stringValue = enumSymbolalue.toString(); + + if (("?".equals(stringValue))||(stringValue==null)) { + valueAttribute = Double.NaN; + } else { + valueAttribute = this.instanceInformation.attribute(numAttribute).indexOfValue(stringValue); + } + + this.setSparseValue(instance, indexValues, attributeValues, numAttribute, valueAttribute); + } + } + + int[] arrayIndexValues = new int[attributeValues.size()]; + double[] arrayAttributeValues = new double[attributeValues.size()]; + + for (int i = 0; i < arrayIndexValues.length; i++) { + arrayIndexValues[i] = indexValues.get(i).intValue(); + arrayAttributeValues[i] = attributeValues.get(i).doubleValue(); + } + + instance.addSparseValues(arrayIndexValues, arrayAttributeValues,this.instanceInformation.numAttributes()); + return instance; + + } + + /** + * Sets a Sparse Value in the corresponding attribute index + * @param instance is the Instance where values will be set + * @param indexValues is the list of Index values + * @param attributeValues is the list of Attribute values + * @param numAttribute is the index of the attribute + * @param valueAttribute is the value of the attribute for this Instance + */ + private void setSparseValue(Instance instance, List<Integer> indexValues, List<Double> attributeValues, int numAttribute, double valueAttribute) { + + if (this.instanceInformation.classIndex() == numAttribute) { + instance.setClassValue(valueAttribute); + } else { + indexValues.add(numAttribute); + attributeValues.add(valueAttribute); + } + } + + /** + * Builds the Meta Data of from the Avro Schema + * @return + */ + protected InstanceInformation getHeader() { + + String relation = schema.getName(); + attributes = new ArrayList<Attribute>(); + + /** By Definition, the returned list is in the order of their positions. **/ + List<Schema.Field> fields = schema.getFields(); + + for (Field field : fields) { + Schema attributeSchema = field.schema(); + + /** Currently SAMOA supports only NOMINAL & Numeric Types.**/ + if(attributeSchema.getType()==Schema.Type.ENUM) + { + List<String> attributeLabels = attributeSchema.getEnumSymbols(); + attributes.add(new Attribute(field.name(), attributeLabels)); + } + else + attributes.add(new Attribute(field.name())); + } + return new InstanceInformation(relation, attributes); + } + + /** + * Identifies if the dataset is is Sparse or Dense + * @return boolean + */ + protected boolean isSparseData() + { + List<Schema.Field> fields = schema.getFields(); + for (Field field : fields) { + Schema attributeSchema = field.schema(); + + /** If even one attribute has a null union (nullable attribute) consider it as sparse data**/ + if(attributeSchema.getType()==Schema.Type.UNION) + { + List<Schema> unionTypes = attributeSchema.getTypes(); + for (Schema unionSchema : unionTypes) { + if(unionSchema.getType()==Schema.Type.NULL) + return true; + } + } + + } + return false; + } + + @Override + public InstanceInformation getStructure() { + return this.instanceInformation; + } + + /** Error Messages to for all types of Avro Loaders */ + protected static final String AVRO_LOADER_INVALID_TYPE_ERROR = "Invalid data type in the Avro data"; + protected static final String AVRO_LOADER_SCHEMA_READ_ERROR = "Exception while reading the schema from Avro File"; + protected static final String AVRO_LOADER_INSTANCE_READ_ERROR = "Exception while reading the Instance from Avro File."; +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java b/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java index 556caaa..707d7f2 100644 --- a/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java +++ b/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java @@ -30,6 +30,7 @@ import java.io.StringReader; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.io.InputStream; /** * @@ -45,8 +46,11 @@ public class Instances implements Serializable { * The instances. */ protected List<Instance> instances; + + transient protected Loader loader; + - transient protected ArffLoader arff; + protected static enum AVRO_ENCODING_FORMAT{JSON,BINARY} protected int classAttribute; @@ -69,12 +73,24 @@ public class Instances implements Serializable { } public Instances(Reader reader, int size, int classAttribute) { - this.classAttribute = classAttribute; - arff = new ArffLoader(reader, 0, classAttribute); - this.instanceInformation = arff.getStructure(); - this.instances = new ArrayList<>(); + this.classAttribute = classAttribute; + loader = new ArffLoader(reader, 0, classAttribute); + this.instanceInformation = loader.getStructure(); + this.instances = new ArrayList<>(); } + public Instances(InputStream inputStream, int classAttribute, String encodingFormat) { + this.classAttribute = classAttribute; + + if(encodingFormat.equalsIgnoreCase(AVRO_ENCODING_FORMAT.BINARY.toString())) + loader = new AvroBinaryLoader(inputStream, classAttribute); + else + loader = new AvroJsonLoader(inputStream, classAttribute); + + this.instanceInformation = loader.getStructure(); + this.instances = new ArrayList<>(); + } + public Instances(Instances chunk, int capacity) { this(chunk); } @@ -175,18 +191,22 @@ public class Instances implements Serializable { public boolean readInstance(Reader fileReader) { - // ArffReader arff = new ArffReader(reader, this, m_Lines, 1); - if (arff == null) { - arff = new ArffLoader(fileReader, 0, this.classAttribute); - } - Instance inst = arff.readInstance(fileReader); - if (inst != null) { - inst.setDataset(this); - add(inst); - return true; - } else { - return false; - } + if (loader == null) { + loader = new ArffLoader(fileReader, 0, this.classAttribute); + } + return readInstance() ; + } + + public boolean readInstance() { + + Instance inst = loader.readInstance(); + if (inst != null) { + inst.setDataset(this); + add(inst); + return true; + } else { + return false; + } } public void delete() { http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java new file mode 100644 index 0000000..f806bf5 --- /dev/null +++ b/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java @@ -0,0 +1,45 @@ +package org.apache.samoa.instances; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import java.io.Serializable; + +/** + * Loads Instances from streams of different types of Input Formats e.g ARFF & AVRO + * @author jayadeepj + */ + +public interface Loader extends Serializable{ + + /** + * Fetch the Meta-data from the data + * @return InstanceInformation + */ + public InstanceInformation getStructure(); + + /** + * Read a single instance from the Stream + * @return Instance + */ + public Instance readInstance(); + +}
