SAMOA-47: Integrate Avro Streams with SAMOA

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/74979782
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/74979782
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/74979782

Branch: refs/heads/master
Commit: 74979782fabfa04b7701ae4c83b91bc38402c351
Parents: b84e8ac
Author: jayadeepj <[email protected]>
Authored: Fri Oct 30 14:57:06 2015 +0530
Committer: Gianmarco De Francisci Morales <[email protected]>
Committed: Mon Nov 30 13:20:35 2015 +0200

----------------------------------------------------------------------
 pom.xml                                         |   1 +
 .../samoa/moa/streams/AvroFileStream.java       | 171 +++++++++++
 samoa-instances/pom.xml                         |   7 +
 .../org/apache/samoa/instances/ArffLoader.java  |   8 +-
 .../samoa/instances/AvroBinaryLoader.java       | 120 ++++++++
 .../apache/samoa/instances/AvroJsonLoader.java  | 133 +++++++++
 .../org/apache/samoa/instances/AvroLoader.java  | 286 +++++++++++++++++++
 .../org/apache/samoa/instances/Instances.java   |  54 ++--
 .../java/org/apache/samoa/instances/Loader.java |  45 +++
 9 files changed, 806 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 41fc5bd..71b131f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,7 @@
         <storm.version>0.9.4</storm.version>
         <!-- storm 0.8.2 loads zookeeper classes with hardcoded names from 3.3 
version-->
         <zookeeper.storm.version>3.4.6</zookeeper.storm.version>
+        <avro.version>1.7.7</avro.version>
     </properties>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java
