http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java
index 079423c..a4f885e 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java
@@ -34,117 +34,117 @@ import org.apache.hadoop.io.IOUtils;
 
 /**
  * Source for FileStream for HDFS files
+ * 
  * @author Casey
  */
 public class HDFSFileStreamSource implements FileStreamSource {
-       
-       /**
+
+  /**
         * 
         */
-       private static final long serialVersionUID = -3887354805787167400L;
-       
-       private transient InputStream fileStream;
-    private transient Configuration config;
-    private List<String> filePaths;
-    private int currentIndex;
-       
-       public HDFSFileStreamSource(){
-               this.currentIndex = -1;
-       }
-       
-       public void init(String path, String ext) {
-               this.init(this.getDefaultConfig(), path, ext);
-       }
-       
-       public void init(Configuration config, String path, String ext) {
-               this.config = config;
-               this.filePaths = new ArrayList<String>();
-               Path hdfsPath = new Path(path);
-        FileSystem fs;
-        try {
-               fs = FileSystem.get(config);
-               FileStatus fileStat = fs.getFileStatus(hdfsPath);
-               if (fileStat.isDirectory()) {
-                       Path filterPath = hdfsPath;
-                       if (ext != null) {
-                               filterPath = new Path(path.toString(),"*."+ext);
-                       }
-                       else {
-                               filterPath = new Path(path.toString(),"*");
-                       }
-                       FileStatus[] filesInDir = fs.globStatus(filterPath);
-                       for (int i=0; i<filesInDir.length; i++) {
-                               if (filesInDir[i].isFile()) {
-                                       
filePaths.add(filesInDir[i].getPath().toString());
-                               }
-                       }
-               }
-               else {
-                       this.filePaths.add(path);
-               }
-        }
-        catch(IOException ioe) {
-            throw new RuntimeException("Failed getting list of files at:"+path,ioe);
-        }
-        
-               this.currentIndex = -1;
-       }
-       
-       private Configuration getDefaultConfig() {
-               String hadoopHome = System.getenv("HADOOP_HOME");
-        Configuration conf = new Configuration();
-        if (hadoopHome != null) {
-               java.nio.file.Path coreSitePath = FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/core-site.xml");
-               java.nio.file.Path hdfsSitePath = FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/hdfs-site.xml");
-            conf.addResource(new Path(coreSitePath.toAbsolutePath().toString()));
-            conf.addResource(new Path(hdfsSitePath.toAbsolutePath().toString()));
+  private static final long serialVersionUID = -3887354805787167400L;
+
+  private transient InputStream fileStream;
+  private transient Configuration config;
+  private List<String> filePaths;
+  private int currentIndex;
+
+  public HDFSFileStreamSource() {
+    this.currentIndex = -1;
+  }
+
+  public void init(String path, String ext) {
+    this.init(this.getDefaultConfig(), path, ext);
+  }
+
+  public void init(Configuration config, String path, String ext) {
+    this.config = config;
+    this.filePaths = new ArrayList<String>();
+    Path hdfsPath = new Path(path);
+    FileSystem fs;
+    try {
+      fs = FileSystem.get(config);
+      FileStatus fileStat = fs.getFileStatus(hdfsPath);
+      if (fileStat.isDirectory()) {
+        Path filterPath = hdfsPath;
+        if (ext != null) {
+          filterPath = new Path(path.toString(), "*." + ext);
         }
-        return conf;
-       }
-       
-       public void reset() throws IOException {
-               this.currentIndex = -1;
-               this.closeFileStream();
-       }
-
-       private void closeFileStream() {
-        IOUtils.closeStream(fileStream);
-       }
-
-       public InputStream getNextInputStream() {
-               this.closeFileStream();
-               if (this.currentIndex >= (this.filePaths.size()-1)) return null;
-               
-               this.currentIndex++;
-               String filePath = this.filePaths.get(currentIndex);
-               
-               Path hdfsPath = new Path(filePath);
-        FileSystem fs;
-        try {
-               fs = FileSystem.get(config);
-            fileStream = fs.open(hdfsPath);
+        else {
+          filterPath = new Path(path.toString(), "*");
         }
-        catch(IOException ioe) {
-            this.closeFileStream();
-            throw new RuntimeException("Failed opening file:"+filePath,ioe);
+        FileStatus[] filesInDir = fs.globStatus(filterPath);
+        for (int i = 0; i < filesInDir.length; i++) {
+          if (filesInDir[i].isFile()) {
+            filePaths.add(filesInDir[i].getPath().toString());
+          }
         }
-        
-        return fileStream;
-       }
-
-       public InputStream getCurrentInputStream() {
-               return fileStream;
-       }
-       
-       protected int getFilePathListSize() {
-               if (filePaths != null)
-                       return filePaths.size();
-               return 0;
-       }
-       
-       protected String getFilePathAt(int index) {
-               if (filePaths != null && filePaths.size() > index)
-                       return filePaths.get(index);
-               return null;
-       }
+      }
+      else {
+        this.filePaths.add(path);
+      }
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed getting list of files at:" + path, ioe);
+    }
+
+    this.currentIndex = -1;
+  }
+
+  private Configuration getDefaultConfig() {
+    String hadoopHome = System.getenv("HADOOP_HOME");
+    Configuration conf = new Configuration();
+    if (hadoopHome != null) {
+      java.nio.file.Path coreSitePath = FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/core-site.xml");
+      java.nio.file.Path hdfsSitePath = FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/hdfs-site.xml");
+      conf.addResource(new Path(coreSitePath.toAbsolutePath().toString()));
+      conf.addResource(new Path(hdfsSitePath.toAbsolutePath().toString()));
+    }
+    return conf;
+  }
+
+  public void reset() throws IOException {
+    this.currentIndex = -1;
+    this.closeFileStream();
+  }
+
+  private void closeFileStream() {
+    IOUtils.closeStream(fileStream);
+  }
+
+  public InputStream getNextInputStream() {
+    this.closeFileStream();
+    if (this.currentIndex >= (this.filePaths.size() - 1))
+      return null;
+
+    this.currentIndex++;
+    String filePath = this.filePaths.get(currentIndex);
+
+    Path hdfsPath = new Path(filePath);
+    FileSystem fs;
+    try {
+      fs = FileSystem.get(config);
+      fileStream = fs.open(hdfsPath);
+    } catch (IOException ioe) {
+      this.closeFileStream();
+      throw new RuntimeException("Failed opening file:" + filePath, ioe);
+    }
+
+    return fileStream;
+  }
+
+  public InputStream getCurrentInputStream() {
+    return fileStream;
+  }
+
+  protected int getFilePathListSize() {
+    if (filePaths != null)
+      return filePaths.size();
+    return 0;
+  }
+
+  protected String getFilePathAt(int index) {
+    if (filePaths != null && filePaths.size() > index)
+      return filePaths.get(index);
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java
index c0ab44f..e4ceb70 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java
@@ -31,101 +31,103 @@ import java.util.List;
 
 /**
  * Source for FileStream for local files
+ * 
  * @author Casey
  */
 public class LocalFileStreamSource implements FileStreamSource {
-       /**
+  /**
         * 
         */
-       private static final long serialVersionUID = 3986511547525870698L;
-       
-       private transient InputStream fileStream;
-    private List<String> filePaths;
-    private int currentIndex;
-       
-       public LocalFileStreamSource(){
-               this.currentIndex = -1;
-       }
-       
-       public void init(String path, String ext) {
-               this.filePaths = new ArrayList<String>();
-               File fileAtPath = new File(path);
-               if (fileAtPath.isDirectory()) {
-                       File[] filesInDir = fileAtPath.listFiles(new FileExtensionFilter(ext));
-                       for (int i=0; i<filesInDir.length; i++) {
-                               filePaths.add(filesInDir[i].getAbsolutePath());
-                       }
-               }
-               else {
-                       this.filePaths.add(path);
-               }
-               this.currentIndex = -1;
-       }
-       
-       public void reset() throws IOException {
-               this.currentIndex = -1;
-               this.closeFileStream();
-       }
-       
-       private void closeFileStream() {
-               if (fileStream != null) {
-                       try {
-                               fileStream.close();
-                       } catch (IOException ioe) {
-                               ioe.printStackTrace();
-                       }
-               }
-       }
-
-       public InputStream getNextInputStream() {
-               this.closeFileStream();
-               
-               if (this.currentIndex >= (this.filePaths.size()-1)) return null;
-               
-               this.currentIndex++;
-               String filePath = this.filePaths.get(currentIndex);
-               
-               File file = new File(filePath);
-        try {
-               fileStream = new FileInputStream(file);
-        }
-        catch(IOException ioe) {
-            this.closeFileStream();
-            throw new RuntimeException("Failed opening file:"+filePath,ioe);
-        }
-        
-        return fileStream;
-       }
-
-       public InputStream getCurrentInputStream() {
-               return fileStream;
-       }
-       
-       protected int getFilePathListSize() {
-               if (filePaths != null)
-                       return filePaths.size();
-               return 0;
-       }
-       
-       protected String getFilePathAt(int index) {
-               if (filePaths != null && filePaths.size() > index)
-                       return filePaths.get(index);
-               return null;
-       }
-       
-       private class FileExtensionFilter implements FilenameFilter {
-               private String extension;
-               FileExtensionFilter(String ext) {
-                       extension = ext;
-               }
-               
-               @Override
-               public boolean accept(File dir, String name) {
-                       File f = new File(dir,name);
-                       if (extension == null)
-                               return f.isFile();
-                       else
-                               return f.isFile() && name.toLowerCase().endsWith("."+extension);
-           }
-       }
+  private static final long serialVersionUID = 3986511547525870698L;
+
+  private transient InputStream fileStream;
+  private List<String> filePaths;
+  private int currentIndex;
+
+  public LocalFileStreamSource() {
+    this.currentIndex = -1;
+  }
+
+  public void init(String path, String ext) {
+    this.filePaths = new ArrayList<String>();
+    File fileAtPath = new File(path);
+    if (fileAtPath.isDirectory()) {
+      File[] filesInDir = fileAtPath.listFiles(new FileExtensionFilter(ext));
+      for (int i = 0; i < filesInDir.length; i++) {
+        filePaths.add(filesInDir[i].getAbsolutePath());
+      }
+    }
+    else {
+      this.filePaths.add(path);
+    }
+    this.currentIndex = -1;
+  }
+
+  public void reset() throws IOException {
+    this.currentIndex = -1;
+    this.closeFileStream();
+  }
+
+  private void closeFileStream() {
+    if (fileStream != null) {
+      try {
+        fileStream.close();
+      } catch (IOException ioe) {
+        ioe.printStackTrace();
+      }
+    }
+  }
+
+  public InputStream getNextInputStream() {
+    this.closeFileStream();
+
+    if (this.currentIndex >= (this.filePaths.size() - 1))
+      return null;
+
+    this.currentIndex++;
+    String filePath = this.filePaths.get(currentIndex);
+
+    File file = new File(filePath);
+    try {
+      fileStream = new FileInputStream(file);
+    } catch (IOException ioe) {
+      this.closeFileStream();
+      throw new RuntimeException("Failed opening file:" + filePath, ioe);
+    }
+
+    return fileStream;
+  }
+
+  public InputStream getCurrentInputStream() {
+    return fileStream;
+  }
+
+  protected int getFilePathListSize() {
+    if (filePaths != null)
+      return filePaths.size();
+    return 0;
+  }
+
+  protected String getFilePathAt(int index) {
+    if (filePaths != null && filePaths.size() > index)
+      return filePaths.get(index);
+    return null;
+  }
+
+  private class FileExtensionFilter implements FilenameFilter {
+    private String extension;
+
+    FileExtensionFilter(String ext) {
+      extension = ext;
+    }
+
+    @Override
+    public boolean accept(File dir, String name) {
+      File f = new File(dir, name);
+      if (extension == null)
+        return f.isFile();
+      else
+        return f.isFile() && name.toLowerCase().endsWith("." + extension);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java
index 4af3764..c62de1e 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java
@@ -50,125 +50,138 @@ import com.yahoo.labs.samoa.topology.TopologyBuilder;
  */
 public class ClusteringEvaluation implements Task, Configurable {
 
-    private static final long serialVersionUID = -8246537378371580550L;
+  private static final long serialVersionUID = -8246537378371580550L;
 
-    private static final int DISTRIBUTOR_PARALLELISM = 1;
+  private static final int DISTRIBUTOR_PARALLELISM = 1;
 
-    private static final Logger logger = LoggerFactory.getLogger(ClusteringEvaluation.class);
+  private static final Logger logger = LoggerFactory.getLogger(ClusteringEvaluation.class);
 
-    public ClassOption learnerOption = new ClassOption("learner", 'l', 
"Clustering to run.", Learner.class, DistributedClusterer.class.getName());
+  public ClassOption learnerOption = new ClassOption("learner", 'l', 
"Clustering to run.", Learner.class,
+      DistributedClusterer.class.getName());
 
-    public ClassOption streamTrainOption = new ClassOption("streamTrain", 's', 
"Input stream.", InstanceStream.class,
-            RandomRBFGeneratorEvents.class.getName());
+  public ClassOption streamTrainOption = new ClassOption("streamTrain", 's', 
"Input stream.", InstanceStream.class,
+      RandomRBFGeneratorEvents.class.getName());
 
-    public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i', 
"Maximum number of instances to test/train on  (-1 = no limit).", 100000, -1,
-            Integer.MAX_VALUE);
+  public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i',
+      "Maximum number of instances to test/train on  (-1 = no limit).", 
100000, -1,
+      Integer.MAX_VALUE);
 
-    public IntOption measureCollectionTypeOption = new 
IntOption("measureCollectionType", 'm', "Type of measure collection", 0, 0, 
Integer.MAX_VALUE);
+  public IntOption measureCollectionTypeOption = new 
IntOption("measureCollectionType", 'm',
+      "Type of measure collection", 0, 0, Integer.MAX_VALUE);
 
-    public IntOption timeLimitOption = new IntOption("timeLimit", 't', 
"Maximum number of seconds to test/train for (-1 = no limit).", -1, -1,
-            Integer.MAX_VALUE);
+  public IntOption timeLimitOption = new IntOption("timeLimit", 't',
+      "Maximum number of seconds to test/train for (-1 = no limit).", -1, -1,
+      Integer.MAX_VALUE);
 
-    public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 
'f', "How many instances between samples of the learning performance.", 1000, 0,
-            Integer.MAX_VALUE);
+  public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 
'f',
+      "How many instances between samples of the learning performance.", 1000, 
0,
+      Integer.MAX_VALUE);
 
-    public StringOption evaluationNameOption = new 
StringOption("evaluationName", 'n', "Identifier of the evaluation", 
"Clustering__"
-            + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
+  public StringOption evaluationNameOption = new 
StringOption("evaluationName", 'n', "Identifier of the evaluation",
+      "Clustering__"
+          + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
 
-    public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File 
to append intermediate csv results to", null, "csv", true);
+  public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File to 
append intermediate csv results to",
+      null, "csv", true);
 
-    public FloatOption samplingThresholdOption = new 
FloatOption("samplingThreshold", 'a', "Ratio of instances sampled that will be 
used for evaluation.", 0.5,
-            0.0, 1.0);
+  public FloatOption samplingThresholdOption = new 
FloatOption("samplingThreshold", 'a',
+      "Ratio of instances sampled that will be used for evaluation.", 0.5,
+      0.0, 1.0);
 
-    private ClusteringEntranceProcessor source;
-    private InstanceStream streamTrain;
-    private ClusteringDistributorProcessor distributor;
-    private Stream distributorStream;
-    private Stream evaluationStream;
-    
-    // Default=0: no delay/waiting
-    public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w', 
"How many miliseconds between injections of two instances.", 0, 0, 
Integer.MAX_VALUE);
-    
-    private Learner learner;
-    private ClusteringEvaluatorProcessor evaluator;
+  private ClusteringEntranceProcessor source;
+  private InstanceStream streamTrain;
+  private ClusteringDistributorProcessor distributor;
+  private Stream distributorStream;
+  private Stream evaluationStream;
 
-    private Topology topology;
-    private TopologyBuilder builder;
+  // Default=0: no delay/waiting
+  public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w',
+      "How many miliseconds between injections of two instances.", 0, 0, 
Integer.MAX_VALUE);
 
-    public void getDescription(StringBuilder sb) {
-        sb.append("Clustering evaluation");
-    }
+  private Learner learner;
+  private ClusteringEvaluatorProcessor evaluator;
 
-    @Override
-    public void init() {
-        // TODO remove the if statement theoretically, dynamic binding will work here! for now, the if statement is used by Storm
-
-        if (builder == null) {
-            logger.warn("Builder was not initialized, initializing it from the 
Task");
-
-            builder = new TopologyBuilder();
-            logger.debug("Successfully instantiating TopologyBuilder");
-
-            builder.initTopology(evaluationNameOption.getValue(), 
sourceDelayOption.getValue());
-            logger.debug("Successfully initializing SAMOA topology with name 
{}", evaluationNameOption.getValue());
-        }
-
-        // instantiate ClusteringEntranceProcessor and its output stream 
(sourceStream)
-        source = new ClusteringEntranceProcessor();
-        streamTrain = this.streamTrainOption.getValue();
-        source.setStreamSource(streamTrain);
-        builder.addEntranceProcessor(source);
-        source.setSamplingThreshold(samplingThresholdOption.getValue());
-        source.setMaxNumInstances(instanceLimitOption.getValue());
-        logger.debug("Successfully instantiated ClusteringEntranceProcessor");
-
-        Stream sourceStream = builder.createStream(source);
-        // starter.setInputStream(sourcePiOutputStream); // FIXME set stream 
in the EntrancePI
-
-        // distribution of instances and sampling for evaluation
-        distributor = new ClusteringDistributorProcessor();
-        builder.addProcessor(distributor, DISTRIBUTOR_PARALLELISM);
-        builder.connectInputShuffleStream(sourceStream, distributor);
-        distributorStream = builder.createStream(distributor);
-        distributor.setOutputStream(distributorStream);
-        evaluationStream = builder.createStream(distributor);
-        distributor.setEvaluationStream(evaluationStream); // passes 
evaluation events along
-        logger.debug("Successfully instantiated Distributor");
-       
-        // instantiate learner and connect it to distributorStream
-        learner = this.learnerOption.getValue();
-        learner.init(builder, source.getDataset(), 1);
-        builder.connectInputShuffleStream(distributorStream, 
learner.getInputProcessor());
-        logger.debug("Successfully instantiated Learner");
-
-        evaluator = new ClusteringEvaluatorProcessor.Builder(
-        sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile())
-            .decayHorizon(((ClusteringStream) 
streamTrain).getDecayHorizon()).build();
+  private Topology topology;
+  private TopologyBuilder builder;
 
-        builder.addProcessor(evaluator);
-        for (Stream evaluatorPiInputStream:learner.getResultStreams()) {
-               builder.connectInputShuffleStream(evaluatorPiInputStream, 
evaluator);
-        }
-        builder.connectInputAllStream(evaluationStream, evaluator);
-        logger.debug("Successfully instantiated EvaluatorProcessor");
+  public void getDescription(StringBuilder sb) {
+    sb.append("Clustering evaluation");
+  }
 
-        topology = builder.build();
-        logger.debug("Successfully built the topology");
-    }
+  @Override
+  public void init() {
+    // TODO remove the if statement theoretically, dynamic binding will work
+    // here! for now, the if statement is used by Storm
 
-    @Override
-    public void setFactory(ComponentFactory factory) {
-        // TODO unify this code with init() for now, it's used by S4 App
-        // dynamic binding theoretically will solve this problem
-        builder = new TopologyBuilder(factory);
-        logger.debug("Successfully instantiated TopologyBuilder");
+    if (builder == null) {
+      logger.warn("Builder was not initialized, initializing it from the Task");
 
-        builder.initTopology(evaluationNameOption.getValue());
-        logger.debug("Successfully initialized SAMOA topology with name {}", 
evaluationNameOption.getValue());
+      builder = new TopologyBuilder();
+      logger.debug("Successfully instantiating TopologyBuilder");
 
+      builder.initTopology(evaluationNameOption.getValue(), sourceDelayOption.getValue());
+      logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue());
     }
 
-    public Topology getTopology() {
-        return topology;
+    // instantiate ClusteringEntranceProcessor and its output stream
+    // (sourceStream)
+    source = new ClusteringEntranceProcessor();
+    streamTrain = this.streamTrainOption.getValue();
+    source.setStreamSource(streamTrain);
+    builder.addEntranceProcessor(source);
+    source.setSamplingThreshold(samplingThresholdOption.getValue());
+    source.setMaxNumInstances(instanceLimitOption.getValue());
+    logger.debug("Successfully instantiated ClusteringEntranceProcessor");
+
+    Stream sourceStream = builder.createStream(source);
+    // starter.setInputStream(sourcePiOutputStream); // FIXME set stream in the
+    // EntrancePI
+
+    // distribution of instances and sampling for evaluation
+    distributor = new ClusteringDistributorProcessor();
+    builder.addProcessor(distributor, DISTRIBUTOR_PARALLELISM);
+    builder.connectInputShuffleStream(sourceStream, distributor);
+    distributorStream = builder.createStream(distributor);
+    distributor.setOutputStream(distributorStream);
+    evaluationStream = builder.createStream(distributor);
+    distributor.setEvaluationStream(evaluationStream); // passes evaluation
+                                                       // events along
+    logger.debug("Successfully instantiated Distributor");
+
+    // instantiate learner and connect it to distributorStream
+    learner = this.learnerOption.getValue();
+    learner.init(builder, source.getDataset(), 1);
+    builder.connectInputShuffleStream(distributorStream, learner.getInputProcessor());
+    logger.debug("Successfully instantiated Learner");
+
+    evaluator = new ClusteringEvaluatorProcessor.Builder(
+        sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile())
+        .decayHorizon(((ClusteringStream) streamTrain).getDecayHorizon()).build();
+
+    builder.addProcessor(evaluator);
+    for (Stream evaluatorPiInputStream : learner.getResultStreams()) {
+      builder.connectInputShuffleStream(evaluatorPiInputStream, evaluator);
     }
+    builder.connectInputAllStream(evaluationStream, evaluator);
+    logger.debug("Successfully instantiated EvaluatorProcessor");
+
+    topology = builder.build();
+    logger.debug("Successfully built the topology");
+  }
+
+  @Override
+  public void setFactory(ComponentFactory factory) {
+    // TODO unify this code with init() for now, it's used by S4 App
+    // dynamic binding theoretically will solve this problem
+    builder = new TopologyBuilder(factory);
+    logger.debug("Successfully instantiated TopologyBuilder");
+
+    builder.initTopology(evaluationNameOption.getValue());
+    logger.debug("Successfully initialized SAMOA topology with name {}", evaluationNameOption.getValue());
+
+  }
+
+  public Topology getTopology() {
+    return topology;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java
index 70c44a1..081d123 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java
@@ -49,158 +49,174 @@ import com.yahoo.labs.samoa.topology.Topology;
 import com.yahoo.labs.samoa.topology.TopologyBuilder;
 
 /**
- * Prequential Evaluation task is a scheme in evaluating performance of online 
classifiers which uses each instance for testing online classifiers model and
- * then it further uses the same instance for training the 
model(Test-then-train)
+ * Prequential Evaluation task is a scheme for evaluating the performance of
+ * online classifiers: each instance is first used to test the classifier's
+ * model, and the same instance is then used to train the model
+ * (test-then-train).
  * 
  * @author Arinto Murdopo
  * 
  */
 public class PrequentialEvaluation implements Task, Configurable {
 
-    private static final long serialVersionUID = -8246537378371580550L;
+  private static final long serialVersionUID = -8246537378371580550L;
 
-    private static Logger logger = 
LoggerFactory.getLogger(PrequentialEvaluation.class);
+  private static Logger logger = 
LoggerFactory.getLogger(PrequentialEvaluation.class);
 
-    public ClassOption learnerOption = new ClassOption("learner", 'l', 
"Classifier to train.", Learner.class, VerticalHoeffdingTree.class.getName());
+  public ClassOption learnerOption = new ClassOption("learner", 'l', 
"Classifier to train.", Learner.class,
+      VerticalHoeffdingTree.class.getName());
 
-    public ClassOption streamTrainOption = new ClassOption("trainStream", 's', 
"Stream to learn from.", InstanceStream.class,
-            RandomTreeGenerator.class.getName());
+  public ClassOption streamTrainOption = new ClassOption("trainStream", 's', 
"Stream to learn from.",
+      InstanceStream.class,
+      RandomTreeGenerator.class.getName());
 
-    public ClassOption evaluatorOption = new ClassOption("evaluator", 'e', 
"Classification performance evaluation method.",
-            PerformanceEvaluator.class, 
BasicClassificationPerformanceEvaluator.class.getName());
+  public ClassOption evaluatorOption = new ClassOption("evaluator", 'e',
+      "Classification performance evaluation method.",
+      PerformanceEvaluator.class, 
BasicClassificationPerformanceEvaluator.class.getName());
 
-    public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i', 
"Maximum number of instances to test/train on  (-1 = no limit).", 1000000, -1,
-            Integer.MAX_VALUE);
+  public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i',
+      "Maximum number of instances to test/train on  (-1 = no limit).", 
1000000, -1,
+      Integer.MAX_VALUE);
 
-    public IntOption timeLimitOption = new IntOption("timeLimit", 't', 
"Maximum number of seconds to test/train for (-1 = no limit).", -1, -1,
-            Integer.MAX_VALUE);
+  public IntOption timeLimitOption = new IntOption("timeLimit", 't',
+      "Maximum number of seconds to test/train for (-1 = no limit).", -1, -1,
+      Integer.MAX_VALUE);
 
-    public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 
'f', "How many instances between samples of the learning performance.", 100000,
-            0, Integer.MAX_VALUE);
+  public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 
'f',
+      "How many instances between samples of the learning performance.", 
100000,
+      0, Integer.MAX_VALUE);
 
-    public StringOption evaluationNameOption = new 
StringOption("evaluationName", 'n', "Identifier of the evaluation", 
"Prequential_"
-            + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
+  public StringOption evaluationNameOption = new 
StringOption("evaluationName", 'n', "Identifier of the evaluation",
+      "Prequential_"
+          + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
 
-    public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File 
to append intermediate csv results to", null, "csv", true);
+  public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File to 
append intermediate csv results to",
+      null, "csv", true);
 
-    // Default=0: no delay/waiting
-    public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w', 
"How many microseconds between injections of two instances.", 0, 0, 
Integer.MAX_VALUE);
-    // Batch size to delay the incoming stream: delay of x milliseconds after 
each batch
-    public IntOption batchDelayOption = new IntOption("delayBatchSize", 'b', 
"The delay batch size: delay of x milliseconds after each batch ", 1, 1, 
Integer.MAX_VALUE);
-    
-    private PrequentialSourceProcessor preqSource;
+  // Default=0: no delay/waiting
+  public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w',
+      "How many microseconds between injections of two instances.", 0, 0, 
Integer.MAX_VALUE);
+  // Batch size to delay the incoming stream: delay of x milliseconds after 
each
+  // batch
+  public IntOption batchDelayOption = new IntOption("delayBatchSize", 'b',
+      "The delay batch size: delay of x milliseconds after each batch ", 1, 1, 
Integer.MAX_VALUE);
 
-    // private PrequentialSourceTopologyStarter preqStarter;
+  private PrequentialSourceProcessor preqSource;
 
-    // private EntranceProcessingItem sourcePi;
+  // private PrequentialSourceTopologyStarter preqStarter;
 
-    private Stream sourcePiOutputStream;
+  // private EntranceProcessingItem sourcePi;
 
-    private Learner classifier;
+  private Stream sourcePiOutputStream;
 
-    private EvaluatorProcessor evaluator;
+  private Learner classifier;
 
-    // private ProcessingItem evaluatorPi;
+  private EvaluatorProcessor evaluator;
 
-    // private Stream evaluatorPiInputStream;
+  // private ProcessingItem evaluatorPi;
 
-    private Topology prequentialTopology;
+  // private Stream evaluatorPiInputStream;
 
-    private TopologyBuilder builder;
+  private Topology prequentialTopology;
 
-    public void getDescription(StringBuilder sb, int indent) {
-        sb.append("Prequential evaluation");
-    }
+  private TopologyBuilder builder;
 
-    @Override
-    public void init() {
-        // TODO remove the if statement
-        // theoretically, dynamic binding will work here!
-        // test later!
-        // for now, the if statement is used by Storm
-
-        if (builder == null) {
-            builder = new TopologyBuilder();
-            logger.debug("Successfully instantiating TopologyBuilder");
-
-            builder.initTopology(evaluationNameOption.getValue());
-            logger.debug("Successfully initializing SAMOA topology with name 
{}", evaluationNameOption.getValue());
-        }
-
-        // instantiate PrequentialSourceProcessor and its output stream 
(sourcePiOutputStream)
-        preqSource = new PrequentialSourceProcessor();
-        preqSource.setStreamSource((InstanceStream) 
this.streamTrainOption.getValue());
-        preqSource.setMaxNumInstances(instanceLimitOption.getValue());
-        preqSource.setSourceDelay(sourceDelayOption.getValue());
-        preqSource.setDelayBatchSize(batchDelayOption.getValue());
-        builder.addEntranceProcessor(preqSource);
-        logger.debug("Successfully instantiating PrequentialSourceProcessor");
-
-        // preqStarter = new PrequentialSourceTopologyStarter(preqSource, 
instanceLimitOption.getValue());
-        // sourcePi = builder.createEntrancePi(preqSource, preqStarter);
-        // sourcePiOutputStream = builder.createStream(sourcePi);
-
-        sourcePiOutputStream = builder.createStream(preqSource);
-        // preqStarter.setInputStream(sourcePiOutputStream);
-
-        // instantiate classifier and connect it to sourcePiOutputStream
-        classifier = this.learnerOption.getValue();
-        classifier.init(builder, preqSource.getDataset(), 1);
-        builder.connectInputShuffleStream(sourcePiOutputStream, 
classifier.getInputProcessor());
-        logger.debug("Successfully instantiating Classifier");
-
-        PerformanceEvaluator evaluatorOptionValue = 
this.evaluatorOption.getValue();
-        if (!PrequentialEvaluation.isLearnerAndEvaluatorCompatible(classifier, 
evaluatorOptionValue)) {
-               evaluatorOptionValue = 
getDefaultPerformanceEvaluatorForLearner(classifier);
-        }
-        evaluator = new EvaluatorProcessor.Builder(evaluatorOptionValue)
-                
.samplingFrequency(sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile()).build();
-
-        // evaluatorPi = builder.createPi(evaluator);
-        // evaluatorPi.connectInputShuffleStream(evaluatorPiInputStream);
-        builder.addProcessor(evaluator);
-        for (Stream evaluatorPiInputStream:classifier.getResultStreams()) {
-               builder.connectInputShuffleStream(evaluatorPiInputStream, 
evaluator);
-        }
-        
-        logger.debug("Successfully instantiating EvaluatorProcessor");
-
-        prequentialTopology = builder.build();
-        logger.debug("Successfully building the topology");
-    }
+  public void getDescription(StringBuilder sb, int indent) {
+    sb.append("Prequential evaluation");
+  }
 
-    @Override
-    public void setFactory(ComponentFactory factory) {
-        // TODO unify this code with init()
-        // for now, it's used by S4 App
-        // dynamic binding theoretically will solve this problem
-        builder = new TopologyBuilder(factory);
-        logger.debug("Successfully instantiating TopologyBuilder");
+  @Override
+  public void init() {
+    // TODO remove the if statement
+    // theoretically, dynamic binding will work here!
+    // test later!
+    // for now, the if statement is used by Storm
 
-        builder.initTopology(evaluationNameOption.getValue());
-        logger.debug("Successfully initializing SAMOA topology with name {}", 
evaluationNameOption.getValue());
+    if (builder == null) {
+      builder = new TopologyBuilder();
+      logger.debug("Successfully instantiating TopologyBuilder");
 
+      builder.initTopology(evaluationNameOption.getValue());
+      logger.debug("Successfully initializing SAMOA topology with name {}", 
evaluationNameOption.getValue());
     }
 
-    public Topology getTopology() {
-        return prequentialTopology;
+    // instantiate PrequentialSourceProcessor and its output stream
+    // (sourcePiOutputStream)
+    preqSource = new PrequentialSourceProcessor();
+    preqSource.setStreamSource((InstanceStream) 
this.streamTrainOption.getValue());
+    preqSource.setMaxNumInstances(instanceLimitOption.getValue());
+    preqSource.setSourceDelay(sourceDelayOption.getValue());
+    preqSource.setDelayBatchSize(batchDelayOption.getValue());
+    builder.addEntranceProcessor(preqSource);
+    logger.debug("Successfully instantiating PrequentialSourceProcessor");
+
+    // preqStarter = new PrequentialSourceTopologyStarter(preqSource,
+    // instanceLimitOption.getValue());
+    // sourcePi = builder.createEntrancePi(preqSource, preqStarter);
+    // sourcePiOutputStream = builder.createStream(sourcePi);
+
+    sourcePiOutputStream = builder.createStream(preqSource);
+    // preqStarter.setInputStream(sourcePiOutputStream);
+
+    // instantiate classifier and connect it to sourcePiOutputStream
+    classifier = this.learnerOption.getValue();
+    classifier.init(builder, preqSource.getDataset(), 1);
+    builder.connectInputShuffleStream(sourcePiOutputStream, 
classifier.getInputProcessor());
+    logger.debug("Successfully instantiating Classifier");
+
+    PerformanceEvaluator evaluatorOptionValue = 
this.evaluatorOption.getValue();
+    if (!PrequentialEvaluation.isLearnerAndEvaluatorCompatible(classifier, 
evaluatorOptionValue)) {
+      evaluatorOptionValue = 
getDefaultPerformanceEvaluatorForLearner(classifier);
     }
-    //
-    // @Override
-    // public TopologyStarter getTopologyStarter() {
-    // return this.preqStarter;
-    // }
-    
-    private static boolean isLearnerAndEvaluatorCompatible(Learner learner, 
PerformanceEvaluator evaluator) {
-        return (learner instanceof RegressionLearner && evaluator instanceof 
RegressionPerformanceEvaluator) ||
-            (learner instanceof ClassificationLearner && evaluator instanceof 
ClassificationPerformanceEvaluator);
+    evaluator = new EvaluatorProcessor.Builder(evaluatorOptionValue)
+        
.samplingFrequency(sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile()).build();
+
+    // evaluatorPi = builder.createPi(evaluator);
+    // evaluatorPi.connectInputShuffleStream(evaluatorPiInputStream);
+    builder.addProcessor(evaluator);
+    for (Stream evaluatorPiInputStream : classifier.getResultStreams()) {
+      builder.connectInputShuffleStream(evaluatorPiInputStream, evaluator);
     }
-    
-    private static PerformanceEvaluator 
getDefaultPerformanceEvaluatorForLearner(Learner learner) {
-       if (learner instanceof RegressionLearner) {
-               return new BasicRegressionPerformanceEvaluator();
-       }
-       // Default to BasicClassificationPerformanceEvaluator for all other 
cases
-       return new BasicClassificationPerformanceEvaluator();
+
+    logger.debug("Successfully instantiating EvaluatorProcessor");
+
+    prequentialTopology = builder.build();
+    logger.debug("Successfully building the topology");
+  }
+
+  @Override
+  public void setFactory(ComponentFactory factory) {
+    // TODO unify this code with init()
+    // for now, it's used by S4 App
+    // dynamic binding theoretically will solve this problem
+    builder = new TopologyBuilder(factory);
+    logger.debug("Successfully instantiating TopologyBuilder");
+
+    builder.initTopology(evaluationNameOption.getValue());
+    logger.debug("Successfully initializing SAMOA topology with name {}", 
evaluationNameOption.getValue());
+
+  }
+
+  public Topology getTopology() {
+    return prequentialTopology;
+  }
+
+  //
+  // @Override
+  // public TopologyStarter getTopologyStarter() {
+  // return this.preqStarter;
+  // }
+
+  private static boolean isLearnerAndEvaluatorCompatible(Learner learner, 
PerformanceEvaluator evaluator) {
+    return (learner instanceof RegressionLearner && evaluator instanceof 
RegressionPerformanceEvaluator) ||
+        (learner instanceof ClassificationLearner && evaluator instanceof 
ClassificationPerformanceEvaluator);
+  }
+
+  private static PerformanceEvaluator 
getDefaultPerformanceEvaluatorForLearner(Learner learner) {
+    if (learner instanceof RegressionLearner) {
+      return new BasicRegressionPerformanceEvaluator();
     }
+    // Default to BasicClassificationPerformanceEvaluator for all other cases
+    return new BasicClassificationPerformanceEvaluator();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java
index 41b47e4..5753349 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java
@@ -28,34 +28,34 @@ import com.yahoo.labs.samoa.topology.Topology;
  */
 public interface Task {
 
-       /**
-        * Initialize this SAMOA task, 
-        * i.e. create and connect ProcessingItems and Streams
-        * and initialize the topology
-        */
-       public void init();     
-       
-       /**
-        * Return the final topology object to be executed in the cluster
-        * @return topology object to be submitted to be executed in the cluster
-        */
-       public Topology getTopology();
-       
-    // /**
-    // * Return the entrance processor to start SAMOA topology
-    // * The logic to start the topology should be implemented here
-    // * @return entrance processor to start the topology
-    // */
-    // public TopologyStarter getTopologyStarter();
-       
-       /**
-        * Sets the factory.
-        * TODO: propose to hide factory from task, 
-        * i.e. Task will only see TopologyBuilder, 
-        * and factory creation will be handled by TopologyBuilder
-        *
-        * @param factory the new factory
-        */
-       public void setFactory(ComponentFactory factory) ;
-       
+  /**
+   * Initialize this SAMOA task, i.e. create and connect ProcessingItems and
+   * Streams and initialize the topology
+   */
+  public void init();
+
+  /**
+   * Return the final topology object to be executed in the cluster
+   * 
+   * @return topology object to be submitted to be executed in the cluster
+   */
+  public Topology getTopology();
+
+  // /**
+  // * Return the entrance processor to start SAMOA topology
+  // * The logic to start the topology should be implemented here
+  // * @return entrance processor to start the topology
+  // */
+  // public TopologyStarter getTopologyStarter();
+
+  /**
+   * Sets the factory. TODO: propose to hide factory from task, i.e. Task will
+   * only see TopologyBuilder, and factory creation will be handled by
+   * TopologyBuilder
+   * 
+   * @param factory
+   *          the new factory
+   */
+  public void setFactory(ComponentFactory factory);
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java
index c0f0cc3..1e3c9b5 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java
@@ -24,85 +24,93 @@ import com.yahoo.labs.samoa.core.EntranceProcessor;
 
 /**
  * Helper class for EntranceProcessingItem implementation.
+ * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public abstract class AbstractEntranceProcessingItem implements 
EntranceProcessingItem {
-       private EntranceProcessor processor;
-       private String name;
-       private Stream outputStream;
-       
-       /*
-        * Constructor
-        */
-       public AbstractEntranceProcessingItem() {
-               this(null);
-       }
-       public AbstractEntranceProcessingItem(EntranceProcessor processor) {
-               this.processor = processor;
-       }
-       
-       /*
-        * Processor
-        */
-       /**
-        * Set the entrance processor for this EntranceProcessingItem
-        * @param processor
-        *                      the processor
-        */
-       protected void setProcessor(EntranceProcessor processor) {
-               this.processor = processor;
-       }
-       
-       /**
-        * Get the EntranceProcessor of this EntranceProcessingItem.
-        * @return the EntranceProcessor
-        */
-       public EntranceProcessor getProcessor() {
-               return this.processor;
-       }
-       
-       /*
-        * Name/ID
-        */
-       /**
-        * Set the name (or ID) of this EntranceProcessingItem
-        * @param name
-        */
-       public void setName(String name) {
-               this.name = name;
-       }
-       
-       /**
-        * Get the name (or ID) of this EntranceProcessingItem
-        * @return the name (or ID)
-        */
-       public String getName() {
-               return this.name;
-       }
-       
-       /*
-        * Output Stream
-        */
-       /**
-        * Set the output stream of this EntranceProcessingItem.
-        * An EntranceProcessingItem should have only 1 single output stream and
-        * should not be re-assigned.
-        * @return this EntranceProcessingItem
-        */
-       public EntranceProcessingItem setOutputStream(Stream outputStream) {
-               if (this.outputStream != null && this.outputStream != 
outputStream) {
-                       throw new IllegalStateException("Cannot overwrite 
output stream of EntranceProcessingItem");
-               } else 
-                       this.outputStream = outputStream;
-               return this;
-       }
-       
-       /**
-        * Get the output stream of this EntranceProcessingItem.
-        * @return the output stream
-        */
-       public Stream getOutputStream() {
-               return this.outputStream;
-       }
+  private EntranceProcessor processor;
+  private String name;
+  private Stream outputStream;
+
+  /*
+   * Constructor
+   */
+  public AbstractEntranceProcessingItem() {
+    this(null);
+  }
+
+  public AbstractEntranceProcessingItem(EntranceProcessor processor) {
+    this.processor = processor;
+  }
+
+  /*
+   * Processor
+   */
+  /**
+   * Set the entrance processor for this EntranceProcessingItem
+   * 
+   * @param processor
+   *          the processor
+   */
+  protected void setProcessor(EntranceProcessor processor) {
+    this.processor = processor;
+  }
+
+  /**
+   * Get the EntranceProcessor of this EntranceProcessingItem.
+   * 
+   * @return the EntranceProcessor
+   */
+  public EntranceProcessor getProcessor() {
+    return this.processor;
+  }
+
+  /*
+   * Name/ID
+   */
+  /**
+   * Set the name (or ID) of this EntranceProcessingItem
+   * 
+   * @param name the name (or ID) to set
+   */
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Get the name (or ID) of this EntranceProcessingItem
+   * 
+   * @return the name (or ID)
+   */
+  public String getName() {
+    return this.name;
+  }
+
+  /*
+   * Output Stream
+   */
+  /**
+   * Set the output stream of this EntranceProcessingItem. An
+   * EntranceProcessingItem should have only 1 single output stream and should
+   * not be re-assigned.
+   * 
+   * @return this EntranceProcessingItem
+   */
+  public EntranceProcessingItem setOutputStream(Stream outputStream) {
+    if (this.outputStream != null && this.outputStream != outputStream) {
+      throw new IllegalStateException("Cannot overwrite output stream of 
EntranceProcessingItem");
+    } else
+      this.outputStream = outputStream;
+    return this;
+  }
+
+  /**
+   * Get the output stream of this EntranceProcessingItem.
+   * 
+   * @return the output stream
+   */
+  public Stream getOutputStream() {
+    return this.outputStream;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java
index d0f04f7..60d76e0 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java
@@ -26,136 +26,145 @@ import com.yahoo.labs.samoa.utils.PartitioningScheme;
 /**
  * Abstract ProcessingItem
  * 
- * Helper for implementation of ProcessingItem. It has basic information
- * for a ProcessingItem: name, parallelismLevel and a processor.
- * Subclass of this class needs to implement {@link #addInputStream(Stream, 
PartitioningScheme)}.
+ * Helper for implementation of ProcessingItem. It has basic information for a
+ * ProcessingItem: name, parallelismLevel and a processor. Subclass of this
+ * class needs to implement {@link #addInputStream(Stream, 
PartitioningScheme)}.
  * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public abstract class AbstractProcessingItem implements ProcessingItem {
-       private String name;
-       private int parallelism;
-       private Processor processor;
-       
-       /*
-        * Constructor
-        */
-       public AbstractProcessingItem() {
-               this(null);
-       }
-       public AbstractProcessingItem(Processor processor) {
-               this(processor,1);
-       }
-       public AbstractProcessingItem(Processor processor, int parallelism) {
-               this.processor = processor;
-               this.parallelism = parallelism;
-       }
-       
-       /*
-        * Processor
-        */
-       /**
-        * Set the processor for this ProcessingItem
-        * @param processor
-        *                      the processor
-        */
-       protected void setProcessor(Processor processor) {
-               this.processor = processor;
-       }
-       
-       /**
-        * Get the processor of this ProcessingItem
-        * @return the processor
-        */
-       public Processor getProcessor() {
-               return this.processor;
-       }
-       
-       /*
-        * Parallelism 
-        */
-       /**
-        * Set the parallelism factor of this ProcessingItem
-        * @param parallelism
-        */
-       protected void setParallelism(int parallelism) {
-               this.parallelism = parallelism;
-       }
-       
-       /**
-        * Get the parallelism factor of this ProcessingItem
-        * @return the parallelism factor
-        */
-       @Override
-       public int getParallelism() {
-               return this.parallelism;
-       }
-       
-       /*
-        * Name/ID
-        */
-       /**
-        * Set the name (or ID) of this ProcessingItem
-        * @param name
-        *                      the name/ID
-        */
-       public void setName(String name) {
-               this.name = name;
-       }
-       
-       /**
-        * Get the name (or ID) of this ProcessingItem
-        * @return the name/ID
-        */
-       public String getName() {
-               return this.name;
-       }
-       
-       /*
-        * Add input streams
-        */
-       /**
-        * Add an input stream to this ProcessingItem
-        * 
-        * @param inputStream
-        *                      the input stream to add
-        * @param scheme
-        *                      partitioning scheme associated with this 
ProcessingItem and the input stream
-        * @return this ProcessingItem
-        */
-       protected abstract ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme);
-
-       /**
-        * Add an input stream to this ProcessingItem with SHUFFLE scheme
-        * 
-        * @param inputStream
-        *                      the input stream
-        * @return this ProcessingItem
-        */
-    public ProcessingItem connectInputShuffleStream(Stream inputStream) {
-       return this.addInputStream(inputStream, PartitioningScheme.SHUFFLE);
-    }
-
-    /**
-        * Add an input stream to this ProcessingItem with GROUP_BY_KEY scheme
-        * 
-        * @param inputStream
-        *                      the input stream
-        * @return this ProcessingItem
-        */
-    public ProcessingItem connectInputKeyStream(Stream inputStream) {
-       return this.addInputStream(inputStream, 
PartitioningScheme.GROUP_BY_KEY);
-    }
-
-    /**
-        * Add an input stream to this ProcessingItem with BROADCAST scheme
-        * 
-        * @param inputStream
-        *                      the input stream
-        * @return this ProcessingItem
-        */
-    public ProcessingItem connectInputAllStream(Stream inputStream) {
-       return this.addInputStream(inputStream, PartitioningScheme.BROADCAST);
-    }
+  private String name;
+  private int parallelism;
+  private Processor processor;
+
+  /*
+   * Constructor
+   */
+  public AbstractProcessingItem() {
+    this(null);
+  }
+
+  public AbstractProcessingItem(Processor processor) {
+    this(processor, 1);
+  }
+
+  public AbstractProcessingItem(Processor processor, int parallelism) {
+    this.processor = processor;
+    this.parallelism = parallelism;
+  }
+
+  /*
+   * Processor
+   */
+  /**
+   * Set the processor for this ProcessingItem
+   * 
+   * @param processor
+   *          the processor
+   */
+  protected void setProcessor(Processor processor) {
+    this.processor = processor;
+  }
+
+  /**
+   * Get the processor of this ProcessingItem
+   * 
+   * @return the processor
+   */
+  public Processor getProcessor() {
+    return this.processor;
+  }
+
+  /*
+   * Parallelism
+   */
+  /**
+   * Set the parallelism factor of this ProcessingItem
+   * 
+   * @param parallelism the parallelism factor to set
+   */
+  protected void setParallelism(int parallelism) {
+    this.parallelism = parallelism;
+  }
+
+  /**
+   * Get the parallelism factor of this ProcessingItem
+   * 
+   * @return the parallelism factor
+   */
+  @Override
+  public int getParallelism() {
+    return this.parallelism;
+  }
+
+  /*
+   * Name/ID
+   */
+  /**
+   * Set the name (or ID) of this ProcessingItem
+   * 
+   * @param name
+   *          the name/ID
+   */
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Get the name (or ID) of this ProcessingItem
+   * 
+   * @return the name/ID
+   */
+  public String getName() {
+    return this.name;
+  }
+
+  /*
+   * Add input streams
+   */
+  /**
+   * Add an input stream to this ProcessingItem
+   * 
+   * @param inputStream
+   *          the input stream to add
+   * @param scheme
+   *          partitioning scheme associated with this ProcessingItem and the
+   *          input stream
+   * @return this ProcessingItem
+   */
+  protected abstract ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme);
+
+  /**
+   * Add an input stream to this ProcessingItem with SHUFFLE scheme
+   * 
+   * @param inputStream
+   *          the input stream
+   * @return this ProcessingItem
+   */
+  public ProcessingItem connectInputShuffleStream(Stream inputStream) {
+    return this.addInputStream(inputStream, PartitioningScheme.SHUFFLE);
+  }
+
+  /**
+   * Add an input stream to this ProcessingItem with GROUP_BY_KEY scheme
+   * 
+   * @param inputStream
+   *          the input stream
+   * @return this ProcessingItem
+   */
+  public ProcessingItem connectInputKeyStream(Stream inputStream) {
+    return this.addInputStream(inputStream, PartitioningScheme.GROUP_BY_KEY);
+  }
+
+  /**
+   * Add an input stream to this ProcessingItem with BROADCAST scheme
+   * 
+   * @param inputStream
+   *          the input stream
+   * @return this ProcessingItem
+   */
+  public ProcessingItem connectInputAllStream(Stream inputStream) {
+    return this.addInputStream(inputStream, PartitioningScheme.BROADCAST);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java
index b3544ed..a16d566 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java
@@ -25,91 +25,95 @@ import com.yahoo.labs.samoa.core.ContentEvent;
 /**
  * Abstract Stream
  * 
- * Helper for implementation of Stream. It has basic information
- * for a Stream: streamID and source ProcessingItem.
- * Subclass of this class needs to implement {@link #put(ContentEvent)}.
+ * Helper for implementation of Stream. It has basic information for a Stream:
+ * streamID and source ProcessingItem. Subclass of this class needs to 
implement
+ * {@link #put(ContentEvent)}.
  * 
  * @author Anh Thu Vu
- *
+ * 
  */
 
 public abstract class AbstractStream implements Stream {
-       private String streamID;
-       private IProcessingItem sourcePi;
-       private int batchSize;
- 
-       /*
-        * Constructor
-        */
-       public AbstractStream() {
-               this(null);
-       }
-       public AbstractStream(IProcessingItem sourcePi) {
-               this.sourcePi = sourcePi;
-               this.batchSize = 1;
-       }
-       
-       /**
-        * Get source processing item of this stream
-        * @return
-        */
-       public IProcessingItem getSourceProcessingItem() {
-               return this.sourcePi;
-       }
+  private String streamID;
+  private IProcessingItem sourcePi;
+  private int batchSize;
+
+  /*
+   * Constructor
+   */
+  public AbstractStream() {
+    this(null);
+  }
+
+  public AbstractStream(IProcessingItem sourcePi) {
+    this.sourcePi = sourcePi;
+    this.batchSize = 1;
+  }
+
+  /**
+   * Get source processing item of this stream
+   * 
+   * @return
+   */
+  public IProcessingItem getSourceProcessingItem() {
+    return this.sourcePi;
+  }
+
+  /*
+   * Process event
+   */
+  @Override
+  /**
+   * Send a ContentEvent
+   * @param event
+   *                   the ContentEvent to be sent
+   */
+  public abstract void put(ContentEvent event);
+
+  /*
+   * Stream name
+   */
+  /**
+   * Get name (ID) of this stream
+   * 
+   * @return the name (ID)
+   */
+  @Override
+  public String getStreamId() {
+    return this.streamID;
+  }
 
-    /*
-     * Process event
-     */
-    @Override
-    /**
-     * Send a ContentEvent
-     * @param event
-     *                         the ContentEvent to be sent
-     */
-    public abstract void put(ContentEvent event);
+  /**
+   * Set the name (ID) of this stream
+   * 
+   * @param streamID
+   *          the name (ID)
+   */
+  public void setStreamId(String streamID) {
+    this.streamID = streamID;
+  }
 
-    /*
-     * Stream name
-     */
-    /**
-     * Get name (ID) of this stream
-     * @return the name (ID)
-     */
-    @Override
-    public String getStreamId() {
-       return this.streamID;
-    }
-    
-    /**
-     * Set the name (ID) of this stream
-     * @param streamID
-     *                         the name (ID)
-     */
-    public void setStreamId (String streamID) {
-       this.streamID = streamID;
-    }
-  
-    /*
-     * Batch size
-     */
-    /**
-     * Set suggested batch size
-     *
-     * @param batchSize
-     * the suggested batch size
-     *
-     */
-    @Override
-    public void setBatchSize(int batchSize) {
-       this.batchSize = batchSize;
-    }
+  /*
+   * Batch size
+   */
+  /**
+   * Set suggested batch size
+   * 
+   * @param batchSize
+   *          the suggested batch size
+   * 
+   */
+  @Override
+  public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
 
-    /**
-     * Get suggested batch size
-     *
-     * @return the suggested batch size
-     */
-    public int getBatchSize() {
-       return this.batchSize;
-    }
+  /**
+   * Get suggested batch size
+   * 
+   * @return the suggested batch size
+   */
+  public int getBatchSize() {
+    return this.batchSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java
index 53385b1..00096ca 100755
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java
@@ -26,108 +26,109 @@ import java.util.Set;
 /**
  * Topology abstract class.
  * 
- * It manages basic information of a topology: name, sets of Streams and 
ProcessingItems
+ * It manages basic information of a topology: name, sets of Streams and
+ * ProcessingItems
  * 
  */
 public abstract class AbstractTopology implements Topology {
 
-       private String topoName;
-    private Set<Stream> streams;
-    private Set<IProcessingItem> processingItems;
-    private Set<EntranceProcessingItem> entranceProcessingItems;
+  private String topoName;
+  private Set<Stream> streams;
+  private Set<IProcessingItem> processingItems;
+  private Set<EntranceProcessingItem> entranceProcessingItems;
 
-    protected AbstractTopology(String name) {
-       this.topoName = name;
-       this.streams = new HashSet<>();
-        this.processingItems = new HashSet<>();
-        this.entranceProcessingItems = new HashSet<>();
-    }
-    
-    /**
-     * Gets the name of this topology
-     * 
-     * @return name of the topology
-     */
-    public String getTopologyName() {
-       return this.topoName;
-    }
-    
-    /**
-     * Sets the name of this topology
-     * 
-     * @param topologyName
-     *                         name of the topology
-     */
-    public void setTopologyName(String topologyName) {
-       this.topoName = topologyName;
-    }
-    
-    /**
-     * Adds an Entrance processing item to the topology.
-     * 
-     * @param epi
-     *                         Entrance processing item
-     */
-    public void addEntranceProcessingItem(EntranceProcessingItem epi) {
-       this.entranceProcessingItems.add(epi);
-       this.addProcessingItem(epi);
-    }
-    
-    /**
-     * Gets entrance processing items in the topology
-     * 
-     * @return the set of processing items
-     */
-    public Set<EntranceProcessingItem> getEntranceProcessingItems() {
-       return this.entranceProcessingItems;
-    }
+  protected AbstractTopology(String name) {
+    this.topoName = name;
+    this.streams = new HashSet<>();
+    this.processingItems = new HashSet<>();
+    this.entranceProcessingItems = new HashSet<>();
+  }
 
-    /**
-     * Add processing item to topology.
-     * 
-     * @param procItem
-     *            Processing item.
-     */
-    public void addProcessingItem(IProcessingItem procItem) {
-        addProcessingItem(procItem, 1);
-    }
+  /**
+   * Gets the name of this topology
+   * 
+   * @return name of the topology
+   */
+  public String getTopologyName() {
+    return this.topoName;
+  }
 
-    /**
-     * Add processing item to topology.
-     * 
-     * @param procItem
-     *            Processing item.
-     * @param parallelismHint
-     *            Processing item parallelism level.
-     */
-    public void addProcessingItem(IProcessingItem procItem, int 
parallelismHint) {
-        this.processingItems.add(procItem);
-    }
-    
-    /**
-     * Gets processing items in the topology (including entrance processing 
items)
-     * 
-     * @return the set of processing items
-     */
-    public Set<IProcessingItem> getProcessingItems() {
-       return this.processingItems;
-    }
+  /**
+   * Sets the name of this topology
+   * 
+   * @param topologyName
+   *          name of the topology
+   */
+  public void setTopologyName(String topologyName) {
+    this.topoName = topologyName;
+  }
 
-    /**
-     * Add stream to topology.
-     * 
-     * @param stream
-     */
-    public void addStream(Stream stream) {
-        this.streams.add(stream);
-    }
-    
-    /**
-     * Gets streams in the topology
-     * 
-     * @return the set of streams
-     */
-    public Set<Stream> getStreams() {
-       return this.streams;
-    } 
+  /**
+   * Adds an Entrance processing item to the topology.
+   * 
+   * @param epi
+   *          Entrance processing item
+   */
+  public void addEntranceProcessingItem(EntranceProcessingItem epi) {
+    this.entranceProcessingItems.add(epi);
+    this.addProcessingItem(epi);
+  }
+
+  /**
+   * Gets entrance processing items in the topology
+   * 
+   * @return the set of processing items
+   */
+  public Set<EntranceProcessingItem> getEntranceProcessingItems() {
+    return this.entranceProcessingItems;
+  }
+
+  /**
+   * Add processing item to topology.
+   * 
+   * @param procItem
+   *          Processing item.
+   */
+  public void addProcessingItem(IProcessingItem procItem) {
+    addProcessingItem(procItem, 1);
+  }
+
+  /**
+   * Add processing item to topology.
+   * 
+   * @param procItem
+   *          Processing item.
+   * @param parallelismHint
+   *          Processing item parallelism level.
+   */
+  public void addProcessingItem(IProcessingItem procItem, int parallelismHint) 
{
+    this.processingItems.add(procItem);
+  }
+
+  /**
+   * Gets processing items in the topology (including entrance processing 
items)
+   * 
+   * @return the set of processing items
+   */
+  public Set<IProcessingItem> getProcessingItems() {
+    return this.processingItems;
+  }
+
+  /**
+   * Add stream to topology.
+   * 
+   * @param stream
+   */
+  public void addStream(Stream stream) {
+    this.streams.add(stream);
+  }
+
+  /**
+   * Gets streams in the topology
+   * 
+   * @return the set of streams
+   */
+  public Set<Stream> getStreams() {
+    return this.streams;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java
index 433f516..f1e82dc 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java
@@ -28,51 +28,55 @@ import com.yahoo.labs.samoa.core.Processor;
  */
 public interface ComponentFactory {
 
-    /**
-     * Creates a platform specific processing item with the specified 
processor.
-     * 
-     * @param processor
-     *            contains the logic for this processing item.
-     * @return ProcessingItem
-     */
-    public ProcessingItem createPi(Processor processor);
+  /**
+   * Creates a platform specific processing item with the specified processor.
+   * 
+   * @param processor
+   *          contains the logic for this processing item.
+   * @return ProcessingItem
+   */
+  public ProcessingItem createPi(Processor processor);
 
-    /**
-     * Creates a platform specific processing item with the specified 
processor. Additionally sets the parallelism level.
-     * 
-     * @param processor
-     *            contains the logic for this processing item.
-     * @param parallelism
-     *            defines the amount of instances of this processing item will 
be created.
-     * @return ProcessingItem
-     */
-    public ProcessingItem createPi(Processor processor, int parallelism);
+  /**
+   * Creates a platform specific processing item with the specified processor.
+   * Additionally sets the parallelism level.
+   * 
+   * @param processor
+   *          contains the logic for this processing item.
+   * @param parallelism
+   *          defines the number of instances of this processing item that
+   *          will be created.
+   * @return ProcessingItem
+   */
+  public ProcessingItem createPi(Processor processor, int parallelism);
 
-    /**
-     * Creates a platform specific processing item with the specified 
processor that is the entrance point in the topology. This processing item can 
either
-     * generate a stream of data or connect to an external stream of data.
-     * 
-     * @param entranceProcessor
-     *            contains the logic for this processing item.
-     * @return EntranceProcessingItem
-     */
-    public EntranceProcessingItem createEntrancePi(EntranceProcessor 
entranceProcessor);
+  /**
+   * Creates a platform specific processing item with the specified processor
+   * that is the entrance point in the topology. This processing item can 
either
+   * generate a stream of data or connect to an external stream of data.
+   * 
+   * @param entranceProcessor
+   *          contains the logic for this processing item.
+   * @return EntranceProcessingItem
+   */
+  public EntranceProcessingItem createEntrancePi(EntranceProcessor 
entranceProcessor);
 
-    /**
-     * Creates a platform specific stream.
-     * 
-     * @param sourcePi
-     *            source processing item which will provide the events for 
this stream.
-     * @return Stream
-     */
-    public Stream createStream(IProcessingItem sourcePi);
+  /**
+   * Creates a platform specific stream.
+   * 
+   * @param sourcePi
+   *          source processing item which will provide the events for this
+   *          stream.
+   * @return Stream
+   */
+  public Stream createStream(IProcessingItem sourcePi);
 
-    /**
-     * Creates a platform specific topology.
-     * 
-     * @param topoName
-     *            Topology name.
-     * @return Topology
-     */
-    public Topology createTopology(String topoName);
+  /**
+   * Creates a platform specific topology.
+   * 
+   * @param topoName
+   *          Topology name.
+   * @return Topology
+   */
+  public Topology createTopology(String topoName);
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java
index 32ed109..9698cc3 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java
@@ -27,20 +27,21 @@ import com.yahoo.labs.samoa.core.EntranceProcessor;
  */
 public interface EntranceProcessingItem extends IProcessingItem {
 
-    @Override
-    /**
-     * Gets the processing item processor.
-     * 
-     * @return the embedded EntranceProcessor. 
-     */
-    public EntranceProcessor getProcessor();
+  @Override
+  /**
+   * Gets the processing item processor.
+   * 
+   * @return the embedded EntranceProcessor. 
+   */
+  public EntranceProcessor getProcessor();
 
-    /**
-     * Set the single output stream for this EntranceProcessingItem.
-     * 
-     * @param stream
-     *            the stream
-     * @return the current instance of the EntranceProcessingItem for fluent 
interface.
-     */
-    public EntranceProcessingItem setOutputStream(Stream stream);
+  /**
+   * Set the single output stream for this EntranceProcessingItem.
+   * 
+   * @param stream
+   *          the stream
+   * @return the current instance of the EntranceProcessingItem for fluent
+   *         interface.
+   */
+  public EntranceProcessingItem setOutputStream(Stream stream);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java
index 7a70dc4..97ea9a4 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java
@@ -26,22 +26,22 @@ import com.yahoo.labs.samoa.core.Processor;
  * ProcessingItem interface specific for entrance processing items.
  * 
  * @author severien
- *
+ * 
  */
 public interface IProcessingItem {
-       
-       /**
-        * Gets the processing item processor.
-        * 
-        * @return Processor
-        */
-       public Processor getProcessor();
-       
-       /**
-        * Sets processing item name.
-        * 
-        * @param name
-        */
-       //public void setName(String name);
+
+  /**
+   * Gets the processing item processor.
+   * 
+   * @return Processor
+   */
+  public Processor getProcessor();
+
+  /**
+   * Sets processing item name.
+   * 
+   * @param name
+   */
+  // public void setName(String name);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java
index 8499f80..10c0690 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java
@@ -23,24 +23,25 @@ package com.yahoo.labs.samoa.topology;
 import com.yahoo.labs.samoa.tasks.Task;
 
 /**
- * Submitter interface for programatically deploying platform specific 
topologies.
+ * Submitter interface for programmatically deploying platform specific
+ * topologies.
  * 
  * @author severien
- *
+ * 
  */
 public interface ISubmitter {
 
-       /**
-        * Deploy a specific task to a platform.
-        * 
-        * @param task
-        */
-       public void deployTask(Task task);
-       
-       /**
-        * Sets if the task should run locally or distributed.
-        * 
-        * @param bool
-        */
-       public void setLocal(boolean bool);
+  /**
+   * Deploy a specific task to a platform.
+   * 
+   * @param task
+   */
+  public void deployTask(Task task);
+
+  /**
+   * Sets if the task should run locally or distributed.
+   * 
+   * @param bool
+   */
+  public void setLocal(boolean bool);
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java
index 2e8758f..1fe4642 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java
@@ -24,62 +24,63 @@ import com.yahoo.labs.samoa.core.ContentEvent;
 import com.yahoo.labs.samoa.core.EntranceProcessor;
 
 /**
- * Implementation of EntranceProcessingItem for local engines (Simple, 
Multithreads)
+ * Implementation of EntranceProcessingItem for local engines (Simple,
+ * Multithreads)
  * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class LocalEntranceProcessingItem extends 
AbstractEntranceProcessingItem {
-       public LocalEntranceProcessingItem(EntranceProcessor processor) {
-               super(processor);
-       }
-       
-       /**
-        * If there are available events, first event in the queue will be
-        * sent out on the output stream. 
-        * @return true if there is (at least) one available event and it was 
sent out
-        *         false otherwise 
-        */
-       public boolean injectNextEvent() {
-               if (this.getProcessor().hasNext()) {
-                       ContentEvent event = this.getProcessor().nextEvent();
-                       this.getOutputStream().put(event);
-                       return true;
-               }
-               return false;
-       }
+  public LocalEntranceProcessingItem(EntranceProcessor processor) {
+    super(processor);
+  }
+
+  /**
+   * If there are available events, first event in the queue will be sent out 
on
+   * the output stream.
+   * 
+   * @return true if there is (at least) one available event and it was sent 
out
+   *         false otherwise
+   */
+  public boolean injectNextEvent() {
+    if (this.getProcessor().hasNext()) {
+      ContentEvent event = this.getProcessor().nextEvent();
+      this.getOutputStream().put(event);
+      return true;
+    }
+    return false;
+  }
 
-       /**
-        * Start sending events by calling {@link #injectNextEvent()}. If there 
are no available events, 
-        * and that the stream is not entirely consumed, it will wait by calling
-     * {@link #waitForNewEvents()} before attempting to send again.
-     * </p>
-     * When the stream is entirely consumed, the last event is tagged 
accordingly and the processor gets the
-     * finished status.
-     *
-        */
-       public void startSendingEvents () {
-               if (this.getOutputStream() == null) 
-                       throw new IllegalStateException("Try sending events 
from EntrancePI while outputStream is not set.");
-               
-               while (!this.getProcessor().isFinished()) {
-            if (!this.injectNextEvent()) {
-                try {
-                    waitForNewEvents();
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    break;
-                }
-            }
+  /**
+   * Start sending events by calling {@link #injectNextEvent()}. If there are 
no
+   * available events and the stream is not entirely consumed, it will
+   * wait by calling {@link #waitForNewEvents()} before attempting to send
+   * again. <p> When the stream is entirely consumed, the last event is tagged
+   * accordingly and the processor gets the finished status.
+   * 
+   */
+  public void startSendingEvents() {
+    if (this.getOutputStream() == null)
+      throw new IllegalStateException("Try sending events from EntrancePI 
while outputStream is not set.");
+
+    while (!this.getProcessor().isFinished()) {
+      if (!this.injectNextEvent()) {
+        try {
+          waitForNewEvents();
+        } catch (Exception e) {
+          e.printStackTrace();
+          break;
         }
-       }
-       
-       /**
-        * Method to wait for an amount of time when there are no available 
events.
-        * Implementation of EntranceProcessingItem should override this method 
to 
-        * implement non-blocking wait or to adjust the amount of time.
-        */
-       protected void waitForNewEvents() throws Exception {
-               Thread.sleep(100);
-       }
+      }
+    }
+  }
+
+  /**
+   * Method to wait for an amount of time when there are no available events.
+   * Implementation of EntranceProcessingItem should override this method to
+   * implement non-blocking wait or to adjust the amount of time.
+   */
+  protected void waitForNewEvents() throws Exception {
+    Thread.sleep(100);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java
index 02fb84d..c1ecabc 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java
@@ -28,43 +28,42 @@ package com.yahoo.labs.samoa.topology;
  */
 public interface ProcessingItem extends IProcessingItem {
 
-       /**
-        * Connects this processing item in a round robin fashion. The events 
will
-        * be distributed evenly between the instantiated processing items.
-        * 
-        * @param inputStream
-        *            Stream to connect this processing item.
-        * @return ProcessingItem
-        */
-       public ProcessingItem connectInputShuffleStream(Stream inputStream);
+  /**
+   * Connects this processing item in a round robin fashion. The events will be
+   * distributed evenly between the instantiated processing items.
+   * 
+   * @param inputStream
+   *          Stream to connect this processing item.
+   * @return ProcessingItem
+   */
+  public ProcessingItem connectInputShuffleStream(Stream inputStream);
 
-       /**
-        * Connects this processing item taking the event key into account. 
Events
-        * will be routed to the processing item according to the modulus of 
its key
-        * and the paralellism level. Ex.: key = 5 and paralellism = 2, 5 mod 2 
= 1.
-        * Processing item responsible for 1 will receive this event.
-        * 
-        * @param inputStream
-        *            Stream to connect this processing item.
-        * @return ProcessingItem
-        */
-       public ProcessingItem connectInputKeyStream(Stream inputStream);
+  /**
+   * Connects this processing item taking the event key into account. Events
+   * will be routed to the processing item according to the modulus of its key
+   * and the parallelism level. Ex.: key = 5 and parallelism = 2, 5 mod 2 = 1.
+   * Processing item responsible for 1 will receive this event.
+   * 
+   * @param inputStream
+   *          Stream to connect this processing item.
+   * @return ProcessingItem
+   */
+  public ProcessingItem connectInputKeyStream(Stream inputStream);
 
-       /**
-        * Connects this processing item to the stream in a broadcast fashion. 
All
-        * processing items of this type will receive copy of the original 
event.
-        * 
-        * @param inputStream
-        *            Stream to connect this processing item.
-        * @return ProcessingItem
-        */
-       public ProcessingItem connectInputAllStream(Stream inputStream);
+  /**
+   * Connects this processing item to the stream in a broadcast fashion. All
+   * processing items of this type will receive copy of the original event.
+   * 
+   * @param inputStream
+   *          Stream to connect this processing item.
+   * @return ProcessingItem
+   */
+  public ProcessingItem connectInputAllStream(Stream inputStream);
 
-
-       /**
-        * Gets processing item parallelism level.
-        * 
-        * @return int
-        */
-       public int getParallelism();
+  /**
+   * Gets processing item parallelism level.
+   * 
+   * @return int
+   */
+  public int getParallelism();
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java
index b496d35..0c54232 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java
@@ -24,39 +24,38 @@ import com.yahoo.labs.samoa.core.ContentEvent;
 
 /**
  * Stream interface.
- *
+ * 
  * @author severien
- *
+ * 
  */
 public interface Stream {
-       
-       /**
-        * Puts events into a platform specific data stream.
-        * 
-        * @param event
-        */
-       public void put(ContentEvent event);
-       
-       /**
-        * Sets the stream id which is represented by a name.
-        * 
-        * @param stream
-        */
-       //public void setStreamId(String stream);
-       
-       
-       /**
-        * Gets stream id.
-        * 
-        * @return id
-        */
-       public String getStreamId();
-       
-       /**
-        * Set batch size
-        *
-        * @param batchSize
-        *                  the suggested size for batching messages on this 
stream
-        */
-       public void setBatchSize(int batchsize);
+
+  /**
+   * Puts events into a platform specific data stream.
+   * 
+   * @param event
+   */
+  public void put(ContentEvent event);
+
+  /**
+   * Sets the stream id which is represented by a name.
+   * 
+   * @param stream
+   */
+  // public void setStreamId(String stream);
+
+  /**
+   * Gets stream id.
+   * 
+   * @return id
+   */
+  public String getStreamId();
+
+  /**
+   * Set batch size
+   * 
+   * @param batchsize
+   *          the suggested size for batching messages on this stream
+   */
+  public void setBatchSize(int batchsize);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java
index 6ad93ed..dce4974 100755
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java
@@ -21,65 +21,63 @@ package com.yahoo.labs.samoa.topology;
  */
 
 public interface Topology {
-       /*
-        * Name
-        */
-       /**
-     * Get the topology's name
-     * 
-     * @return the name of the topology
-     */
-       public String getTopologyName();
+  /*
+   * Name
+   */
+  /**
+   * Get the topology's name
+   * 
+   * @return the name of the topology
+   */
+  public String getTopologyName();
 
-       /**
-        * Set the topology's name
-        * 
-        * @param topologyName
-        *                      the name of the topology
-        */
-       public void setTopologyName(String topologyName) ;
+  /**
+   * Set the topology's name
+   * 
+   * @param topologyName
+   *          the name of the topology
+   */
+  public void setTopologyName(String topologyName);
 
-       /*
-        * Entrance Processing Items
-        */
-       /**
-        * Add an EntranceProcessingItem to this topology
-        * 
-        * @param epi
-        *                      the EntranceProcessingItem to be added
-        */
-       void addEntranceProcessingItem(EntranceProcessingItem epi);
-       
-       
-       /*
-        * Processing Items
-        */
-       /**
-        * Add a ProcessingItem to this topology
-        * with default parallelism level (i.e. 1)
-        * 
-        * @param procItem
-        *                      the ProcessingItem to be added
-        */
-       void addProcessingItem(IProcessingItem procItem);
-       
-       /**
-        * Add a ProcessingItem to this topology 
-        * with an associated parallelism level
-        * 
-        * @param procItem
-        *                      the ProcessingItem to be added
-        * @param parallelismHint
-        *                      the parallelism level 
-        */
-       void addProcessingItem(IProcessingItem procItem, int parallelismHint);
-       
-       /*
-        * Streams
-        */
-       /**
-        * 
-        * @param stream
-        */
-       void addStream(Stream stream);
+  /*
+   * Entrance Processing Items
+   */
+  /**
+   * Add an EntranceProcessingItem to this topology
+   * 
+   * @param epi
+   *          the EntranceProcessingItem to be added
+   */
+  void addEntranceProcessingItem(EntranceProcessingItem epi);
+
+  /*
+   * Processing Items
+   */
+  /**
+   * Add a ProcessingItem to this topology with default parallelism level (i.e.
+   * 1)
+   * 
+   * @param procItem
+   *          the ProcessingItem to be added
+   */
+  void addProcessingItem(IProcessingItem procItem);
+
+  /**
+   * Add a ProcessingItem to this topology with an associated parallelism level
+   * 
+   * @param procItem
+   *          the ProcessingItem to be added
+   * @param parallelismHint
+   *          the parallelism level
+   */
+  void addProcessingItem(IProcessingItem procItem, int parallelismHint);
+
+  /*
+   * Streams
+   */
+  /**
+   * 
+   * @param stream
+   */
+  void addStream(Stream stream);
 }

Reply via email to