http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java
index aebb136..90b50ea 100755
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java
@@ -32,192 +32,199 @@ import com.yahoo.labs.samoa.core.Processor;
  */
 public class TopologyBuilder {
 
-    // TODO:
-    // Possible options:
-    // 1. we may convert this as interface and platform dependent builder will 
inherit this method
-    // 2. refactor by combining TopologyBuilder, ComponentFactory and Topology
-    // -ve -> fat class where it has capabilities to instantiate specific 
component and connecting them
-    // +ve -> easy abstraction for SAMOA developer "you just implement your 
builder logic here!"
-    private ComponentFactory componentFactory;
-    private Topology topology;
-    private Map<Processor, IProcessingItem> mapProcessorToProcessingItem;
-
-    // TODO: refactor, temporary constructor used by Storm code
-    public TopologyBuilder() {
-        // TODO: initialize _componentFactory using dynamic binding
-        // for now, use StormComponentFactory
-        // should the factory be Singleton (?)
-        // ans: at the moment, no, i.e. each builder will has its associated 
factory!
-        // and the factory will be instantiated using dynamic binding
-        // this.componentFactory = new StormComponentFactory();
-    }
-
-    // TODO: refactor, temporary constructor used by S4 code
-    public TopologyBuilder(ComponentFactory theFactory) {
-        this.componentFactory = theFactory;
-    }
-
-    /**
-     * Initiates topology with a specific name.
-     * 
-     * @param topologyName
-     */
-    public void initTopology(String topologyName) {
-       this.initTopology(topologyName, 0);
-    }
-    
-    /**
-     * Initiates topology with a specific name and a delay between consecutive 
instances.
-     * 
-     * @param topologyName
-     * @param delay
-     *                         delay between injections of two instances from 
source (in milliseconds)
-     */
-    public void initTopology(String topologyName, int delay) {
-        if (this.topology != null) {
-            // TODO: possible refactor this code later
-            System.out.println("Topology has been initialized before!");
-            return;
-        }
-        this.topology = componentFactory.createTopology(topologyName);
-    }
-
-    /**
-     * Returns the platform specific topology.
-     * 
-     * @return
-     */
-    public Topology build() {
-        return topology;
-    }
-
-    public ProcessingItem addProcessor(Processor processor, int parallelism) {
-        ProcessingItem pi = createPi(processor, parallelism);
-        if (this.mapProcessorToProcessingItem == null)
-            this.mapProcessorToProcessingItem = new HashMap<Processor, 
IProcessingItem>();
-        this.mapProcessorToProcessingItem.put(processor, pi);
-        return pi;
-    }
-
-    public ProcessingItem addProcessor(Processor processor) {
-        return addProcessor(processor, 1);
-    }
-
-    public ProcessingItem connectInputShuffleStream(Stream inputStream, 
Processor processor) {
-        ProcessingItem pi = (ProcessingItem) 
mapProcessorToProcessingItem.get(processor);
-        Preconditions.checkNotNull(pi, "Trying to connect to null PI");
-        return pi.connectInputShuffleStream(inputStream);
-    }
-
-    public ProcessingItem connectInputKeyStream(Stream inputStream, Processor 
processor) {
-        ProcessingItem pi = (ProcessingItem) 
mapProcessorToProcessingItem.get(processor);
-        Preconditions.checkNotNull(pi, "Trying to connect to null PI");
-        return pi.connectInputKeyStream(inputStream);
-    }
-
-    public ProcessingItem connectInputAllStream(Stream inputStream, Processor 
processor) {
-        ProcessingItem pi = (ProcessingItem) 
mapProcessorToProcessingItem.get(processor);
-        Preconditions.checkNotNull(pi, "Trying to connect to null PI");
-        return pi.connectInputAllStream(inputStream);
-    }
-
-    public Stream createInputShuffleStream(Processor processor, Processor 
dest) {
-        Stream inputStream = this.createStream(dest);
-        ProcessingItem pi = (ProcessingItem) 
mapProcessorToProcessingItem.get(processor);
-        Preconditions.checkNotNull(pi, "Trying to connect to null PI");
-        pi.connectInputShuffleStream(inputStream);
-        return inputStream;
-    }
-
-    public Stream createInputKeyStream(Processor processor, Processor dest) {
-        Stream inputStream = this.createStream(dest);
-        ProcessingItem pi = (ProcessingItem) 
mapProcessorToProcessingItem.get(processor);
-        Preconditions.checkNotNull(pi, "Trying to connect to null PI");
-        pi.connectInputKeyStream(inputStream);
-        return inputStream;
-    }
-
-    public Stream createInputAllStream(Processor processor, Processor dest) {
-        Stream inputStream = this.createStream(dest);
-        ProcessingItem pi = (ProcessingItem) 
mapProcessorToProcessingItem.get(processor);
-        Preconditions.checkNotNull(pi, "Trying to connect to null PI");
-        pi.connectInputAllStream(inputStream);
-        return inputStream;
-    }
-
-    public Stream createStream(Processor processor) {
-        IProcessingItem pi = mapProcessorToProcessingItem.get(processor);
-        Stream ret = null;
-        Preconditions.checkNotNull(pi, "Trying to create stream from null PI");
-        ret = this.createStream(pi);
-        if (pi instanceof EntranceProcessingItem)
-            ((EntranceProcessingItem) pi).setOutputStream(ret);
-        return ret;
-    }
-
-    public EntranceProcessingItem addEntranceProcessor(EntranceProcessor 
entranceProcessor) {
-        EntranceProcessingItem pi = createEntrancePi(entranceProcessor);
-        if (this.mapProcessorToProcessingItem == null)
-            this.mapProcessorToProcessingItem = new HashMap<Processor, 
IProcessingItem>();
-        mapProcessorToProcessingItem.put(entranceProcessor, pi);
-        return pi;
-    }
-
-    public ProcessingItem getProcessingItem(Processor processor) {
-        ProcessingItem pi = (ProcessingItem) 
mapProcessorToProcessingItem.get(processor);
-        Preconditions.checkNotNull(pi, "Trying to retrieve null PI");
-        return pi;
-    }
-
-    /**
-     * Creates a processing item with a specific processor and paralellism 
level of 1.
-     * 
-     * @param processor
-     * @return ProcessingItem
-     */
-    @SuppressWarnings("unused")
-    private ProcessingItem createPi(Processor processor) {
-        return createPi(processor, 1);
-    }
-
-    /**
-     * Creates a processing item with a specific processor and paralellism 
level.
-     * 
-     * @param processor
-     * @param parallelism
-     * @return ProcessingItem
-     */
-    private ProcessingItem createPi(Processor processor, int parallelism) {
-        ProcessingItem pi = this.componentFactory.createPi(processor, 
parallelism);
-        this.topology.addProcessingItem(pi, parallelism);
-        return pi;
-    }
-
-    /**
-     * Creates a platform specific entrance processing item.
-     * 
-     * @param processor
-     * @return
-     */
-    private EntranceProcessingItem createEntrancePi(EntranceProcessor 
processor) {
-        EntranceProcessingItem epi = 
this.componentFactory.createEntrancePi(processor);
-        this.topology.addEntranceProcessingItem(epi);
-        if (this.mapProcessorToProcessingItem == null)
-            this.mapProcessorToProcessingItem = new HashMap<Processor, 
IProcessingItem>();
-        this.mapProcessorToProcessingItem.put(processor, epi);
-        return epi;
-    }
-
-    /**
-     * Creates a platform specific stream.
-     * 
-     * @param sourcePi
-     *            source processing item.
-     * @return
-     */
-    private Stream createStream(IProcessingItem sourcePi) {
-        Stream stream = this.componentFactory.createStream(sourcePi);
-        this.topology.addStream(stream);
-        return stream;
-    }
+  // TODO:
+  // Possible options:
+  // 1. we may convert this as interface and platform dependent builder will
+  // inherit this method
+  // 2. refactor by combining TopologyBuilder, ComponentFactory and Topology
+  // -ve -> fat class where it has capabilities to instantiate specific
+  // component and connecting them
+  // +ve -> easy abstraction for SAMOA developer
+  // "you just implement your builder logic here!"
+  private ComponentFactory componentFactory;
+  private Topology topology;
+  private Map<Processor, IProcessingItem> mapProcessorToProcessingItem;
+
+  // TODO: refactor, temporary constructor used by Storm code
+  public TopologyBuilder() {
+    // TODO: initialize _componentFactory using dynamic binding
+    // for now, use StormComponentFactory
+    // should the factory be Singleton (?)
+    // ans: at the moment, no, i.e. each builder will has its associated
+    // factory!
+    // and the factory will be instantiated using dynamic binding
+    // this.componentFactory = new StormComponentFactory();
+  }
+
+  // TODO: refactor, temporary constructor used by S4 code
+  public TopologyBuilder(ComponentFactory theFactory) {
+    this.componentFactory = theFactory;
+  }
+
+  /**
+   * Initiates topology with a specific name.
+   * 
+   * @param topologyName
+   */
+  public void initTopology(String topologyName) {
+    this.initTopology(topologyName, 0);
+  }
+
+  /**
+   * Initiates topology with a specific name and a delay between consecutive
+   * instances.
+   * 
+   * @param topologyName
+   * @param delay
+   *          delay between injections of two instances from source (in
+   *          milliseconds)
+   */
+  public void initTopology(String topologyName, int delay) {
+    if (this.topology != null) {
+      // TODO: possible refactor this code later
+      System.out.println("Topology has been initialized before!");
+      return;
+    }
+    this.topology = componentFactory.createTopology(topologyName);
+  }
+
+  /**
+   * Returns the platform specific topology.
+   * 
+   * @return
+   */
+  public Topology build() {
+    return topology;
+  }
+
+  public ProcessingItem addProcessor(Processor processor, int parallelism) {
+    ProcessingItem pi = createPi(processor, parallelism);
+    if (this.mapProcessorToProcessingItem == null)
+      this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>();
+    this.mapProcessorToProcessingItem.put(processor, pi);
+    return pi;
+  }
+
+  public ProcessingItem addProcessor(Processor processor) {
+    return addProcessor(processor, 1);
+  }
+
+  public ProcessingItem connectInputShuffleStream(Stream inputStream, Processor processor) {
+    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
+    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
+    return pi.connectInputShuffleStream(inputStream);
+  }
+
+  public ProcessingItem connectInputKeyStream(Stream inputStream, Processor processor) {
+    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
+    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
+    return pi.connectInputKeyStream(inputStream);
+  }
+
+  public ProcessingItem connectInputAllStream(Stream inputStream, Processor processor) {
+    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
+    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
+    return pi.connectInputAllStream(inputStream);
+  }
+
+  public Stream createInputShuffleStream(Processor processor, Processor dest) {
+    Stream inputStream = this.createStream(dest);
+    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
+    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
+    pi.connectInputShuffleStream(inputStream);
+    return inputStream;
+  }
+
+  public Stream createInputKeyStream(Processor processor, Processor dest) {
+    Stream inputStream = this.createStream(dest);
+    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
+    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
+    pi.connectInputKeyStream(inputStream);
+    return inputStream;
+  }
+
+  public Stream createInputAllStream(Processor processor, Processor dest) {
+    Stream inputStream = this.createStream(dest);
+    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
+    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
+    pi.connectInputAllStream(inputStream);
+    return inputStream;
+  }
+
+  public Stream createStream(Processor processor) {
+    IProcessingItem pi = mapProcessorToProcessingItem.get(processor);
+    Stream ret = null;
+    Preconditions.checkNotNull(pi, "Trying to create stream from null PI");
+    ret = this.createStream(pi);
+    if (pi instanceof EntranceProcessingItem)
+      ((EntranceProcessingItem) pi).setOutputStream(ret);
+    return ret;
+  }
+
+  public EntranceProcessingItem addEntranceProcessor(EntranceProcessor entranceProcessor) {
+    EntranceProcessingItem pi = createEntrancePi(entranceProcessor);
+    if (this.mapProcessorToProcessingItem == null)
+      this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>();
+    mapProcessorToProcessingItem.put(entranceProcessor, pi);
+    return pi;
+  }
+
+  public ProcessingItem getProcessingItem(Processor processor) {
+    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
+    Preconditions.checkNotNull(pi, "Trying to retrieve null PI");
+    return pi;
+  }
+
+  /**
+   * Creates a processing item with a specific processor and paralellism level
+   * of 1.
+   * 
+   * @param processor
+   * @return ProcessingItem
+   */
+  @SuppressWarnings("unused")
+  private ProcessingItem createPi(Processor processor) {
+    return createPi(processor, 1);
+  }
+
+  /**
+   * Creates a processing item with a specific processor and paralellism level.
+   * 
+   * @param processor
+   * @param parallelism
+   * @return ProcessingItem
+   */
+  private ProcessingItem createPi(Processor processor, int parallelism) {
+    ProcessingItem pi = this.componentFactory.createPi(processor, parallelism);
+    this.topology.addProcessingItem(pi, parallelism);
+    return pi;
+  }
+
+  /**
+   * Creates a platform specific entrance processing item.
+   * 
+   * @param processor
+   * @return
+   */
+  private EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
+    EntranceProcessingItem epi = this.componentFactory.createEntrancePi(processor);
+    this.topology.addEntranceProcessingItem(epi);
+    if (this.mapProcessorToProcessingItem == null)
+      this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>();
+    this.mapProcessorToProcessingItem.put(processor, epi);
+    return epi;
+  }
+
+  /**
+   * Creates a platform specific stream.
+   * 
+   * @param sourcePi
+   *          source processing item.
+   * @return
+   */
+  private Stream createStream(IProcessingItem sourcePi) {
+    Stream stream = this.componentFactory.createStream(sourcePi);
+    this.topology.addStream(stream);
+    return stream;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java
index ac6fc3f..457e407 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java
@@ -22,11 +22,12 @@ package com.yahoo.labs.samoa.utils;
 
 /**
  * Represents the 3 schemes to partition the streams
+ * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public enum PartitioningScheme {
-       SHUFFLE, GROUP_BY_KEY, BROADCAST
+  SHUFFLE, GROUP_BY_KEY, BROADCAST
 }
 // TODO: use this enum in S4
 // Storm doesn't seem to need this
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java
index 2781fb8..a759b26 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java
@@ -23,43 +23,42 @@ package com.yahoo.labs.samoa.utils;
 import com.yahoo.labs.samoa.topology.IProcessingItem;
 
 /**
- * Represents one destination for streams. It has the info of:
- * the ProcessingItem, parallelismHint, and partitioning scheme.
- * Usage:
- * - When ProcessingItem connects to a stream, it will pass 
- * a StreamDestination to the stream.
- * - Stream manages a set of StreamDestination.
- * - Used in single-threaded and multi-threaded local mode.
+ * Represents one destination for streams. It has the info of: the
+ * ProcessingItem, parallelismHint, and partitioning scheme. Usage: - When
+ * ProcessingItem connects to a stream, it will pass a StreamDestination to the
+ * stream. - Stream manages a set of StreamDestination. - Used in
+ * single-threaded and multi-threaded local mode.
+ * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class StreamDestination {
-       private IProcessingItem pi;
-       private int parallelism;
-       private PartitioningScheme type;
-       
-       /*
-        * Constructor
-        */
-       public StreamDestination(IProcessingItem pi, int parallelismHint, 
PartitioningScheme type) {
-               this.pi = pi;
-               this.parallelism = parallelismHint;
-               this.type = type;
-       }
-       
-       /*
-        * Getters
-        */
-       public IProcessingItem getProcessingItem() {
-               return this.pi;
-       }
-       
-       public int getParallelism() {
-               return this.parallelism;
-       }
-       
-       public PartitioningScheme getPartitioningScheme() {
-               return this.type;
-       }
+  private IProcessingItem pi;
+  private int parallelism;
+  private PartitioningScheme type;
+
+  /*
+   * Constructor
+   */
+  public StreamDestination(IProcessingItem pi, int parallelismHint, PartitioningScheme type) {
+    this.pi = pi;
+    this.parallelism = parallelismHint;
+    this.type = type;
+  }
+
+  /*
+   * Getters
+   */
+  public IProcessingItem getProcessingItem() {
+    return this.pi;
+  }
+
+  public int getParallelism() {
+    return this.parallelism;
+  }
+
+  public PartitioningScheme getPartitioningScheme() {
+    return this.type;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java
index ce18d78..bd819ad 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java
@@ -35,148 +35,150 @@ import java.util.zip.ZipEntry;
 
 /**
  * Utils class for building and deploying applications programmatically.
+ * 
  * @author severien
- *
+ * 
  */
 public class Utils {
 
-       public static void buildSamoaPackage() {
-               try {
-                       String output = "/tmp/samoa/samoa.jar";// 
System.getProperty("user.home") + "/samoa.jar";
-                       Manifest manifest = createManifest();
-
-                       BufferedOutputStream bo;
-
-                       bo = new BufferedOutputStream(new 
FileOutputStream(output));
-                       JarOutputStream jo = new JarOutputStream(bo, manifest);
-
-                       String baseDir = System.getProperty("user.dir");
-                       System.out.println(baseDir);
-                       
-                       File samoaJar = new 
File(baseDir+"/target/samoa-0.0.1-SNAPSHOT.jar");
-                       addEntry(jo,samoaJar,baseDir+"/target/","/app/");
-                       addLibraries(jo);
-                       
-                       jo.close();
-                       bo.close();
-               } catch (IOException e) {
-                       e.printStackTrace();
-               }
-
-       }
-       
-       // TODO should get the modules file from the parameters
-       public static void buildModulesPackage(List<String> modulesNames) {
-               System.out.println(System.getProperty("user.dir"));
-               try {
-                       String baseDir = System.getProperty("user.dir");
-                       List<File> filesArray = new ArrayList<>();
-                       for (String module : modulesNames) {
-                               module = "/"+module.replace(".", "/")+".class";
-                               filesArray.add(new File(baseDir+module));
-                       }
-                       String output = System.getProperty("user.home") + 
"/modules.jar";
-
-                       Manifest manifest = new Manifest();
-                       
manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION,
-                                       "1.0");
-                       manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_URL,
-                                       "http://samoa.yahoo.com");
-                       manifest.getMainAttributes().put(
-                                       Attributes.Name.IMPLEMENTATION_VERSION, 
"0.1");
-                       
manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR,
-                                       "Yahoo");
-                       manifest.getMainAttributes().put(
-                                       
Attributes.Name.IMPLEMENTATION_VENDOR_ID, "SAMOA");
-
-                       BufferedOutputStream bo;
-
-                       bo = new BufferedOutputStream(new 
FileOutputStream(output));
-                       JarOutputStream jo = new JarOutputStream(bo, manifest);
-
-                       File[] files = filesArray.toArray(new 
File[filesArray.size()]);
-                       addEntries(jo,files, baseDir, "");
-
-                       jo.close();
-                       bo.close();
-               } catch (IOException e) {
-                       e.printStackTrace();
-               }
-
-       }
-
-       private static void addLibraries(JarOutputStream jo) {
-               try {
-                       String baseDir = System.getProperty("user.dir");
-                       String libDir = baseDir+"/target/lib";
-                       File inputFile = new File(libDir);
-                       
-                       File[] files = inputFile.listFiles();
-                       for (File file : files) {
-                               addEntry(jo, file, baseDir, "lib");
-                       }
-                       jo.close();
-                       
-               } catch (IOException e) {
-                       e.printStackTrace();
-               }
-       }
-       
-       private static void addEntries(JarOutputStream jo, File[] files, String 
baseDir, String rootDir){
-               for (File file : files) {
-
-                       if (!file.isDirectory()) {
-                               addEntry(jo, file, baseDir, rootDir);
-                       } else {
-                               File dir = new File(file.getAbsolutePath());
-                               addEntries(jo, dir.listFiles(), baseDir, 
rootDir);
-                       }
-               }
-       }
-       
-       private static void addEntry(JarOutputStream jo, File file, String 
baseDir, String rootDir) {
-               try {
-                       BufferedInputStream bi = new BufferedInputStream(new 
FileInputStream(file));
-
-                       String path = 
file.getAbsolutePath().replaceFirst(baseDir, rootDir);
-                       jo.putNextEntry(new ZipEntry(path));
-
-                       byte[] buf = new byte[1024];
-                       int anz;
-                       while ((anz = bi.read(buf)) != -1) {
-                               jo.write(buf, 0, anz);
-                       }
-                       bi.close();
-               } catch (IOException e) {
-                       e.printStackTrace();
-               }
-       }
-
-       public static Manifest createManifest() {
-               Manifest manifest = new Manifest();
-               
manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
-               manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_URL, "http://samoa.yahoo.com");
-               
manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VERSION, "0.1");
-               
manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR, 
"Yahoo");
-               
manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR_ID, 
"SAMOA");
-               Attributes s4Attributes = new Attributes();
-               s4Attributes.putValue("S4-App-Class", "path.to.Class");
-               Attributes.Name name = new Attributes.Name("S4-App-Class");
-               Attributes.Name S4Version = new Attributes.Name("S4-Version");
-               manifest.getMainAttributes().put(name, 
"samoa.topology.impl.DoTaskApp");
-               manifest.getMainAttributes().put(S4Version, "0.6.0-incubating");
-               return manifest;
-       }
-       
-       public static Object getInstance(String className) {
+  public static void buildSamoaPackage() {
+    try {
+      String output = "/tmp/samoa/samoa.jar";// System.getProperty("user.home")
+                                             // + "/samoa.jar";
+      Manifest manifest = createManifest();
+
+      BufferedOutputStream bo;
+
+      bo = new BufferedOutputStream(new FileOutputStream(output));
+      JarOutputStream jo = new JarOutputStream(bo, manifest);
+
+      String baseDir = System.getProperty("user.dir");
+      System.out.println(baseDir);
+
+      File samoaJar = new File(baseDir + "/target/samoa-0.0.1-SNAPSHOT.jar");
+      addEntry(jo, samoaJar, baseDir + "/target/", "/app/");
+      addLibraries(jo);
+
+      jo.close();
+      bo.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+  }
+
+  // TODO should get the modules file from the parameters
+  public static void buildModulesPackage(List<String> modulesNames) {
+    System.out.println(System.getProperty("user.dir"));
+    try {
+      String baseDir = System.getProperty("user.dir");
+      List<File> filesArray = new ArrayList<>();
+      for (String module : modulesNames) {
+        module = "/" + module.replace(".", "/") + ".class";
+        filesArray.add(new File(baseDir + module));
+      }
+      String output = System.getProperty("user.home") + "/modules.jar";
+
+      Manifest manifest = new Manifest();
+      manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION,
+          "1.0");
+      manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_URL,
+          "http://samoa.yahoo.com");
+      manifest.getMainAttributes().put(
+          Attributes.Name.IMPLEMENTATION_VERSION, "0.1");
+      manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR,
+          "Yahoo");
+      manifest.getMainAttributes().put(
+          Attributes.Name.IMPLEMENTATION_VENDOR_ID, "SAMOA");
+
+      BufferedOutputStream bo;
+
+      bo = new BufferedOutputStream(new FileOutputStream(output));
+      JarOutputStream jo = new JarOutputStream(bo, manifest);
+
+      File[] files = filesArray.toArray(new File[filesArray.size()]);
+      addEntries(jo, files, baseDir, "");
+
+      jo.close();
+      bo.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+  }
+
+  private static void addLibraries(JarOutputStream jo) {
+    try {
+      String baseDir = System.getProperty("user.dir");
+      String libDir = baseDir + "/target/lib";
+      File inputFile = new File(libDir);
+
+      File[] files = inputFile.listFiles();
+      for (File file : files) {
+        addEntry(jo, file, baseDir, "lib");
+      }
+      jo.close();
+
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private static void addEntries(JarOutputStream jo, File[] files, String baseDir, String rootDir) {
+    for (File file : files) {
+
+      if (!file.isDirectory()) {
+        addEntry(jo, file, baseDir, rootDir);
+      } else {
+        File dir = new File(file.getAbsolutePath());
+        addEntries(jo, dir.listFiles(), baseDir, rootDir);
+      }
+    }
+  }
+
+  private static void addEntry(JarOutputStream jo, File file, String baseDir, String rootDir) {
+    try {
+      BufferedInputStream bi = new BufferedInputStream(new FileInputStream(file));
+
+      String path = file.getAbsolutePath().replaceFirst(baseDir, rootDir);
+      jo.putNextEntry(new ZipEntry(path));
+
+      byte[] buf = new byte[1024];
+      int anz;
+      while ((anz = bi.read(buf)) != -1) {
+        jo.write(buf, 0, anz);
+      }
+      bi.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public static Manifest createManifest() {
+    Manifest manifest = new Manifest();
+    manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
+    manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_URL, "http://samoa.yahoo.com");
+    manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VERSION, "0.1");
+    manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR, "Yahoo");
+    manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR_ID, "SAMOA");
+    Attributes s4Attributes = new Attributes();
+    s4Attributes.putValue("S4-App-Class", "path.to.Class");
+    Attributes.Name name = new Attributes.Name("S4-App-Class");
+    Attributes.Name S4Version = new Attributes.Name("S4-Version");
+    manifest.getMainAttributes().put(name, "samoa.topology.impl.DoTaskApp");
+    manifest.getMainAttributes().put(S4Version, "0.6.0-incubating");
+    return manifest;
+  }
+
+  public static Object getInstance(String className) {
     Class<?> cls;
-               Object obj = null;
-               try {
-                       cls = Class.forName(className); 
-                       obj = cls.newInstance();
-               } catch (ClassNotFoundException | InstantiationException | 
IllegalAccessException e) {
-                       e.printStackTrace();
-               }
-               return obj;
-       }
+    Object obj = null;
+    try {
+      cls = Class.forName(className);
+      obj = cls.newInstance();
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+      e.printStackTrace();
+    }
+    return obj;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java b/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java
index f82588b..e8d589f 100644
--- a/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java
+++ b/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java
@@ -27,71 +27,78 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class DoubleVectorTest {
-    private DoubleVector emptyVector, array5Vector;
-
-    @Before
-    public void setUp() {
-        emptyVector = new DoubleVector();
-        array5Vector = new DoubleVector(new double[] { 1.1, 2.5, 0, 4.7, 0 });
-    }
-
-    @Test
-    public void testGetArrayRef() {
-        assertThat(emptyVector.getArrayRef(), notNullValue());
-        assertTrue(emptyVector.getArrayRef() == emptyVector.getArrayRef());
-        assertEquals(5, array5Vector.getArrayRef().length);
-    }
-
-    @Test
-    public void testGetArrayCopy() {
-        double[] arrayRef;
-        arrayRef = emptyVector.getArrayRef();
-        assertTrue(arrayRef != emptyVector.getArrayCopy());
-        assertThat(arrayRef, is(equalTo(emptyVector.getArrayCopy())));
-
-        arrayRef = array5Vector.getArrayRef();
-        assertTrue(arrayRef != array5Vector.getArrayCopy());
-        assertThat(arrayRef, is(equalTo(array5Vector.getArrayCopy())));
-    }
-
-    @Test
-    public void testNumNonZeroEntries() {
-        assertEquals(0, emptyVector.numNonZeroEntries());
-        assertEquals(3, array5Vector.numNonZeroEntries());
-    }
-
-    @Test(expected = IndexOutOfBoundsException.class)
-    public void testGetValueOutOfBound() {
-        @SuppressWarnings("unused")
-        double value = emptyVector.getArrayRef()[0];
-    }
-
-    @Test()
-    public void testSetValue() {
-        // test automatic vector enlargement
-        emptyVector.setValue(0, 1.0);
-        assertEquals(1, emptyVector.getArrayRef().length);
-        assertEquals(1.0, emptyVector.getArrayRef()[0], 0.0); // should be 
exactly the same, so delta=0.0
-
-        emptyVector.setValue(5, 5.5);
-        assertEquals(6, emptyVector.getArrayRef().length);
-        assertEquals(2, emptyVector.numNonZeroEntries());
-        assertEquals(5.5, emptyVector.getArrayRef()[5], 0.0); // should be 
exactly the same, so delta=0.0
-    }
-
-    @Test
-    public void testAddToValue() {
-        array5Vector.addToValue(2, 5.0);
-        assertEquals(5, array5Vector.getArrayRef()[2], 0.0); // should be 
exactly the same, so delta=0.0
-
-        // test automatic vector enlargement
-        emptyVector.addToValue(0, 1.0);
-        assertEquals(1, emptyVector.getArrayRef()[0], 0.0); // should be 
exactly the same, so delta=0.0
-    }
-
-    @Test
-    public void testSumOfValues() {
-        assertEquals(1.1 + 2.5 + 4.7, array5Vector.sumOfValues(), 
Double.MIN_NORMAL);
-    }
+  private DoubleVector emptyVector, array5Vector;
+
+  @Before
+  public void setUp() {
+    emptyVector = new DoubleVector();
+    array5Vector = new DoubleVector(new double[] { 1.1, 2.5, 0, 4.7, 0 });
+  }
+
+  @Test
+  public void testGetArrayRef() {
+    assertThat(emptyVector.getArrayRef(), notNullValue());
+    assertTrue(emptyVector.getArrayRef() == emptyVector.getArrayRef());
+    assertEquals(5, array5Vector.getArrayRef().length);
+  }
+
+  @Test
+  public void testGetArrayCopy() {
+    double[] arrayRef;
+    arrayRef = emptyVector.getArrayRef();
+    assertTrue(arrayRef != emptyVector.getArrayCopy());
+    assertThat(arrayRef, is(equalTo(emptyVector.getArrayCopy())));
+
+    arrayRef = array5Vector.getArrayRef();
+    assertTrue(arrayRef != array5Vector.getArrayCopy());
+    assertThat(arrayRef, is(equalTo(array5Vector.getArrayCopy())));
+  }
+
+  @Test
+  public void testNumNonZeroEntries() {
+    assertEquals(0, emptyVector.numNonZeroEntries());
+    assertEquals(3, array5Vector.numNonZeroEntries());
+  }
+
+  @Test(expected = IndexOutOfBoundsException.class)
+  public void testGetValueOutOfBound() {
+    @SuppressWarnings("unused")
+    double value = emptyVector.getArrayRef()[0];
+  }
+
+  @Test()
+  public void testSetValue() {
+    // test automatic vector enlargement
+    emptyVector.setValue(0, 1.0);
+    assertEquals(1, emptyVector.getArrayRef().length);
+    assertEquals(1.0, emptyVector.getArrayRef()[0], 0.0); // should be exactly
+                                                          // the same, so
+                                                          // delta=0.0
+
+    emptyVector.setValue(5, 5.5);
+    assertEquals(6, emptyVector.getArrayRef().length);
+    assertEquals(2, emptyVector.numNonZeroEntries());
+    assertEquals(5.5, emptyVector.getArrayRef()[5], 0.0); // should be exactly
+                                                          // the same, so
+                                                          // delta=0.0
+  }
+
+  @Test
+  public void testAddToValue() {
+    array5Vector.addToValue(2, 5.0);
+    assertEquals(5, array5Vector.getArrayRef()[2], 0.0); // should be exactly
+                                                         // the same, so
+                                                         // delta=0.0
+
+    // test automatic vector enlargement
+    emptyVector.addToValue(0, 1.0);
+    assertEquals(1, emptyVector.getArrayRef()[0], 0.0); // should be exactly the
+                                                        // same, so delta=0.0
+  }
+
+  @Test
+  public void testSumOfValues() {
+    assertEquals(1.1 + 2.5 + 4.7, array5Vector.sumOfValues(), Double.MIN_NORMAL);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java b/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java
index 04f3184..51ec57d 100644
--- a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java
+++ b/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java
@@ -47,257 +47,260 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class HDFSFileStreamSourceTest {
-       
-       private static final String[] HOSTS = {"localhost"};
-       private static final String BASE_DIR = "/minidfsTest";
-       private static final int NUM_FILES_IN_DIR = 4;
-       private static final int NUM_NOISE_FILES_IN_DIR = 2;
-       
-       private HDFSFileStreamSource streamSource;
-
-       private Configuration config;
-       private MiniDFSCluster hdfsCluster;
-       private String hdfsURI;
-
-       @Before
-       public void setUp() throws Exception {
-               // Start MiniDFSCluster
-               MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new 
Configuration()).hosts(HOSTS).numDataNodes(1).format(true);
-               hdfsCluster = builder.build();
-               hdfsCluster.waitActive();
-               hdfsURI = "hdfs://localhost:"+ hdfsCluster.getNameNodePort();
-               
-               // Construct stream source
-               streamSource = new HDFSFileStreamSource();
-               
-               // General config
-               config = new Configuration();
-               config.set("fs.defaultFS",hdfsURI);
-       }
-
-       @After
-       public void tearDown() throws Exception {
-               hdfsCluster.shutdown();
-       }
-
-       /*
-        * Init tests
-        */
-       @Test
-       public void testInitWithSingleFileAndExtension() {
-               // write input file
-               writeSimpleFiles(BASE_DIR,"txt",1);
-               
-               // init with path to input file
-               streamSource.init(config, BASE_DIR+"/1.txt", "txt");
-               
-               //assertions
-               assertEquals("Size of filePaths is not correct.", 
1,streamSource.getFilePathListSize(),0);
-               String fn = streamSource.getFilePathAt(0);
-               assertTrue("Incorrect file in 
filePaths.",fn.equals(BASE_DIR+"/1.txt") || 
fn.equals(hdfsURI+BASE_DIR+"1.txt"));
-       }
-       
-       @Test
-       public void testInitWithSingleFileAndNullExtension() {
-               // write input file
-               writeSimpleFiles(BASE_DIR,"txt",1);
-                       
-               // init with path to input file
-               streamSource.init(config, BASE_DIR+"/1.txt", null);
-               
-               // assertions
-               assertEquals("Size of filePaths is not correct.", 
1,streamSource.getFilePathListSize(),0);
-               String fn = streamSource.getFilePathAt(0);
-               assertTrue("Incorrect file in 
filePaths.",fn.equals(BASE_DIR+"/1.txt") || 
fn.equals(hdfsURI+BASE_DIR+"1.txt"));
-       }
-       
-       @Test
-       public void testInitWithFolderAndExtension() {
-               // write input files & noise files
-               writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR);
-               writeSimpleFiles(BASE_DIR,null,NUM_NOISE_FILES_IN_DIR);
-                       
-               // init with path to input dir
-               streamSource.init(config, BASE_DIR, "txt");
-               
-               // assertions
-               assertEquals("Size of filePaths is not correct.", 
NUM_FILES_IN_DIR,streamSource.getFilePathListSize(),0);
-               Set<String> filenames = new HashSet<String>();
-               for (int i=1; i<=NUM_FILES_IN_DIR; i++) {
-                       String targetFn = 
BASE_DIR+"/"+Integer.toString(i)+".txt";
-                       filenames.add(targetFn);
-                       filenames.add(hdfsURI+targetFn);
-               }
-               for (int i=0; i<NUM_FILES_IN_DIR; i++) {
-                       String fn = streamSource.getFilePathAt(i);
-                       assertTrue("Incorrect file in 
filePaths:"+fn,filenames.contains(fn));
-               }
-       }
-       
-       @Test
-       public void testInitWithFolderAndNullExtension() {
-               // write input file
-               writeSimpleFiles(BASE_DIR,null,NUM_FILES_IN_DIR);
-               
-               // init with path to input dir
-               streamSource.init(config, BASE_DIR, null);
-               
-               // assertions
-               assertEquals("Size of filePaths is not correct.", 
NUM_FILES_IN_DIR,streamSource.getFilePathListSize(),0);
-               Set<String> filenames = new HashSet<String>();
-               for (int i=1; i<=NUM_FILES_IN_DIR; i++) {
-                       String targetFn = BASE_DIR+"/"+Integer.toString(i);
-                       filenames.add(targetFn);
-                       filenames.add(hdfsURI+targetFn);
-               }
-               for (int i=0; i< NUM_FILES_IN_DIR; i++) {
-                       String fn = streamSource.getFilePathAt(i);
-                       assertTrue("Incorrect file in 
filePaths:"+fn,filenames.contains(fn));
-               }
-       }
-       
-       /*
-        * getNextInputStream tests
-        */
-       @Test
-       public void testGetNextInputStream() {
-               // write input files & noise files
-               writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR);
-                                       
-               // init with path to input dir
-               streamSource.init(config, BASE_DIR, "txt");
-                               
-               // call getNextInputStream & assertions
-               Set<String> contents = new HashSet<String>();
-               for (int i=1; i<=NUM_FILES_IN_DIR; i++) {
-                       contents.add(Integer.toString(i));
-               }
-               for (int i=0; i< NUM_FILES_IN_DIR; i++) {
-                       InputStream inStream = 
streamSource.getNextInputStream();
-                       assertNotNull("Unexpected end of input stream 
list.",inStream);
-                       
-                       BufferedReader rd = new BufferedReader(new 
InputStreamReader(inStream));
-                       String inputRead = null;
-                       try {
-                               inputRead = rd.readLine();
-                       } catch (IOException ioe) {
-                               fail("Fail reading from stream at index:"+i + 
ioe.getMessage());
-                       }
-                       assertTrue("File content is 
incorrect.",contents.contains(inputRead));
-                       Iterator<String> it = contents.iterator();
-                       while (it.hasNext()) {
-                               if (it.next().equals(inputRead)) {
-                                       it.remove();
-                                       break;
-                               }
-                       }
-               }
-               
-               // assert that another call to getNextInputStream will return 
null
-               assertNull("Call getNextInputStream after the last file did not 
return null.",streamSource.getNextInputStream());
-       }
-       
-       /*
-        * getCurrentInputStream tests
-        */
-       public void testGetCurrentInputStream() {
-               // write input files & noise files
-               writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR);
-                                                       
-               // init with path to input dir
-               streamSource.init(config, BASE_DIR, "txt");
-                                               
-               // call getNextInputStream, getCurrentInputStream & assertions
-               for (int i=0; i<= NUM_FILES_IN_DIR; i++) { // test also 
after-end-of-list
-                       InputStream inStream1 = 
streamSource.getNextInputStream();
-                       InputStream inStream2 = 
streamSource.getCurrentInputStream();
-                       assertSame("Incorrect current input stream.",inStream1, 
inStream2);
-               }
-       }
-       
-       /*
-        * reset tests
-        */
-       public void testReset() {
-               // write input files & noise files
-               writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR);
-                                                                       
-               // init with path to input dir
-               streamSource.init(config, BASE_DIR, "txt");
-               
-               // Get the first input string
-               InputStream firstInStream = streamSource.getNextInputStream();
-               String firstInput = null;
-               assertNotNull("Unexpected end of input stream 
list.",firstInStream);
-               
-               BufferedReader rd1 = new BufferedReader(new 
InputStreamReader(firstInStream));
-               try {
-                       firstInput = rd1.readLine();
-               } catch (IOException ioe) {
-                       fail("Fail reading from stream at index:0" + 
ioe.getMessage());
-               }
-               
-               // call getNextInputStream a few times
-               streamSource.getNextInputStream();
-               
-               // call reset, call next, assert that output is 1 (the first 
file)
-               try {
-                       streamSource.reset();
-               } catch (IOException ioe) {
-                       fail("Fail resetting stream source." + 
ioe.getMessage());
-               }
-               
-               InputStream inStream = streamSource.getNextInputStream();
-               assertNotNull("Unexpected end of input stream list.",inStream);
-               
-               BufferedReader rd2 = new BufferedReader(new 
InputStreamReader(inStream));
-               String inputRead = null;
-               try {
-                       inputRead = rd2.readLine();
-               } catch (IOException ioe) {
-                       fail("Fail reading from stream at index:0" + 
ioe.getMessage());
-               }
-               assertEquals("File content is incorrect.",firstInput,inputRead);
-       }
-       
-       private void writeSimpleFiles(String path, String ext, int numOfFiles) {
-               // get filesystem
-               FileSystem dfs;
-               try {
-                       dfs = hdfsCluster.getFileSystem();
-               } catch (IOException ioe) {
-                       fail("Could not access MiniDFSCluster" + 
ioe.getMessage());
-                       return;
-               }
-               
-               // create basedir
-               Path basedir = new Path(path);
-               try {
-                       dfs.mkdirs(basedir);
-               } catch (IOException ioe) {
-                       fail("Could not create DIR:"+ path + "\n" + 
ioe.getMessage());
-                       return;
-               }
-               
-               // write files
-               for (int i=1; i<=numOfFiles; i++) {
-                       String fn = null;
-                       if (ext != null) {
-                               fn = Integer.toString(i) + "."+ ext;
-                       } else {
-                               fn = Integer.toString(i);
-                       }
-                       
-                       try {
-                               OutputStream fin = dfs.create(new 
Path(path,fn));
-                               BufferedWriter wr = new BufferedWriter(new 
OutputStreamWriter(fin));
-                               wr.write(Integer.toString(i));
-                               wr.close();
-                               fin.close();
-                       } catch (IOException ioe) {
-                               fail("Fail writing to input file: "+ fn + " in 
directory: " + path + ioe.getMessage());
-                       }
-               }
-       }
+
+  private static final String[] HOSTS = { "localhost" };
+  private static final String BASE_DIR = "/minidfsTest";
+  private static final int NUM_FILES_IN_DIR = 4;
+  private static final int NUM_NOISE_FILES_IN_DIR = 2;
+
+  private HDFSFileStreamSource streamSource;
+
+  private Configuration config;
+  private MiniDFSCluster hdfsCluster;
+  private String hdfsURI;
+
+  @Before
+  public void setUp() throws Exception {
+    // Start MiniDFSCluster
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new Configuration()).hosts(HOSTS).numDataNodes(1)
+        .format(true);
+    hdfsCluster = builder.build();
+    hdfsCluster.waitActive();
+    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort();
+
+    // Construct stream source
+    streamSource = new HDFSFileStreamSource();
+
+    // General config
+    config = new Configuration();
+    config.set("fs.defaultFS", hdfsURI);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    hdfsCluster.shutdown();
+  }
+
+  /*
+   * Init tests
+   */
+  @Test
+  public void testInitWithSingleFileAndExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, "txt", 1);
+
+    // init with path to input file
+    streamSource.init(config, BASE_DIR + "/1.txt", "txt");
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0);
+    String fn = streamSource.getFilePathAt(0);
+    assertTrue("Incorrect file in filePaths.",
+        fn.equals(BASE_DIR + "/1.txt") || fn.equals(hdfsURI + BASE_DIR + "1.txt"));
+  }
+
+  @Test
+  public void testInitWithSingleFileAndNullExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, "txt", 1);
+
+    // init with path to input file
+    streamSource.init(config, BASE_DIR + "/1.txt", null);
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0);
+    String fn = streamSource.getFilePathAt(0);
+    assertTrue("Incorrect file in filePaths.",
+        fn.equals(BASE_DIR + "/1.txt") || fn.equals(hdfsURI + BASE_DIR + "1.txt"));
+  }
+
+  @Test
+  public void testInitWithFolderAndExtension() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+    writeSimpleFiles(BASE_DIR, null, NUM_NOISE_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(config, BASE_DIR, "txt");
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0);
+    Set<String> filenames = new HashSet<String>();
+    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
+      String targetFn = BASE_DIR + "/" + Integer.toString(i) + ".txt";
+      filenames.add(targetFn);
+      filenames.add(hdfsURI + targetFn);
+    }
+    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
+      String fn = streamSource.getFilePathAt(i);
+      assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn));
+    }
+  }
+
+  @Test
+  public void testInitWithFolderAndNullExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, null, NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(config, BASE_DIR, null);
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0);
+    Set<String> filenames = new HashSet<String>();
+    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
+      String targetFn = BASE_DIR + "/" + Integer.toString(i);
+      filenames.add(targetFn);
+      filenames.add(hdfsURI + targetFn);
+    }
+    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
+      String fn = streamSource.getFilePathAt(i);
+      assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn));
+    }
+  }
+
+  /*
+   * getNextInputStream tests
+   */
+  @Test
+  public void testGetNextInputStream() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(config, BASE_DIR, "txt");
+
+    // call getNextInputStream & assertions
+    Set<String> contents = new HashSet<String>();
+    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
+      contents.add(Integer.toString(i));
+    }
+    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
+      InputStream inStream = streamSource.getNextInputStream();
+      assertNotNull("Unexpected end of input stream list.", inStream);
+
+      BufferedReader rd = new BufferedReader(new InputStreamReader(inStream));
+      String inputRead = null;
+      try {
+        inputRead = rd.readLine();
+      } catch (IOException ioe) {
+        fail("Fail reading from stream at index:" + i + ioe.getMessage());
+      }
+      assertTrue("File content is incorrect.", contents.contains(inputRead));
+      Iterator<String> it = contents.iterator();
+      while (it.hasNext()) {
+        if (it.next().equals(inputRead)) {
+          it.remove();
+          break;
+        }
+      }
+    }
+
+    // assert that another call to getNextInputStream will return null
+    assertNull("Call getNextInputStream after the last file did not return null.", streamSource.getNextInputStream());
+  }
+
+  /*
+   * getCurrentInputStream tests
+   */
+  public void testGetCurrentInputStream() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(config, BASE_DIR, "txt");
+
+    // call getNextInputStream, getCurrentInputStream & assertions
+    for (int i = 0; i <= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list
+      InputStream inStream1 = streamSource.getNextInputStream();
+      InputStream inStream2 = streamSource.getCurrentInputStream();
+      assertSame("Incorrect current input stream.", inStream1, inStream2);
+    }
+  }
+
+  /*
+   * reset tests
+   */
+  public void testReset() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(config, BASE_DIR, "txt");
+
+    // Get the first input string
+    InputStream firstInStream = streamSource.getNextInputStream();
+    String firstInput = null;
+    assertNotNull("Unexpected end of input stream list.", firstInStream);
+
+    BufferedReader rd1 = new BufferedReader(new InputStreamReader(firstInStream));
+    try {
+      firstInput = rd1.readLine();
+    } catch (IOException ioe) {
+      fail("Fail reading from stream at index:0" + ioe.getMessage());
+    }
+
+    // call getNextInputStream a few times
+    streamSource.getNextInputStream();
+
+    // call reset, call next, assert that output is 1 (the first file)
+    try {
+      streamSource.reset();
+    } catch (IOException ioe) {
+      fail("Fail resetting stream source." + ioe.getMessage());
+    }
+
+    InputStream inStream = streamSource.getNextInputStream();
+    assertNotNull("Unexpected end of input stream list.", inStream);
+
+    BufferedReader rd2 = new BufferedReader(new InputStreamReader(inStream));
+    String inputRead = null;
+    try {
+      inputRead = rd2.readLine();
+    } catch (IOException ioe) {
+      fail("Fail reading from stream at index:0" + ioe.getMessage());
+    }
+    assertEquals("File content is incorrect.", firstInput, inputRead);
+  }
+
+  private void writeSimpleFiles(String path, String ext, int numOfFiles) {
+    // get filesystem
+    FileSystem dfs;
+    try {
+      dfs = hdfsCluster.getFileSystem();
+    } catch (IOException ioe) {
+      fail("Could not access MiniDFSCluster" + ioe.getMessage());
+      return;
+    }
+
+    // create basedir
+    Path basedir = new Path(path);
+    try {
+      dfs.mkdirs(basedir);
+    } catch (IOException ioe) {
+      fail("Could not create DIR:" + path + "\n" + ioe.getMessage());
+      return;
+    }
+
+    // write files
+    for (int i = 1; i <= numOfFiles; i++) {
+      String fn = null;
+      if (ext != null) {
+        fn = Integer.toString(i) + "." + ext;
+      } else {
+        fn = Integer.toString(i);
+      }
+
+      try {
+        OutputStream fin = dfs.create(new Path(path, fn));
+        BufferedWriter wr = new BufferedWriter(new OutputStreamWriter(fin));
+        wr.write(Integer.toString(i));
+        wr.close();
+        fin.close();
+      } catch (IOException ioe) {
+        fail("Fail writing to input file: " + fn + " in directory: " + path + ioe.getMessage());
+      }
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java b/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java
index 21ca378..b121425 100644
--- a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java
+++ b/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java
@@ -43,234 +43,234 @@ import org.junit.Test;
 import org.apache.commons.io.FileUtils;
 
 public class LocalFileStreamSourceTest {
-       private static final String BASE_DIR = "localfsTest";
-       private static final int NUM_FILES_IN_DIR = 4;
-       private static final int NUM_NOISE_FILES_IN_DIR = 2;
-       
-       private LocalFileStreamSource streamSource;
-
-       @Before
-       public void setUp() throws Exception {
-               streamSource = new LocalFileStreamSource();
-               
-       }
-
-       @After
-       public void tearDown() throws Exception {
-               FileUtils.deleteDirectory(new File(BASE_DIR));
-       }
-
-       @Test
-       public void testInitWithSingleFileAndExtension() {
-               // write input file
-               writeSimpleFiles(BASE_DIR,"txt",1);
-                               
-               // init with path to input file
-               File inFile = new File(BASE_DIR,"1.txt");
-               String inFilePath = inFile.getAbsolutePath();
-               streamSource.init(inFilePath, "txt");
-                               
-               //assertions
-               assertEquals("Size of filePaths is not correct.", 
1,streamSource.getFilePathListSize(),0);
-               String fn = streamSource.getFilePathAt(0);
-               assertEquals("Incorrect file in filePaths.",inFilePath,fn);
-       }
-       
-       @Test
-       public void testInitWithSingleFileAndNullExtension() {
-               // write input file
-               writeSimpleFiles(BASE_DIR,"txt",1);
-                               
-               // init with path to input file
-               File inFile = new File(BASE_DIR,"1.txt");
-               String inFilePath = inFile.getAbsolutePath();
-               streamSource.init(inFilePath, null);
-                               
-               //assertions
-               assertEquals("Size of filePaths is not correct.", 
1,streamSource.getFilePathListSize(),0);
-               String fn = streamSource.getFilePathAt(0);
-               assertEquals("Incorrect file in filePaths.",inFilePath,fn);
-       }
-       
-       @Test
-       public void testInitWithFolderAndExtension() {
-               // write input file
-               writeSimpleFiles(BASE_DIR,null,NUM_NOISE_FILES_IN_DIR);
-               writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR);
-                               
-               // init with path to input dir
-               File inDir = new File(BASE_DIR);
-               String inDirPath = inDir.getAbsolutePath();
-               streamSource.init(inDirPath, "txt");
-                               
-               //assertions
-               assertEquals("Size of filePaths is not correct.", 
NUM_FILES_IN_DIR,streamSource.getFilePathListSize(),0);
-               Set<String> filenames = new HashSet<String>();
-               for (int i=1; i<=NUM_FILES_IN_DIR; i++) {
-                       String expectedFn = (new 
File(inDirPath,Integer.toString(i)+".txt")).getAbsolutePath();
-                       filenames.add(expectedFn);
-               }
-               for (int i=0; i< NUM_FILES_IN_DIR; i++) {
-                       String fn = streamSource.getFilePathAt(i);
-                       assertTrue("Incorrect file in filePaths:"+fn,filenames.contains(fn));
-               }
-       }
-       
-       @Test
-       public void testInitWithFolderAndNullExtension() {
-               // write input file
-               writeSimpleFiles(BASE_DIR,null,NUM_FILES_IN_DIR);
-                               
-               // init with path to input dir
-               File inDir = new File(BASE_DIR);
-               String inDirPath = inDir.getAbsolutePath();
-               streamSource.init(inDirPath, null);
-                               
-               //assertions
-               assertEquals("Size of filePaths is not correct.", 
NUM_FILES_IN_DIR,streamSource.getFilePathListSize(),0);
-               Set<String> filenames = new HashSet<String>();
-               for (int i=1; i<=NUM_FILES_IN_DIR; i++) {
-                       String expectedFn = (new 
File(inDirPath,Integer.toString(i))).getAbsolutePath();
-                       filenames.add(expectedFn);
-               }
-               for (int i=0; i< NUM_FILES_IN_DIR; i++) {
-                       String fn = streamSource.getFilePathAt(i);
-                       assertTrue("Incorrect file in filePaths:"+fn,filenames.contains(fn));
-               }
-       }
-       
-       /*
-        * getNextInputStream tests
-        */
-       @Test
-       public void testGetNextInputStream() {
-               // write input files & noise files
-               writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR);
-                                       
-               // init with path to input dir
-               streamSource.init(BASE_DIR, "txt");
-                               
-               // call getNextInputStream & assertions
-               Set<String> contents = new HashSet<String>();
-               for (int i=1; i<=NUM_FILES_IN_DIR; i++) {
-                       contents.add(Integer.toString(i));
-               }
-               for (int i=0; i< NUM_FILES_IN_DIR; i++) {
-                       InputStream inStream = 
streamSource.getNextInputStream();
-                       assertNotNull("Unexpected end of input stream list.",inStream);
-                       
-                       BufferedReader rd = new BufferedReader(new 
InputStreamReader(inStream));
-                       String inputRead = null;
-                       try {
-                               inputRead = rd.readLine();
-                       } catch (IOException ioe) {
-                               fail("Fail reading from stream at index:"+i + 
ioe.getMessage());
-                       }
-                       assertTrue("File content is incorrect.",contents.contains(inputRead));
-                       Iterator<String> it = contents.iterator();
-                       while (it.hasNext()) {
-                               if (it.next().equals(inputRead)) {
-                                       it.remove();
-                                       break;
-                               }
-                       }
-               }
-               
-               // assert that another call to getNextInputStream will return null
-               assertNull("Call getNextInputStream after the last file did not return null.",streamSource.getNextInputStream());
-       }
-       
-       /*
-        * getCurrentInputStream tests
-        */
-       public void testGetCurrentInputStream() {
-               // write input files & noise files
-               writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR);
-                                                       
-               // init with path to input dir
-               streamSource.init(BASE_DIR, "txt");
-                                               
-               // call getNextInputStream, getCurrentInputStream & assertions
-               for (int i=0; i<= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list
-                       InputStream inStream1 = 
streamSource.getNextInputStream();
-                       InputStream inStream2 = 
streamSource.getCurrentInputStream();
-                       assertSame("Incorrect current input stream.",inStream1, 
inStream2);
-               }
-       }
-       
-       /*
-        * reset tests
-        */
-       public void testReset() {
-               // write input files & noise files
-               writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR);
-
-               // init with path to input dir
-               streamSource.init(BASE_DIR, "txt");
-
-               // Get the first input string
-               InputStream firstInStream = streamSource.getNextInputStream();
-               String firstInput = null;
-               assertNotNull("Unexpected end of input stream list.",firstInStream);
-
-               BufferedReader rd1 = new BufferedReader(new 
InputStreamReader(firstInStream));
-               try {
-                       firstInput = rd1.readLine();
-               } catch (IOException ioe) {
-                       fail("Fail reading from stream at index:0" + 
ioe.getMessage());
-               }
-
-               // call getNextInputStream a few times
-               streamSource.getNextInputStream();
-
-               // call reset, call next, assert that output is 1 (the first file)
-               try {
-                       streamSource.reset();
-               } catch (IOException ioe) {
-                       fail("Fail resetting stream source." + 
ioe.getMessage());
-               }
-
-               InputStream inStream = streamSource.getNextInputStream();
-               assertNotNull("Unexpected end of input stream list.",inStream);
-
-               BufferedReader rd2 = new BufferedReader(new 
InputStreamReader(inStream));
-               String inputRead = null;
-               try {
-                       inputRead = rd2.readLine();
-               } catch (IOException ioe) {
-                       fail("Fail reading from stream at index:0" + 
ioe.getMessage());
-               }
-               assertEquals("File content is incorrect.",firstInput,inputRead);
-       }
-       
-       private void writeSimpleFiles(String path, String ext, int numOfFiles) {
-               // Create folder
-               File folder = new File(path);
-               if (!folder.exists()) {
-                       try{
-                       folder.mkdir();
-                   } catch(SecurityException se){
-                       fail("Failed creating directory:"+path+se);
-                   }
-               }
-               
-               // Write files
-               for (int i=1; i<=numOfFiles; i++) {
-                       String fn = null;
-                       if (ext != null) {
-                               fn = Integer.toString(i) + "."+ ext;
-                       } else {
-                               fn = Integer.toString(i);
-                       }
-                       
-                       try {
-                               FileWriter fwr = new FileWriter(new 
File(path,fn));
-                               fwr.write(Integer.toString(i));
-                               fwr.close();
-                       } catch (IOException ioe) {
-                               fail("Fail writing to input file: "+ fn + " in directory: " + path + ioe.getMessage());
-                       }
-               }
-       }
+  private static final String BASE_DIR = "localfsTest";
+  private static final int NUM_FILES_IN_DIR = 4;
+  private static final int NUM_NOISE_FILES_IN_DIR = 2;
+
+  private LocalFileStreamSource streamSource;
+
+  @Before
+  public void setUp() throws Exception {
+    streamSource = new LocalFileStreamSource();
+
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    FileUtils.deleteDirectory(new File(BASE_DIR));
+  }
+
+  @Test
+  public void testInitWithSingleFileAndExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, "txt", 1);
+
+    // init with path to input file
+    File inFile = new File(BASE_DIR, "1.txt");
+    String inFilePath = inFile.getAbsolutePath();
+    streamSource.init(inFilePath, "txt");
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0);
+    String fn = streamSource.getFilePathAt(0);
+    assertEquals("Incorrect file in filePaths.", inFilePath, fn);
+  }
+
+  @Test
+  public void testInitWithSingleFileAndNullExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, "txt", 1);
+
+    // init with path to input file
+    File inFile = new File(BASE_DIR, "1.txt");
+    String inFilePath = inFile.getAbsolutePath();
+    streamSource.init(inFilePath, null);
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0);
+    String fn = streamSource.getFilePathAt(0);
+    assertEquals("Incorrect file in filePaths.", inFilePath, fn);
+  }
+
+  @Test
+  public void testInitWithFolderAndExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, null, NUM_NOISE_FILES_IN_DIR);
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    File inDir = new File(BASE_DIR);
+    String inDirPath = inDir.getAbsolutePath();
+    streamSource.init(inDirPath, "txt");
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0);
+    Set<String> filenames = new HashSet<String>();
+    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
+      String expectedFn = (new File(inDirPath, Integer.toString(i) + ".txt")).getAbsolutePath();
+      filenames.add(expectedFn);
+    }
+    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
+      String fn = streamSource.getFilePathAt(i);
+      assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn));
+    }
+  }
+
+  @Test
+  public void testInitWithFolderAndNullExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, null, NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    File inDir = new File(BASE_DIR);
+    String inDirPath = inDir.getAbsolutePath();
+    streamSource.init(inDirPath, null);
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0);
+    Set<String> filenames = new HashSet<String>();
+    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
+      String expectedFn = (new File(inDirPath, Integer.toString(i))).getAbsolutePath();
+      filenames.add(expectedFn);
+    }
+    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
+      String fn = streamSource.getFilePathAt(i);
+      assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn));
+    }
+  }
+
+  /*
+   * getNextInputStream tests
+   */
+  @Test
+  public void testGetNextInputStream() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(BASE_DIR, "txt");
+
+    // call getNextInputStream & assertions
+    Set<String> contents = new HashSet<String>();
+    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
+      contents.add(Integer.toString(i));
+    }
+    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
+      InputStream inStream = streamSource.getNextInputStream();
+      assertNotNull("Unexpected end of input stream list.", inStream);
+
+      BufferedReader rd = new BufferedReader(new InputStreamReader(inStream));
+      String inputRead = null;
+      try {
+        inputRead = rd.readLine();
+      } catch (IOException ioe) {
+        fail("Fail reading from stream at index:" + i + ioe.getMessage());
+      }
+      assertTrue("File content is incorrect.", contents.contains(inputRead));
+      Iterator<String> it = contents.iterator();
+      while (it.hasNext()) {
+        if (it.next().equals(inputRead)) {
+          it.remove();
+          break;
+        }
+      }
+    }
+
+    // assert that another call to getNextInputStream will return null
+    assertNull("Call getNextInputStream after the last file did not return null.", streamSource.getNextInputStream());
+  }
+
+  /*
+   * getCurrentInputStream tests
+   */
+  public void testGetCurrentInputStream() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(BASE_DIR, "txt");
+
+    // call getNextInputStream, getCurrentInputStream & assertions
+    for (int i = 0; i <= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list
+      InputStream inStream1 = streamSource.getNextInputStream();
+      InputStream inStream2 = streamSource.getCurrentInputStream();
+      assertSame("Incorrect current input stream.", inStream1, inStream2);
+    }
+  }
+
+  /*
+   * reset tests
+   */
+  public void testReset() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(BASE_DIR, "txt");
+
+    // Get the first input string
+    InputStream firstInStream = streamSource.getNextInputStream();
+    String firstInput = null;
+    assertNotNull("Unexpected end of input stream list.", firstInStream);
+
+    BufferedReader rd1 = new BufferedReader(new InputStreamReader(firstInStream));
+    try {
+      firstInput = rd1.readLine();
+    } catch (IOException ioe) {
+      fail("Fail reading from stream at index:0" + ioe.getMessage());
+    }
+
+    // call getNextInputStream a few times
+    streamSource.getNextInputStream();
+
+    // call reset, call next, assert that output is 1 (the first file)
+    try {
+      streamSource.reset();
+    } catch (IOException ioe) {
+      fail("Fail resetting stream source." + ioe.getMessage());
+    }
+
+    InputStream inStream = streamSource.getNextInputStream();
+    assertNotNull("Unexpected end of input stream list.", inStream);
+
+    BufferedReader rd2 = new BufferedReader(new InputStreamReader(inStream));
+    String inputRead = null;
+    try {
+      inputRead = rd2.readLine();
+    } catch (IOException ioe) {
+      fail("Fail reading from stream at index:0" + ioe.getMessage());
+    }
+    assertEquals("File content is incorrect.", firstInput, inputRead);
+  }
+
+  private void writeSimpleFiles(String path, String ext, int numOfFiles) {
+    // Create folder
+    File folder = new File(path);
+    if (!folder.exists()) {
+      try {
+        folder.mkdir();
+      } catch (SecurityException se) {
+        fail("Failed creating directory:" + path + se);
+      }
+    }
+
+    // Write files
+    for (int i = 1; i <= numOfFiles; i++) {
+      String fn = null;
+      if (ext != null) {
+        fn = Integer.toString(i) + "." + ext;
+      } else {
+        fn = Integer.toString(i);
+      }
+
+      try {
+        FileWriter fwr = new FileWriter(new File(path, fn));
+        fwr.write(Integer.toString(i));
+        fwr.close();
+      } catch (IOException ioe) {
+        fail("Fail writing to input file: " + fn + " in directory: " + path + ioe.getMessage());
+      }
+    }
+  }
 
 }

Reply via email to