new file mode 100644
index 0000000..e684687
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java
@@ -0,0 +1,171 @@
+package org.apache.samoa.moa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.moa.core.InstanceExample;
+import org.apache.samoa.moa.core.ObjectRepository;
+import org.apache.samoa.moa.tasks.TaskMonitor;
+import org.apache.samoa.streams.FileStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.FileOption;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+
+
+/**
+ *  InstanceStream implementation to handle Apache Avro Files.
+ *  Handles both JSON & Binary encoded streams
+ *  
+ *  @author jayadeepj
+ *
+ */
+public class AvroFileStream extends FileStream {
+
+       private static final long serialVersionUID = 1L;
+       private static final Logger logger = 
LoggerFactory.getLogger(AvroFileStream.class);
+
+       public FileOption avroFileOption = new FileOption("avroFile", 'f',"Avro 
File(s) to load.", null, null, false);
+       public IntOption classIndexOption = new IntOption("classIndex", 
'c',"Class index of data. 0 for none or -1 for last attribute in file.",-1, -1, 
Integer.MAX_VALUE);
+       public StringOption encodingFormatOption = new 
StringOption("encodingFormatOption", 'e', "Encoding format for Avro Files. Can 
be JSON/AVRO", "BINARY");
+
+       /** Represents the last read Instance **/
+       protected InstanceExample lastInstanceRead;
+
+       /** Represents the binary input stream of avro data **/
+       protected transient InputStream inputStream = null;
+
+       /** The extension to be considered for the files **/
+       private static final String AVRO_FILE_EXTENSION = "avro";
+
+       /* (non-Javadoc)
+        * @see org.apache.samoa.streams.FileStream#reset()
+        * Reset the BINARY encoded Avro Stream & Close the file source
+        */
+       @Override
+       protected void reset() {
+
+               try {
+                       if (this.inputStream != null)
+                               this.inputStream.close();
+
+                       fileSource.reset();
+               } catch (IOException ioException) {
+                       logger.error(AVRO_STREAM_FAILED_RESTART_ERROR+" : 
{}",ioException);
+                       throw new 
RuntimeException(AVRO_STREAM_FAILED_RESTART_ERROR, ioException);
+               }
+
+               if (!getNextFileStream()) {
+                       hitEndOfStream = true;
+                       throw new 
RuntimeException(AVRO_STREAM_EMPTY_STREAM_ERROR);
+               }
+       }
+
+
+       /**
+        * Get next File Stream & set the class index read from the command 
line option
+        * @return
+        */
+       protected boolean getNextFileStream() {
+               if (this.inputStream != null)
+                       try {
+                               this.inputStream.close();
+                       } catch (IOException ioException) {
+                               logger.error(AVRO_STREAM_FAILED_READ_ERROR+" : 
{}",ioException);
+                               throw new 
RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException);
+                       }
+
+               this.inputStream = this.fileSource.getNextInputStream();
+
+               if (this.inputStream == null)
+                       return false;
+
+               this.instances = new Instances(this.inputStream, 
classIndexOption.getValue(),encodingFormatOption.getValue());
+
+               if (this.classIndexOption.getValue() < 0) {
+                       
this.instances.setClassIndex(this.instances.numAttributes() - 1);
+               } else if (this.classIndexOption.getValue() > 0) {
+                       
this.instances.setClassIndex(this.classIndexOption.getValue() - 1);
+               }
+               return true;
+       }
+
+
+       /* (non-Javadoc)
+        * @see org.apache.samoa.streams.FileStream#readNextInstanceFromFile()
+        * Read next Instance from File. Return false if unable to read next 
Instance
+        */
+       @Override
+       protected boolean readNextInstanceFromFile() {
+               try {
+                       if (this.instances.readInstance()) {
+                               this.lastInstanceRead = new 
InstanceExample(this.instances.instance(0));
+                               this.instances.delete(); 
+                               return true;
+                       }
+                       if (this.inputStream != null) {
+                               this.inputStream.close();
+                               this.inputStream = null;
+                       }
+                       return false;
+               } catch (IOException ioException) {
+                       logger.error(AVRO_STREAM_FAILED_READ_ERROR+" : 
{}",ioException);
+                       throw new 
RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException);
+               }
+
+       }
+
+       @Override
+       public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository 
repository) {
+               super.prepareForUseImpl(monitor, repository);
+               String filePath = 
this.avroFileOption.getFile().getAbsolutePath();
+               this.fileSource.init(filePath, 
AvroFileStream.AVRO_FILE_EXTENSION);
+               this.lastInstanceRead = null;
+       }
+
+
+       /* (non-Javadoc)
+        * @see org.apache.samoa.streams.FileStream#getLastInstanceRead()
+        * Return the last read Instance
+        */
+       @Override
+       protected InstanceExample getLastInstanceRead() {
+               return this.lastInstanceRead;
+       }
+
+
+       @Override
+       public void getDescription(StringBuilder sb, int indent) {
+               throw new 
UnsupportedOperationException(AVRO_STREAM_UNSUPPORTED_METHOD);
+       }
+
+       /** Error Messages to for all types of Avro File Streams */
+       protected static final String AVRO_STREAM_FAILED_RESTART_ERROR = "Avro 
FileStream restart failed."; 
+       protected static final String AVRO_STREAM_EMPTY_STREAM_ERROR = "Avro 
FileStream is empty."; 
+       protected static final String AVRO_STREAM_FAILED_READ_ERROR = "Failed 
to read next instance from Avro File Stream.";
+       protected static final String AVRO_STREAM_UNSUPPORTED_METHOD = "This 
method is not supported for AvroFileStream yet.";  
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/samoa-instances/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-instances/pom.xml b/samoa-instances/pom.xml
index ed24597..64ffcd8 100644
--- a/samoa-instances/pom.xml
+++ b/samoa-instances/pom.xml
@@ -34,4 +34,11 @@
     <artifactId>samoa</artifactId>
     <version>0.4.0-incubating-SNAPSHOT</version>
   </parent>
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.avro</groupId>
+                       <artifactId>avro</artifactId>
+                       <version>${avro.version}</version>
+               </dependency>
+       </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java
----------------------------------------------------------------------
diff --git 
a/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java 
b/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java
index 3d314f0..325d1b8 100644
--- a/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java
+++ b/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java
@@ -23,7 +23,6 @@ package org.apache.samoa.instances;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.Reader;
-import java.io.Serializable;
 import java.io.StreamTokenizer;
 import java.util.ArrayList;
 import java.util.List;
@@ -33,7 +32,7 @@ import java.util.logging.Logger;
 /**
  * @author abifet
  */
-public class ArffLoader implements Serializable {
+public class ArffLoader implements Loader {
 
   protected InstanceInformation instanceInformation;
 
@@ -393,4 +392,9 @@ public class ArffLoader implements Serializable {
       this.instanceInformation.setClassIndex(classAttribute - 1);
     }
   }
+
+  /**
+   * Reads the next instance from the reader this loader was constructed with. It delegates to the
+   * {@code readInstance(...)} overload, passing {@code this.reader}. This is the no-argument entry point
+   * (presumably declared by {@code Loader}), so ARFF and Avro loaders can be driven the same way.
+   *
+   * @return whatever the delegated overload returns (NOTE(review): presumably null at end of data - confirm)
+   */
+  @Override
+  public Instance readInstance() {
+         return readInstance(this.reader);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java
----------------------------------------------------------------------
diff --git 
a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java
 
b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java
new file mode 100644
index 0000000..c3e32dc
--- /dev/null
+++ 
b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java
@@ -0,0 +1,120 @@
+package org.apache.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Load Data from Binary Avro Stream and parse to corresponding Dense & Parse 
Instances
+ * 
+ * @author jayadeepj
+ *
+ */
+public class AvroBinaryLoader extends AvroLoader {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = LoggerFactory.getLogger(AvroBinaryLoader.class);
+
+  /** Reader over the binary Avro data; the schema is taken from its header. */
+  protected DataFileStream<GenericRecord> dataFileStream = null;
+
+  /**
+   * Creates a loader and immediately reads the schema from the stream.
+   *
+   * @param inputStream binary Avro data (Avro object container format, as read by {@link DataFileStream})
+   * @param classAttribute negative for the last attribute, 0 for none, k &gt; 0 for attribute k (1-based)
+   */
+  public AvroBinaryLoader(InputStream inputStream, int classAttribute) {
+    super(classAttribute);
+    initializeSchema(inputStream);
+  }
+
+  /**
+   * Opens a {@link DataFileStream} over the input and reads the schema embedded in it. It then builds the
+   * instance header, the sparse/dense flag and the class index.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} if the schema cannot be read
+   */
+  @Override
+  public void initializeSchema(InputStream inputStream) {
+    try {
+      this.datumReader = new GenericDatumReader<GenericRecord>();
+      this.dataFileStream = new DataFileStream<GenericRecord>(inputStream, datumReader);
+      this.schema = dataFileStream.getSchema();
+
+      this.instanceInformation = getHeader();
+      this.isSparseData = isSparseData();
+
+      if (classAttribute < 0) {
+        this.instanceInformation.setClassIndex(this.instanceInformation.numAttributes() - 1);
+      } else if (classAttribute > 0) {
+        this.instanceInformation.setClassIndex(classAttribute - 1);
+      }
+    } catch (IOException ioException) {
+      // Log the Throwable itself (keeps the stack trace) and chain it as the cause of the wrapper.
+      logger.error(AVRO_LOADER_SCHEMA_READ_ERROR, ioException);
+      throw new RuntimeException(AVRO_LOADER_SCHEMA_READ_ERROR + " : " + ioException, ioException);
+    }
+  }
+
+  /**
+   * Reads the next record and converts it to a dense or sparse instance, depending on the schema.
+   *
+   * @return the next instance, or null when the data is exhausted
+   * @throws RuntimeException if the record cannot be read or decoded
+   */
+  @Override
+  public Instance readInstance() {
+    GenericRecord record = null;
+    try {
+      if (dataFileStream.hasNext()) {
+        record = dataFileStream.next();
+      }
+    } catch (RuntimeException avroException) {
+      // hasNext()/next() implement java.util.Iterator and cannot throw checked exceptions, so read and
+      // decode failures arrive unchecked.
+      logger.error(AVRO_LOADER_INSTANCE_READ_ERROR, avroException);
+      throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR + " : " + avroException, avroException);
+    }
+
+    // At end of data the underlying InputStream is left open: the code that supplied it is expected to close
+    // it (AvroFileStream does so when readInstance reports no more data).
+    if (record == null) {
+      return null;
+    }
+    return isSparseData ? readInstanceSparse(record) : readInstanceDense(record);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java
----------------------------------------------------------------------
diff --git 
a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java 
b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java
new file mode 100644
index 0000000..827b507
--- /dev/null
+++ 
b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java
@@ -0,0 +1,133 @@
+package org.apache.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Load Data from JSON Avro Stream and parse to corresponding Dense & Parse 
Instances
+ * 
+ * @author jayadeepj
+ *
+ */
+public class AvroJsonLoader extends AvroLoader {
+
+       private static final long serialVersionUID = 1L;
+       private static final Logger logger = 
LoggerFactory.getLogger(AvroJsonLoader.class);
+
+       /** The Character reader for JSON read */
+       protected Reader reader  = null;
+
+       public AvroJsonLoader(InputStream inputStream, int classAttribute) {
+               super(classAttribute);
+               initializeSchema(inputStream);
+       }
+
+       /* (non-Javadoc)
+        * @see 
org.apache.samoa.instances.AvroLoader#initializeSchema(java.io.InputStream)
+        */
+       @Override
+       public void initializeSchema(InputStream inputStream)
+       {
+               String schemaString = null;
+               try {
+                       this.reader = new BufferedReader(new 
InputStreamReader(inputStream));
+                       schemaString = ((BufferedReader)this.reader).readLine();
+                       this.schema = new Schema.Parser().parse(schemaString);
+                       this.datumReader = new 
GenericDatumReader<GenericRecord>(schema);
+                       this.instanceInformation = getHeader();
+                       this.isSparseData = isSparseData();
+
+                       if (classAttribute < 0) {
+                               
this.instanceInformation.setClassIndex(this.instanceInformation.numAttributes() 
- 1);
+                       } else if (classAttribute > 0) {
+                               
this.instanceInformation.setClassIndex(classAttribute - 1);
+                       }
+
+               } catch (IOException ioException) {
+                       logger.error(AVRO_LOADER_SCHEMA_READ_ERROR+" : 
{}",ioException);
+                       throw new 
RuntimeException(AVRO_LOADER_SCHEMA_READ_ERROR+" : " + ioException);
+               }
+       }
+
+       /* (non-Javadoc)
+        * @see org.apache.samoa.instances.AvroLoader#readInstance()
+        */
+       @Override
+       public Instance readInstance() {
+
+               String line = null;
+               Decoder decoder = null;
+               GenericRecord record = null;
+
+               try{
+                       while ((line = ((BufferedReader)reader).readLine()) != 
null) {
+                               if(line==null || line.trim().length()<=0)
+                                       continue;
+
+                               decoder = 
DecoderFactory.get().jsonDecoder(schema, line);
+                               record  = datumReader.read(null, decoder);
+                               break;
+                       }
+               } catch (IOException ioException) {
+                       logger.error(AVRO_LOADER_INSTANCE_READ_ERROR+" : 
{}",ioException);
+                       throw new 
RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR+" : " + ioException);
+               }
+
+               if(record==null)
+               {
+                       closeReader();
+                       return null;
+               }
+
+               if(isSparseData)
+                       return readInstanceSparse(record);
+
+               return readInstanceDense(record);
+       }
+
+       /**
+        * Close the Avro Data Stream
+        */
+       private void closeReader()
+       {
+               if(reader !=null)
+                       try {
+                               reader.close();
+                       } catch (IOException ioException) {
+                               logger.error(AVRO_LOADER_INSTANCE_READ_ERROR+" 
: {}",ioException);
+                               throw new 
RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR+" : " + ioException);
+                       }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java
----------------------------------------------------------------------
diff --git 
a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java 
b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java
new file mode 100644
index 0000000..09f410f
--- /dev/null
+++ b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java
@@ -0,0 +1,286 @@
+package org.apache.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData.EnumSymbol;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+
+/**
+ * Load Data from Avro Stream and parse to corresponding Dense & Parse 
Instances
+ * Abstract Class: Subclass this class for different types of Avro Encodings
+ * 
+ * @author jayadeepj
+ *
+ */
+public abstract class AvroLoader implements Loader {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Representation of the Avro Schema for the Instances being read. 
Built from the first line in the data  */
+       protected Schema schema = null;
+
+       /**  Meta-data of the Instance */
+       protected InstanceInformation instanceInformation;
+
+       /** List of attributes in the data as read from the schema */
+       protected List<Attribute> attributes;
+
+       /** This variable is to check if the data stored is Sparse or Dense */
+       protected boolean isSparseData;
+
+       protected int classAttribute;
+
+       /** Datum Reader for Avro Data*/
+       public DatumReader<GenericRecord> datumReader = null;
+
+       public AvroLoader(int classAttribute) {
+               this.classAttribute = classAttribute;
+               this.isSparseData = false;
+       }
+
+       /** Intialize Avro Schema, Meta Data, InstanceInformation from Input 
Avro Stream */
+       public abstract void initializeSchema(InputStream inputStream);
+
+       /** Read a single SAMOA Instance from Input Avro Stream */
+       public abstract Instance readInstance();
+       
+       /**
+        * Method to read Dense Instances from Avro File
+        * @return Instance
+        */
+       protected Instance readInstanceDense(GenericRecord record)
+       {
+               Instance instance = new 
DenseInstance(this.instanceInformation.numAttributes() + 1);
+               int numAttribute = 0;
+
+               for (Attribute attribute : attributes) {
+                       Object value = record.get(attribute.name);
+
+                       boolean isNumeric = 
attributes.get(numAttribute).isNumeric();
+                       boolean isNominal = 
attributes.get(numAttribute).isNominal();
+
+                       if(isNumeric)
+                       {
+                               if(value instanceof Double)     
+                                       this.setDenseValue(instance, 
numAttribute, (double)value);
+                               else if (value instanceof Long) 
+                                       this.setDenseValue(instance, 
numAttribute, (long)value);
+                               else if (value instanceof Integer)      
+                                       this.setDenseValue(instance, 
numAttribute, (int)value);
+                               else
+                                       throw new RuntimeException("Invalid 
data type in the Avro data for Numeric Type : "+attribute.name);
+                       }
+                       else if(isNominal)
+                       {
+                               double valueAttribute;
+
+                               if (!(value instanceof EnumSymbol))     
+                                       throw new RuntimeException("Invalid 
data type in the Avro data for Nominal Type : "+attribute.name);
+
+                               EnumSymbol enumSymbolalue = (EnumSymbol)value;
+
+                               String stringValue = enumSymbolalue.toString();
+
+                               if 
(("?".equals(stringValue))||(stringValue==null)) {
+                                       valueAttribute = Double.NaN; 
+                               } else {
+                                       valueAttribute = 
this.instanceInformation.attribute(numAttribute).indexOfValue(stringValue);
+                               }
+
+                               this.setDenseValue(instance, numAttribute, 
valueAttribute);
+                       }
+                       numAttribute++;
+               }
+
+               return (numAttribute > 0) ? instance : null;
+
+       }
+
+
+       /**
+        * Sets a Dense Value in the corresponding attribute index
+        * @param instance is the Instance where values will be set
+        * @param numAttribute is the index of the attribute
+        * @param valueAttribute is the value of the attribute for this Instance
+        */
+
+       private void setDenseValue(Instance instance, int numAttribute, double 
valueAttribute) {
+
+               if (this.instanceInformation.classIndex() == numAttribute) 
+                       instance.setClassValue(valueAttribute);
+               else 
+                       instance.setValue(numAttribute, valueAttribute);
+       }
+
+       /**
+        * Method to read Sparse Instances from Avro File
+        * @return Instance
+        */
+       protected Instance readInstanceSparse(GenericRecord record) {
+
+               Instance instance = new SparseInstance(1.0, null); 
+               int numAttribute = -1;
+               ArrayList<Double> attributeValues = new ArrayList<Double>();
+               List<Integer> indexValues = new ArrayList<Integer>();
+
+               for (Attribute attribute : attributes) {
+                       numAttribute++;
+                       Object value = record.get(attribute.name);
+
+                       boolean isNumeric = 
attributes.get(numAttribute).isNumeric();
+                       boolean isNominal = 
attributes.get(numAttribute).isNominal();
+
+                       /** If value is empty/null iterate to the next 
attribute.**/
+                       if(value==null)
+                               continue;
+
+                       if(isNumeric)
+                       {
+                               if(value instanceof Double)     
+                                       this.setSparseValue(instance,  
indexValues, attributeValues, numAttribute, (double)value);
+                               else if (value instanceof Long) 
+                                       
this.setSparseValue(instance,indexValues, attributeValues, numAttribute, 
(long)value);
+                               else if (value instanceof Integer)      
+                                       
this.setSparseValue(instance,indexValues, attributeValues, numAttribute, 
(int)value);
+                               else
+                                       throw new 
RuntimeException(AVRO_LOADER_INVALID_TYPE_ERROR+" : "+attribute.name);
+                       }
+                       else if(isNominal)
+                       {
+                               double valueAttribute;
+
+                               if (!(value instanceof EnumSymbol))     
+                                       throw new 
RuntimeException(AVRO_LOADER_INVALID_TYPE_ERROR+" : "+attribute.name);
+
+                               EnumSymbol enumSymbolalue = (EnumSymbol)value;
+
+                               String stringValue = enumSymbolalue.toString();
+
+                               if 
(("?".equals(stringValue))||(stringValue==null)) {
+                                       valueAttribute = Double.NaN; 
+                               } else {
+                                       valueAttribute = 
this.instanceInformation.attribute(numAttribute).indexOfValue(stringValue);
+                               }
+
+                               this.setSparseValue(instance, indexValues, 
attributeValues, numAttribute, valueAttribute);
+                       }
+               }
+
+               int[] arrayIndexValues = new int[attributeValues.size()];
+               double[] arrayAttributeValues = new 
double[attributeValues.size()];
+               
+               for (int i = 0; i < arrayIndexValues.length; i++) {
+                       arrayIndexValues[i] = indexValues.get(i).intValue();
+                       arrayAttributeValues[i] = 
attributeValues.get(i).doubleValue();
+               }
+               
+               instance.addSparseValues(arrayIndexValues, 
arrayAttributeValues,this.instanceInformation.numAttributes());
+               return instance;
+
+       }
+
+       /**
+        * Sets a Sparse Value in the corresponding attribute index
+        * @param instance  is the Instance where values will be set
+        * @param indexValues is the list of Index values
+        * @param attributeValues is the list of Attribute values
+        * @param numAttribute is the index of the attribute
+        * @param valueAttribute is the value of the attribute for this Instance
+        */
+       private void setSparseValue(Instance instance, List<Integer> 
indexValues, List<Double> attributeValues, int numAttribute, double 
valueAttribute) {
+       
+               if (this.instanceInformation.classIndex() == numAttribute) {
+                       instance.setClassValue(valueAttribute);
+               } else {
+                       indexValues.add(numAttribute);
+                       attributeValues.add(valueAttribute);
+               }
+       }
+
+       /**
+        * Builds the Meta Data of from the Avro Schema
+        * @return
+        */
+       protected InstanceInformation getHeader() {
+
+               String relation = schema.getName();
+               attributes = new ArrayList<Attribute>();
+
+               /** By Definition, the returned list is in the order of their 
positions. **/
+               List<Schema.Field> fields = schema.getFields();
+
+               for (Field field : fields) {
+                       Schema attributeSchema = field.schema();
+                       
+                       /** Currently SAMOA supports only NOMINAL & Numeric 
Types.**/
+                       if(attributeSchema.getType()==Schema.Type.ENUM)
+                       {
+                               List<String> attributeLabels = 
attributeSchema.getEnumSymbols();
+                               attributes.add(new Attribute(field.name(), 
attributeLabels));
+                       }
+                       else
+                               attributes.add(new Attribute(field.name()));
+               }
+               return new InstanceInformation(relation, attributes);
+       }
+
+       /**
+        * Identifies if the dataset is is Sparse or Dense
+        * @return boolean
+        */
+       protected boolean isSparseData()
+       {
+               List<Schema.Field> fields = schema.getFields();
+               for (Field field : fields) {
+                       Schema attributeSchema = field.schema(); 
+
+                       /** If even one attribute has a null union (nullable 
attribute) consider it as sparse data**/
+                       if(attributeSchema.getType()==Schema.Type.UNION)
+                       {
+                               List<Schema> unionTypes = 
attributeSchema.getTypes();
+                               for (Schema unionSchema : unionTypes) {
+                                       
if(unionSchema.getType()==Schema.Type.NULL)
+                                               return true;
+                               }
+                       }
+
+               }
+               return false;
+       }
+       
+       @Override
+       public InstanceInformation getStructure() {
+               return this.instanceInformation;
+       }
+
+       /** Error Messages to for all types of Avro Loaders */
+       protected static final String AVRO_LOADER_INVALID_TYPE_ERROR = "Invalid 
data type in the Avro data"; 
+       protected static final String AVRO_LOADER_SCHEMA_READ_ERROR = 
"Exception while reading the schema from Avro File"; 
+       protected static final String AVRO_LOADER_INSTANCE_READ_ERROR = 
"Exception while reading the Instance from Avro File.";
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java
----------------------------------------------------------------------
diff --git 
a/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java 
b/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java
index 556caaa..707d7f2 100644
--- a/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java
+++ b/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java
@@ -30,6 +30,7 @@ import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.io.InputStream;
 
 /**
  * 
@@ -45,8 +46,11 @@ public class Instances implements Serializable {
    * The instances.
    */
   protected List<Instance> instances;
+  
+  /** Source of instances (an ARFF or Avro loader); transient, so it is not serialized. */
+  protected transient Loader loader;
+
 
-  transient protected ArffLoader arff;
+  /** Avro encodings understood by the InputStream-based constructor. */
+  protected enum AVRO_ENCODING_FORMAT { JSON, BINARY }
 
   protected int classAttribute;
 
@@ -69,12 +73,24 @@ public class Instances implements Serializable {
   }
 
   public Instances(Reader reader, int size, int classAttribute) {
-    this.classAttribute = classAttribute;
-    arff = new ArffLoader(reader, 0, classAttribute);
-    this.instanceInformation = arff.getStructure();
-    this.instances = new ArrayList<>();
+    // size is not used: the structure comes from an ArffLoader over reader
+    // and the instance list always starts empty.
+    this.classAttribute = classAttribute;
+    this.loader = new ArffLoader(reader, 0, classAttribute);
+    this.instanceInformation = this.loader.getStructure();
+    this.instances = new ArrayList<>();
   }
 
+  /**
+   * Creates an empty dataset whose structure is read from an Avro stream.
+   *
+   * @param inputStream stream containing the Avro data
+   * @param classAttribute index of the class attribute, passed to the loader
+   * @param encodingFormat "BINARY" (case-insensitive) selects AvroBinaryLoader;
+   *        any other value, including null, selects AvroJsonLoader
+   */
+  public Instances(InputStream inputStream, int classAttribute, String encodingFormat) {
+    this.classAttribute = classAttribute;
+
+    // Compare from the constant side so a null encodingFormat falls back to
+    // JSON instead of throwing a NullPointerException.
+    if (AVRO_ENCODING_FORMAT.BINARY.name().equalsIgnoreCase(encodingFormat)) {
+      this.loader = new AvroBinaryLoader(inputStream, classAttribute);
+    } else {
+      this.loader = new AvroJsonLoader(inputStream, classAttribute);
+    }
+
+    this.instanceInformation = this.loader.getStructure();
+    this.instances = new ArrayList<>();
+  }
+       
   public Instances(Instances chunk, int capacity) {
+    // capacity is not used; this simply delegates to the Instances(Instances) constructor.
     this(chunk);
   }
@@ -175,18 +191,22 @@ public class Instances implements Serializable {
 
   public boolean readInstance(Reader fileReader) {
 
-    // ArffReader arff = new ArffReader(reader, this, m_Lines, 1);
-    if (arff == null) {
-      arff = new ArffLoader(fileReader, 0, this.classAttribute);
-    }
-    Instance inst = arff.readInstance(fileReader);
-    if (inst != null) {
-      inst.setDataset(this);
-      add(inst);
-      return true;
-    } else {
-      return false;
-    }
+    // Lazily create an ARFF loader on first use. Once any loader exists,
+    // fileReader is ignored and reading is delegated to readInstance().
+    if (this.loader == null) {
+      this.loader = new ArffLoader(fileReader, 0, this.classAttribute);
+    }
+    return readInstance();
+  }
+
+  /**
+   * Reads the next instance from the configured loader and, if one is
+   * returned, attaches it to this dataset and appends it.
+   *
+   * @return true if an instance was read and added; false when the loader
+   *         returns null
+   * @throws IllegalStateException if no loader has been configured
+   */
+  public boolean readInstance() {
+    if (this.loader == null) {
+      // Fail with a clear message instead of a bare NullPointerException.
+      throw new IllegalStateException(
+          "No loader configured; call readInstance(Reader) or use a loader-based constructor first");
+    }
+    Instance inst = this.loader.readInstance();
+    if (inst == null) {
+      return false;
+    }
+    inst.setDataset(this);
+    add(inst);
+    return true;
   }
 
   public void delete() {

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/74979782/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java
----------------------------------------------------------------------
diff --git 
a/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java 
b/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java
new file mode 100644
index 0000000..f806bf5
--- /dev/null
+++ b/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java
@@ -0,0 +1,45 @@
+package org.apache.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.io.Serializable;
+
+/**
+ * Loads Instances from streams of different input formats, e.g. ARFF and AVRO.
+ * @author jayadeepj
+ */
+public interface Loader extends Serializable {
+
+  /**
+   * Fetches the meta-data describing the data.
+   * @return the InstanceInformation of the data
+   */
+  InstanceInformation getStructure();
+
+  /**
+   * Reads a single instance from the stream.
+   * NOTE(review): Instances.readInstance() treats a null return as "no
+   * instance read"; implementations presumably return null at end of data.
+   * @return the next Instance, or null
+   */
+  Instance readInstance();
+
+}

Reply via email to