http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/AbstractEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/topology/AbstractEntranceProcessingItem.java
 
b/samoa-api/src/main/java/org/apache/samoa/topology/AbstractEntranceProcessingItem.java
new file mode 100644
index 0000000..d1d22ba
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/topology/AbstractEntranceProcessingItem.java
@@ -0,0 +1,115 @@
+package org.apache.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.EntranceProcessor;
+
+/**
+ * Helper class for EntranceProcessingItem implementation.
+ * 
+ * @author Anh Thu Vu
+ * 
+ */
+public abstract class AbstractEntranceProcessingItem implements 
EntranceProcessingItem {
+  private EntranceProcessor processor;
+  private String name;
+  private Stream outputStream;
+
+  /*
+   * Constructor
+   */
+  public AbstractEntranceProcessingItem() {
+    this(null);
+  }
+
+  public AbstractEntranceProcessingItem(EntranceProcessor processor) {
+    this.processor = processor;
+  }
+
+  /*
+   * Processor
+   */
+  /**
+   * Set the entrance processor for this EntranceProcessingItem
+   * 
+   * @param processor
+   *          the processor
+   */
+  protected void setProcessor(EntranceProcessor processor) {
+    this.processor = processor;
+  }
+
+  /**
+   * Get the EntranceProcessor of this EntranceProcessingItem.
+   * 
+   * @return the EntranceProcessor
+   */
+  public EntranceProcessor getProcessor() {
+    return this.processor;
+  }
+
+  /*
+   * Name/ID
+   */
+  /**
+   * Set the name (or ID) of this EntranceProcessingItem
+   * 
+   * @param name
+   */
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Get the name (or ID) of this EntranceProcessingItem
+   * 
+   * @return the name (or ID)
+   */
+  public String getName() {
+    return this.name;
+  }
+
+  /*
+   * Output Stream
+   */
+  /**
+   * Set the output stream of this EntranceProcessingItem. An EntranceProcessingItem has exactly one output stream;
+   * once set, it cannot be re-assigned to a different stream (an IllegalStateException is thrown).
+   * 
+   * @return this EntranceProcessingItem
+   */
+  public EntranceProcessingItem setOutputStream(Stream outputStream) {
+    if (this.outputStream != null && this.outputStream != outputStream) {
+      throw new IllegalStateException("Cannot overwrite output stream of 
EntranceProcessingItem");
+    } else
+      this.outputStream = outputStream;
+    return this;
+  }
+
+  /**
+   * Get the output stream of this EntranceProcessingItem.
+   * 
+   * @return the output stream
+   */
+  public Stream getOutputStream() {
+    return this.outputStream;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/AbstractProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/topology/AbstractProcessingItem.java 
b/samoa-api/src/main/java/org/apache/samoa/topology/AbstractProcessingItem.java
new file mode 100644
index 0000000..112d76e
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/topology/AbstractProcessingItem.java
@@ -0,0 +1,168 @@
+package org.apache.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.utils.PartitioningScheme;
+
+/**
+ * Abstract ProcessingItem
+ * 
+ * Helper for implementation of ProcessingItem. It has basic information for a 
ProcessingItem: name, parallelismLevel
+ * and a processor. Subclass of this class needs to implement {@link 
#addInputStream(Stream, PartitioningScheme)}.
+ * 
+ * @author Anh Thu Vu
+ * 
+ */
+public abstract class AbstractProcessingItem implements ProcessingItem {
+  private String name;
+  private int parallelism;
+  private Processor processor;
+
+  /*
+   * Constructor
+   */
+  public AbstractProcessingItem() {
+    this(null);
+  }
+
+  public AbstractProcessingItem(Processor processor) {
+    this(processor, 1);
+  }
+
+  public AbstractProcessingItem(Processor processor, int parallelism) {
+    this.processor = processor;
+    this.parallelism = parallelism;
+  }
+
+  /*
+   * Processor
+   */
+  /**
+   * Set the processor for this ProcessingItem
+   * 
+   * @param processor
+   *          the processor
+   */
+  protected void setProcessor(Processor processor) {
+    this.processor = processor;
+  }
+
+  /**
+   * Get the processor of this ProcessingItem
+   * 
+   * @return the processor
+   */
+  public Processor getProcessor() {
+    return this.processor;
+  }
+
+  /*
+   * Parallelism
+   */
+  /**
+   * Set the parallelism factor of this ProcessingItem
+   * 
+   * @param parallelism
+   */
+  protected void setParallelism(int parallelism) {
+    this.parallelism = parallelism;
+  }
+
+  /**
+   * Get the parallelism factor of this ProcessingItem
+   * 
+   * @return the parallelism factor
+   */
+  @Override
+  public int getParallelism() {
+    return this.parallelism;
+  }
+
+  /*
+   * Name/ID
+   */
+  /**
+   * Set the name (or ID) of this ProcessingItem
+   * 
+   * @param name
+   *          the name/ID
+   */
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Get the name (or ID) of this ProcessingItem
+   * 
+   * @return the name/ID
+   */
+  public String getName() {
+    return this.name;
+  }
+
+  /*
+   * Add input streams
+   */
+  /**
+   * Add an input stream to this ProcessingItem
+   * 
+   * @param inputStream
+   *          the input stream to add
+   * @param scheme
+   *          partitioning scheme associated with this ProcessingItem and the 
input stream
+   * @return this ProcessingItem
+   */
+  protected abstract ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme);
+
+  /**
+   * Add an input stream to this ProcessingItem with SHUFFLE scheme
+   * 
+   * @param inputStream
+   *          the input stream
+   * @return this ProcessingItem
+   */
+  public ProcessingItem connectInputShuffleStream(Stream inputStream) {
+    return this.addInputStream(inputStream, PartitioningScheme.SHUFFLE);
+  }
+
+  /**
+   * Add an input stream to this ProcessingItem with GROUP_BY_KEY scheme
+   * 
+   * @param inputStream
+   *          the input stream
+   * @return this ProcessingItem
+   */
+  public ProcessingItem connectInputKeyStream(Stream inputStream) {
+    return this.addInputStream(inputStream, PartitioningScheme.GROUP_BY_KEY);
+  }
+
+  /**
+   * Add an input stream to this ProcessingItem with BROADCAST scheme
+   * 
+   * @param inputStream
+   *          the input stream
+   * @return this ProcessingItem
+   */
+  public ProcessingItem connectInputAllStream(Stream inputStream) {
+    return this.addInputStream(inputStream, PartitioningScheme.BROADCAST);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/AbstractStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/topology/AbstractStream.java 
b/samoa-api/src/main/java/org/apache/samoa/topology/AbstractStream.java
new file mode 100644
index 0000000..b79cc61
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/topology/AbstractStream.java
@@ -0,0 +1,118 @@
+package org.apache.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+
+/**
+ * Abstract Stream
+ * 
+ * Helper for implementation of Stream. It has basic information for a Stream: 
streamID and source ProcessingItem.
+ * Subclass of this class needs to implement {@link #put(ContentEvent)}.
+ * 
+ * @author Anh Thu Vu
+ * 
+ */
+
+public abstract class AbstractStream implements Stream {
+  private String streamID;
+  private IProcessingItem sourcePi;
+  private int batchSize;
+
+  /*
+   * Constructor
+   */
+  public AbstractStream() {
+    this(null);
+  }
+
+  public AbstractStream(IProcessingItem sourcePi) {
+    this.sourcePi = sourcePi;
+    this.batchSize = 1;
+  }
+
+  /**
+   * Get source processing item of this stream
+   * 
+   * @return the source processing item
+   */
+  public IProcessingItem getSourceProcessingItem() {
+    return this.sourcePi;
+  }
+
+  /*
+   * Process event
+   */
+  @Override
+  /**
+   * Send a ContentEvent
+   * @param event
+   *                   the ContentEvent to be sent
+   */
+  public abstract void put(ContentEvent event);
+
+  /*
+   * Stream name
+   */
+  /**
+   * Get name (ID) of this stream
+   * 
+   * @return the name (ID)
+   */
+  @Override
+  public String getStreamId() {
+    return this.streamID;
+  }
+
+  /**
+   * Set the name (ID) of this stream
+   * 
+   * @param streamID
+   *          the name (ID)
+   */
+  public void setStreamId(String streamID) {
+    this.streamID = streamID;
+  }
+
+  /*
+   * Batch size
+   */
+  /**
+   * Set suggested batch size
+   * 
+   * @param batchSize
+   *          the suggested batch size
+   * 
+   */
+  @Override
+  public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  /**
+   * Get suggested batch size
+   * 
+   * @return the suggested batch size
+   */
+  public int getBatchSize() {
+    return this.batchSize;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/AbstractTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/topology/AbstractTopology.java 
b/samoa-api/src/main/java/org/apache/samoa/topology/AbstractTopology.java
new file mode 100644
index 0000000..9d90c41
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/topology/AbstractTopology.java
@@ -0,0 +1,133 @@
+package org.apache.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Topology abstract class.
+ * 
+ * It manages basic information of a topology: name, sets of Streams and 
ProcessingItems
+ * 
+ */
+public abstract class AbstractTopology implements Topology {
+
+  private String topoName;
+  private Set<Stream> streams;
+  private Set<IProcessingItem> processingItems;
+  private Set<EntranceProcessingItem> entranceProcessingItems;
+
+  protected AbstractTopology(String name) {
+    this.topoName = name;
+    this.streams = new HashSet<>();
+    this.processingItems = new HashSet<>();
+    this.entranceProcessingItems = new HashSet<>();
+  }
+
+  /**
+   * Gets the name of this topology
+   * 
+   * @return name of the topology
+   */
+  public String getTopologyName() {
+    return this.topoName;
+  }
+
+  /**
+   * Sets the name of this topology
+   * 
+   * @param topologyName
+   *          name of the topology
+   */
+  public void setTopologyName(String topologyName) {
+    this.topoName = topologyName;
+  }
+
+  /**
+   * Adds an Entrance processing item to the topology.
+   * 
+   * @param epi
+   *          Entrance processing item
+   */
+  public void addEntranceProcessingItem(EntranceProcessingItem epi) {
+    this.entranceProcessingItems.add(epi);
+    this.addProcessingItem(epi);
+  }
+
+  /**
+   * Gets entrance processing items in the topology
+   * 
+   * @return the set of entrance processing items
+   */
+  public Set<EntranceProcessingItem> getEntranceProcessingItems() {
+    return this.entranceProcessingItems;
+  }
+
+  /**
+   * Add processing item to topology.
+   * 
+   * @param procItem
+   *          Processing item.
+   */
+  public void addProcessingItem(IProcessingItem procItem) {
+    addProcessingItem(procItem, 1);
+  }
+
+  /**
+   * Add processing item to topology.
+   * 
+   * @param procItem
+   *          Processing item.
+   * @param parallelismHint
+   *          Processing item parallelism level.
+   */
+  public void addProcessingItem(IProcessingItem procItem, int parallelismHint) 
{
+    this.processingItems.add(procItem);
+  }
+
+  /**
+   * Gets processing items in the topology (including entrance processing 
items)
+   * 
+   * @return the set of processing items
+   */
+  public Set<IProcessingItem> getProcessingItems() {
+    return this.processingItems;
+  }
+
+  /**
+   * Add stream to topology.
+   * 
+   * @param stream
+   */
+  public void addStream(Stream stream) {
+    this.streams.add(stream);
+  }
+
+  /**
+   * Gets streams in the topology
+   * 
+   * @return the set of streams
+   */
+  public Set<Stream> getStreams() {
+    return this.streams;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/ComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/topology/ComponentFactory.java 
b/samoa-api/src/main/java/org/apache/samoa/topology/ComponentFactory.java
new file mode 100644
index 0000000..d1e0c13
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/topology/ComponentFactory.java
@@ -0,0 +1,78 @@
+package org.apache.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+
+/**
+ * ComponentFactory interface. Provides platform specific components.
+ */
+public interface ComponentFactory {
+
+  /**
+   * Creates a platform specific processing item with the specified processor.
+   * 
+   * @param processor
+   *          contains the logic for this processing item.
+   * @return ProcessingItem
+   */
+  public ProcessingItem createPi(Processor processor);
+
+  /**
+   * Creates a platform specific processing item with the specified processor. 
Additionally sets the parallelism level.
+   * 
+   * @param processor
+   *          contains the logic for this processing item.
+   * @param parallelism
+   *          defines the number of instances of this processing item that will be created.
+   * @return ProcessingItem
+   */
+  public ProcessingItem createPi(Processor processor, int parallelism);
+
+  /**
+   * Creates a platform specific processing item with the specified processor 
that is the entrance point in the
+   * topology. This processing item can either generate a stream of data or 
connect to an external stream of data.
+   * 
+   * @param entranceProcessor
+   *          contains the logic for this processing item.
+   * @return EntranceProcessingItem
+   */
+  public EntranceProcessingItem createEntrancePi(EntranceProcessor 
entranceProcessor);
+
+  /**
+   * Creates a platform specific stream.
+   * 
+   * @param sourcePi
+   *          source processing item which will provide the events for this 
stream.
+   * @return Stream
+   */
+  public Stream createStream(IProcessingItem sourcePi);
+
+  /**
+   * Creates a platform specific topology.
+   * 
+   * @param topoName
+   *          Topology name.
+   * @return Topology
+   */
+  public Topology createTopology(String topoName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/EntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/topology/EntranceProcessingItem.java 
b/samoa-api/src/main/java/org/apache/samoa/topology/EntranceProcessingItem.java
new file mode 100644
index 0000000..6fc0ed3
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/topology/EntranceProcessingItem.java
@@ -0,0 +1,46 @@
+package org.apache.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.EntranceProcessor;
+
+/**
+ * Entrance processing item interface.
+ */
+public interface EntranceProcessingItem extends IProcessingItem {
+
+  @Override
+  /**
+   * Gets the processing item processor.
+   * 
+   * @return the embedded EntranceProcessor. 
+   */
+  public EntranceProcessor getProcessor();
+
+  /**
+   * Set the single output stream for this EntranceProcessingItem.
+   * 
+   * @param stream
+   *          the stream
+   * @return the current instance of the EntranceProcessingItem for fluent 
interface.
+   */
+  public EntranceProcessingItem setOutputStream(Stream stream);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/IProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/topology/IProcessingItem.java 
b/samoa-api/src/main/java/org/apache/samoa/topology/IProcessingItem.java
new file mode 100644
index 0000000..af86bde
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/topology/IProcessingItem.java
@@ -0,0 +1,47 @@
+package org.apache.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.Processor;
+
+/**
+ * Base interface for processing items, shared by ProcessingItem and EntranceProcessingItem.
+ * 
+ * @author severien
+ * 
+ */
+public interface IProcessingItem {
+
+  /**
+   * Gets the processing item processor.
+   * 
+   * @return Processor
+   */
+  public Processor getProcessor();
+
+  /**
+   * Sets processing item name.
+   * 
+   * @param name
+   */
+  // public void setName(String name);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/ISubmitter.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/ISubmitter.java 
b/samoa-api/src/main/java/org/apache/samoa/topology/ISubmitter.java
new file mode 100644
index 0000000..b576982
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/topology/ISubmitter.java
@@ -0,0 +1,46 @@
+package org.apache.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.tasks.Task;
+
+/**
+ * Submitter interface for programmatically deploying platform-specific topologies.
+ * 
+ * @author severien
+ * 
+ */
+public interface ISubmitter {
+
+  /**
+   * Deploy a specific task to a platform.
+   * 
+   * @param task
+   */
+  public void deployTask(Task task);
+
+  /**
+   * Sets if the task should run locally or distributed.
+   * 
+   * @param bool
+   */
+  public void setLocal(boolean bool);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/LocalEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/topology/LocalEntranceProcessingItem.java
 
b/samoa-api/src/main/java/org/apache/samoa/topology/LocalEntranceProcessingItem.java
new file mode 100644
index 0000000..3cfa736
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/topology/LocalEntranceProcessingItem.java
@@ -0,0 +1,81 @@
+package org.apache.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.EntranceProcessor;
+
+/**
+ * Implementation of EntranceProcessingItem for local engines (Simple, 
Multithreads)
+ * 
+ * @author Anh Thu Vu
+ * 
+ */
+public class LocalEntranceProcessingItem extends 
AbstractEntranceProcessingItem {
+  public LocalEntranceProcessingItem(EntranceProcessor processor) {
+    super(processor);
+  }
+
+  /**
+   * If there are available events, first event in the queue will be sent out 
on the output stream.
+   * 
+   * @return true if there is (at least) one available event and it was sent out, false otherwise
+   */
+  public boolean injectNextEvent() {
+    if (this.getProcessor().hasNext()) {
+      ContentEvent event = this.getProcessor().nextEvent();
+      this.getOutputStream().put(event);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Start sending events by calling {@link #injectNextEvent()}. If no events are available and the stream is not
+   * entirely consumed, wait by calling {@link #waitForNewEvents()} before attempting to send again.
+   * <p>
+   * When the stream is entirely consumed, the last event is tagged accordingly and the processor gets the finished
+   * status.
+   */
+  public void startSendingEvents() {
+    if (this.getOutputStream() == null)
+      throw new IllegalStateException("Try sending events from EntrancePI 
while outputStream is not set.");
+
+    while (!this.getProcessor().isFinished()) {
+      if (!this.injectNextEvent()) {
+        try {
+          waitForNewEvents();
+        } catch (Exception e) {
+          e.printStackTrace();
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Method to wait for an amount of time (100 ms by default) when there are no available events. Subclasses should
+   * override this method to implement a non-blocking wait or to adjust the amount of time.
+   */
+  protected void waitForNewEvents() throws Exception {
+    Thread.sleep(100);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/ProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/topology/ProcessingItem.java 
b/samoa-api/src/main/java/org/apache/samoa/topology/ProcessingItem.java
new file mode 100644
index 0000000..edb4aaa
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/topology/ProcessingItem.java
@@ -0,0 +1,68 @@
+package org.apache.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Processing item interface.
+ * 
+ * @author severien
+ * 
+ */
+public interface ProcessingItem extends IProcessingItem {
+
+  /**
+   * Connects this processing item in a round robin fashion. The events will 
be distributed evenly between the
+   * instantiated processing items.
+   * 
+   * @param inputStream
+   *          Stream to connect this processing item.
+   * @return ProcessingItem
+   */
+  public ProcessingItem connectInputShuffleStream(Stream inputStream);
+
+  /**
+   * Connects this processing item taking the event key into account. Events will be routed to the processing item
+   * according to the modulus of its key and the parallelism level. Ex.: key = 5 and parallelism = 2, 5 mod 2 = 1.
+   * Processing item responsible for 1 will receive this event.
+   * 
+   * @param inputStream
+   *          Stream to connect this processing item.
+   * @return ProcessingItem
+   */
+  public ProcessingItem connectInputKeyStream(Stream inputStream);
+
+  /**
+   * Connects this processing item to the stream in a broadcast fashion. All processing items of this type will receive
+   * a copy of the original event.
+   * 
+   * @param inputStream
+   *          Stream to connect this processing item.
+   * @return ProcessingItem
+   */
+  public ProcessingItem connectInputAllStream(Stream inputStream);
+
+  /**
+   * Gets processing item parallelism level.
+   * 
+   * @return int
+   */
+  public int getParallelism();
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/Stream.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/Stream.java 
b/samoa-api/src/main/java/org/apache/samoa/topology/Stream.java
new file mode 100644
index 0000000..7aadb97
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/topology/Stream.java
@@ -0,0 +1,61 @@
+package org.apache.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+
+/**
+ * Stream interface.
+ * 
+ * @author severien
+ * 
+ */
+public interface Stream {
+
+  /**
+   * Puts events into a platform specific data stream.
+   * 
+   * @param event
+   */
+  public void put(ContentEvent event);
+
+  /**
+   * Sets the stream id which is represented by a name.
+   * 
+   * @param stream
+   */
+  // public void setStreamId(String stream);
+
+  /**
+   * Gets stream id.
+   * 
+   * @return id
+   */
+  public String getStreamId();
+
+  /**
+   * Set batch size
+   * 
+   * @param batchsize
+   *          the suggested size for batching messages on this stream
+   */
+  public void setBatchSize(int batchsize);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/Topology.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/Topology.java 
b/samoa-api/src/main/java/org/apache/samoa/topology/Topology.java
new file mode 100644
index 0000000..41cbdd0
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/topology/Topology.java
@@ -0,0 +1,82 @@
+package org.apache.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+public interface Topology {
+  /*
+   * Name
+   */
+  /**
+   * Get the topology's name
+   * 
+   * @return the name of the topology
+   */
+  public String getTopologyName();
+
+  /**
+   * Set the topology's name
+   * 
+   * @param topologyName
+   *          the name of the topology
+   */
+  public void setTopologyName(String topologyName);
+
+  /*
+   * Entrance Processing Items
+   */
+  /**
+   * Add an EntranceProcessingItem to this topology
+   * 
+   * @param epi
+   *          the EntranceProcessingItem to be added
+   */
+  void addEntranceProcessingItem(EntranceProcessingItem epi);
+
+  /*
+   * Processing Items
+   */
+  /**
+   * Add a ProcessingItem to this topology with default parallelism level (i.e. 1)
+   * 
+   * @param procItem
+   *          the ProcessingItem to be added
+   */
+  void addProcessingItem(IProcessingItem procItem);
+
+  /**
+   * Add a ProcessingItem to this topology with an associated parallelism level
+   * 
+   * @param procItem
+   *          the ProcessingItem to be added
+   * @param parallelismHint
+   *          the parallelism level
+   */
+  void addProcessingItem(IProcessingItem procItem, int parallelismHint);
+
+  /*
+   * Streams
+   */
+  /**
+   * 
+   * @param stream
+   */
+  void addStream(Stream stream);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/TopologyBuilder.java b/samoa-api/src/main/java/org/apache/samoa/topology/TopologyBuilder.java
new file mode 100644
index 0000000..d67b609
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/topology/TopologyBuilder.java
@@ -0,0 +1,228 @@
+package org.apache.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Builder class that creates topology components and assemble them together.
+ * 
+ */
+public class TopologyBuilder {
+
+  // TODO:
+  // Possible options:
+  // 1. we may convert this as interface and platform dependent builder will
+  // inherit this method
+  // 2. refactor by combining TopologyBuilder, ComponentFactory and Topology
+  // -ve -> fat class where it has capabilities to instantiate specific
+  // component and connecting them
+  // +ve -> easy abstraction for SAMOA developer
+  // "you just implement your builder logic here!"
+  private ComponentFactory componentFactory;
+  private Topology topology;
+  private Map<Processor, IProcessingItem> mapProcessorToProcessingItem;
+
+  // TODO: refactor, temporary constructor used by Storm code
+  public TopologyBuilder() {
+    // TODO: initialize _componentFactory using dynamic binding
+    // for now, use StormComponentFactory
+    // should the factory be Singleton (?)
+    // ans: at the moment, no, i.e. each builder will has its associated
+    // factory!
+    // and the factory will be instantiated using dynamic binding
+    // this.componentFactory = new StormComponentFactory();
+  }
+
+  // TODO: refactor, temporary constructor used by S4 code
+  public TopologyBuilder(ComponentFactory theFactory) {
+    this.componentFactory = theFactory;
+  }
+
+  /**
+   * Initiates topology with a specific name.
+   * 
+   * @param topologyName
+   */
+  public void initTopology(String topologyName) {
+    this.initTopology(topologyName, 0);
+  }
+
+  /**
+   * Initiates topology with a specific name and a delay between consecutive instances.
+   * 
+   * @param topologyName
+   * @param delay
+   *          delay between injections of two instances from source (in milliseconds)
+   */
+  public void initTopology(String topologyName, int delay) {
+    if (this.topology != null) {
+      // TODO: possible refactor this code later
+      System.out.println("Topology has been initialized before!");
+      return;
+    }
+    this.topology = componentFactory.createTopology(topologyName);
+  }
+
+  /**
+   * Returns the platform specific topology.
+   * 
+   * @return
+   */
+  public Topology build() {
+    return topology;
+  }
+
+  public ProcessingItem addProcessor(Processor processor, int parallelism) {
+    ProcessingItem pi = createPi(processor, parallelism);
+    if (this.mapProcessorToProcessingItem == null)
+      this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>();
+    this.mapProcessorToProcessingItem.put(processor, pi);
+    return pi;
+  }
+
+  public ProcessingItem addProcessor(Processor processor) {
+    return addProcessor(processor, 1);
+  }
+
+  public ProcessingItem connectInputShuffleStream(Stream inputStream, Processor processor) {
+    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
+    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
+    return pi.connectInputShuffleStream(inputStream);
+  }
+
+  public ProcessingItem connectInputKeyStream(Stream inputStream, Processor processor) {
+    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
+    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
+    return pi.connectInputKeyStream(inputStream);
+  }
+
+  public ProcessingItem connectInputAllStream(Stream inputStream, Processor processor) {
+    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
+    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
+    return pi.connectInputAllStream(inputStream);
+  }
+
+  public Stream createInputShuffleStream(Processor processor, Processor dest) {
+    Stream inputStream = this.createStream(dest);
+    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
+    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
+    pi.connectInputShuffleStream(inputStream);
+    return inputStream;
+  }
+
+  public Stream createInputKeyStream(Processor processor, Processor dest) {
+    Stream inputStream = this.createStream(dest);
+    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
+    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
+    pi.connectInputKeyStream(inputStream);
+    return inputStream;
+  }
+
+  public Stream createInputAllStream(Processor processor, Processor dest) {
+    Stream inputStream = this.createStream(dest);
+    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
+    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
+    pi.connectInputAllStream(inputStream);
+    return inputStream;
+  }
+
+  public Stream createStream(Processor processor) {
+    IProcessingItem pi = mapProcessorToProcessingItem.get(processor);
+    Stream ret = null;
+    Preconditions.checkNotNull(pi, "Trying to create stream from null PI");
+    ret = this.createStream(pi);
+    if (pi instanceof EntranceProcessingItem)
+      ((EntranceProcessingItem) pi).setOutputStream(ret);
+    return ret;
+  }
+
+  public EntranceProcessingItem addEntranceProcessor(EntranceProcessor entranceProcessor) {
+    EntranceProcessingItem pi = createEntrancePi(entranceProcessor);
+    if (this.mapProcessorToProcessingItem == null)
+      this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>();
+    mapProcessorToProcessingItem.put(entranceProcessor, pi);
+    return pi;
+  }
+
+  public ProcessingItem getProcessingItem(Processor processor) {
+    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
+    Preconditions.checkNotNull(pi, "Trying to retrieve null PI");
+    return pi;
+  }
+
+  /**
+   * Creates a processing item with a specific processor and paralellism level of 1.
+   * 
+   * @param processor
+   * @return ProcessingItem
+   */
+  @SuppressWarnings("unused")
+  private ProcessingItem createPi(Processor processor) {
+    return createPi(processor, 1);
+  }
+
+  /**
+   * Creates a processing item with a specific processor and paralellism level.
+   * 
+   * @param processor
+   * @param parallelism
+   * @return ProcessingItem
+   */
+  private ProcessingItem createPi(Processor processor, int parallelism) {
+    ProcessingItem pi = this.componentFactory.createPi(processor, parallelism);
+    this.topology.addProcessingItem(pi, parallelism);
+    return pi;
+  }
+
+  /**
+   * Creates a platform specific entrance processing item.
+   * 
+   * @param processor
+   * @return
+   */
+  private EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
+    EntranceProcessingItem epi = this.componentFactory.createEntrancePi(processor);
+    this.topology.addEntranceProcessingItem(epi);
+    if (this.mapProcessorToProcessingItem == null)
+      this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>();
+    this.mapProcessorToProcessingItem.put(processor, epi);
+    return epi;
+  }
+
+  /**
+   * Creates a platform specific stream.
+   * 
+   * @param sourcePi
+   *          source processing item.
+   * @return
+   */
+  private Stream createStream(IProcessingItem sourcePi) {
+    Stream stream = this.componentFactory.createStream(sourcePi);
+    this.topology.addStream(stream);
+    return stream;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/utils/PartitioningScheme.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/utils/PartitioningScheme.java b/samoa-api/src/main/java/org/apache/samoa/utils/PartitioningScheme.java
new file mode 100644
index 0000000..374120d
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/utils/PartitioningScheme.java
@@ -0,0 +1,33 @@
+package org.apache.samoa.utils;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Represents the 3 schemes to partition the streams
+ * 
+ * @author Anh Thu Vu
+ * 
+ */
+public enum PartitioningScheme {
+  SHUFFLE, GROUP_BY_KEY, BROADCAST
+}
+// TODO: use this enum in S4
+// Storm doesn't seem to need this

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/utils/StreamDestination.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/utils/StreamDestination.java b/samoa-api/src/main/java/org/apache/samoa/utils/StreamDestination.java
new file mode 100644
index 0000000..be82982
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/utils/StreamDestination.java
@@ -0,0 +1,62 @@
+package org.apache.samoa.utils;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.topology.IProcessingItem;
+
+/**
+ * Represents one destination for streams. It has the info of: the ProcessingItem, parallelismHint, and partitioning
+ * scheme. Usage: - When ProcessingItem connects to a stream, it will pass a StreamDestination to the stream. - Stream
+ * manages a set of StreamDestination. - Used in single-threaded and multi-threaded local mode.
+ * 
+ * @author Anh Thu Vu
+ * 
+ */
+public class StreamDestination {
+  private IProcessingItem pi;
+  private int parallelism;
+  private PartitioningScheme type;
+
+  /*
+   * Constructor
+   */
+  public StreamDestination(IProcessingItem pi, int parallelismHint, PartitioningScheme type) {
+    this.pi = pi;
+    this.parallelism = parallelismHint;
+    this.type = type;
+  }
+
+  /*
+   * Getters
+   */
+  public IProcessingItem getProcessingItem() {
+    return this.pi;
+  }
+
+  public int getParallelism() {
+    return this.parallelism;
+  }
+
+  public PartitioningScheme getPartitioningScheme() {
+    return this.type;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/utils/Utils.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/utils/Utils.java b/samoa-api/src/main/java/org/apache/samoa/utils/Utils.java
new file mode 100644
index 0000000..f8ae5ef
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/utils/Utils.java
@@ -0,0 +1,183 @@
+package org.apache.samoa.utils;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.jar.Attributes;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+import java.util.zip.ZipEntry;
+
+/**
+ * Utils class for building and deploying applications programmatically.
+ * 
+ * @author severien
+ * 
+ */
+public class Utils {
+
+  public static void buildSamoaPackage() {
+    try {
+      String output = "/tmp/samoa/samoa.jar";// System.getProperty("user.home") + "/samoa.jar";
+      Manifest manifest = createManifest();
+
+      BufferedOutputStream bo;
+
+      bo = new BufferedOutputStream(new FileOutputStream(output));
+      JarOutputStream jo = new JarOutputStream(bo, manifest);
+
+      String baseDir = System.getProperty("user.dir");
+      System.out.println(baseDir);
+
+      File samoaJar = new File(baseDir + "/target/samoa-0.0.1-SNAPSHOT.jar");
+      addEntry(jo, samoaJar, baseDir + "/target/", "/app/");
+      addLibraries(jo);
+
+      jo.close();
+      bo.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+  }
+
+  // TODO should get the modules file from the parameters
+  public static void buildModulesPackage(List<String> modulesNames) {
+    System.out.println(System.getProperty("user.dir"));
+    try {
+      String baseDir = System.getProperty("user.dir");
+      List<File> filesArray = new ArrayList<>();
+      for (String module : modulesNames) {
+        module = "/" + module.replace(".", "/") + ".class";
+        filesArray.add(new File(baseDir + module));
+      }
+      String output = System.getProperty("user.home") + "/modules.jar";
+
+      Manifest manifest = new Manifest();
+      manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION,
+          "1.0");
+      manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_URL,
+          "http://samoa.yahoo.com");
+      manifest.getMainAttributes().put(
+          Attributes.Name.IMPLEMENTATION_VERSION, "0.1");
+      manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR,
+          "Yahoo");
+      manifest.getMainAttributes().put(
+          Attributes.Name.IMPLEMENTATION_VENDOR_ID, "SAMOA");
+
+      BufferedOutputStream bo;
+
+      bo = new BufferedOutputStream(new FileOutputStream(output));
+      JarOutputStream jo = new JarOutputStream(bo, manifest);
+
+      File[] files = filesArray.toArray(new File[filesArray.size()]);
+      addEntries(jo, files, baseDir, "");
+
+      jo.close();
+      bo.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+  }
+
+  private static void addLibraries(JarOutputStream jo) {
+    try {
+      String baseDir = System.getProperty("user.dir");
+      String libDir = baseDir + "/target/lib";
+      File inputFile = new File(libDir);
+
+      File[] files = inputFile.listFiles();
+      for (File file : files) {
+        addEntry(jo, file, baseDir, "lib");
+      }
+      jo.close();
+
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private static void addEntries(JarOutputStream jo, File[] files, String baseDir, String rootDir) {
+    for (File file : files) {
+
+      if (!file.isDirectory()) {
+        addEntry(jo, file, baseDir, rootDir);
+      } else {
+        File dir = new File(file.getAbsolutePath());
+        addEntries(jo, dir.listFiles(), baseDir, rootDir);
+      }
+    }
+  }
+
+  private static void addEntry(JarOutputStream jo, File file, String baseDir, String rootDir) {
+    try {
+      BufferedInputStream bi = new BufferedInputStream(new FileInputStream(file));
+
+      String path = file.getAbsolutePath().replaceFirst(baseDir, rootDir);
+      jo.putNextEntry(new ZipEntry(path));
+
+      byte[] buf = new byte[1024];
+      int anz;
+      while ((anz = bi.read(buf)) != -1) {
+        jo.write(buf, 0, anz);
+      }
+      bi.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public static Manifest createManifest() {
+    Manifest manifest = new Manifest();
+    manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
+    manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_URL, "http://samoa.yahoo.com");
+    manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VERSION, "0.1");
+    manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR, "Yahoo");
+    manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR_ID, "SAMOA");
+    Attributes s4Attributes = new Attributes();
+    s4Attributes.putValue("S4-App-Class", "path.to.Class");
+    Attributes.Name name = new Attributes.Name("S4-App-Class");
+    Attributes.Name S4Version = new Attributes.Name("S4-Version");
+    manifest.getMainAttributes().put(name, "samoa.topology.impl.DoTaskApp");
+    manifest.getMainAttributes().put(S4Version, "0.6.0-incubating");
+    return manifest;
+  }
+
+  public static Object getInstance(String className) {
+    Class<?> cls;
+    Object obj = null;
+    try {
+      cls = Class.forName(className);
+      obj = cls.newInstance();
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+      e.printStackTrace();
+    }
+    return obj;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java b/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java
deleted file mode 100644
index 68eec91..0000000
--- a/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package com.yahoo.labs.samoa.core;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.*;
-
-import org.junit.Before;
-import org.junit.Test;
-
-public class DoubleVectorTest {
-  private DoubleVector emptyVector, array5Vector;
-
-  @Before
-  public void setUp() {
-    emptyVector = new DoubleVector();
-    array5Vector = new DoubleVector(new double[] { 1.1, 2.5, 0, 4.7, 0 });
-  }
-
-  @Test
-  public void testGetArrayRef() {
-    assertThat(emptyVector.getArrayRef(), notNullValue());
-    assertTrue(emptyVector.getArrayRef() == emptyVector.getArrayRef());
-    assertEquals(5, array5Vector.getArrayRef().length);
-  }
-
-  @Test
-  public void testGetArrayCopy() {
-    double[] arrayRef;
-    arrayRef = emptyVector.getArrayRef();
-    assertTrue(arrayRef != emptyVector.getArrayCopy());
-    assertThat(arrayRef, is(equalTo(emptyVector.getArrayCopy())));
-
-    arrayRef = array5Vector.getArrayRef();
-    assertTrue(arrayRef != array5Vector.getArrayCopy());
-    assertThat(arrayRef, is(equalTo(array5Vector.getArrayCopy())));
-  }
-
-  @Test
-  public void testNumNonZeroEntries() {
-    assertEquals(0, emptyVector.numNonZeroEntries());
-    assertEquals(3, array5Vector.numNonZeroEntries());
-  }
-
-  @Test(expected = IndexOutOfBoundsException.class)
-  public void testGetValueOutOfBound() {
-    @SuppressWarnings("unused")
-    double value = emptyVector.getArrayRef()[0];
-  }
-
-  @Test()
-  public void testSetValue() {
-    // test automatic vector enlargement
-    emptyVector.setValue(0, 1.0);
-    assertEquals(1, emptyVector.getArrayRef().length);
-    assertEquals(1.0, emptyVector.getArrayRef()[0], 0.0); // should be exactly the same, so delta=0.0
-
-    emptyVector.setValue(5, 5.5);
-    assertEquals(6, emptyVector.getArrayRef().length);
-    assertEquals(2, emptyVector.numNonZeroEntries());
-    assertEquals(5.5, emptyVector.getArrayRef()[5], 0.0); // should be exactly the same, so delta=0.0
-  }
-
-  @Test
-  public void testAddToValue() {
-    array5Vector.addToValue(2, 5.0);
-    assertEquals(5, array5Vector.getArrayRef()[2], 0.0); // should be exactly the same, so delta=0.0
-
-    // test automatic vector enlargement
-    emptyVector.addToValue(0, 1.0);
-    assertEquals(1, emptyVector.getArrayRef()[0], 0.0); // should be exactly the same, so delta=0.0
-  }
-
-  @Test
-  public void testSumOfValues() {
-    assertEquals(1.1 + 2.5 + 4.7, array5Vector.sumOfValues(), Double.MIN_NORMAL);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java b/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java
deleted file mode 100644
index d64ca0e..0000000
--- a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java
+++ /dev/null
@@ -1,306 +0,0 @@
-package com.yahoo.labs.samoa.streams.fs;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.BufferedWriter;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.HashSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSCluster.Builder;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-
-import static org.junit.Assert.*;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class HDFSFileStreamSourceTest {
-
-  private static final String[] HOSTS = { "localhost" };
-  private static final String BASE_DIR = "/minidfsTest";
-  private static final int NUM_FILES_IN_DIR = 4;
-  private static final int NUM_NOISE_FILES_IN_DIR = 2;
-
-  private HDFSFileStreamSource streamSource;
-
-  private Configuration config;
-  private MiniDFSCluster hdfsCluster;
-  private String hdfsURI;
-
-  @Before
-  public void setUp() throws Exception {
-    // Start MiniDFSCluster
-    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new Configuration()).hosts(HOSTS).numDataNodes(1)
-        .format(true);
-    hdfsCluster = builder.build();
-    hdfsCluster.waitActive();
-    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort();
-
-    // Construct stream source
-    streamSource = new HDFSFileStreamSource();
-
-    // General config
-    config = new Configuration();
-    config.set("fs.defaultFS", hdfsURI);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    hdfsCluster.shutdown();
-  }
-
-  /*
-   * Init tests
-   */
-  @Test
-  public void testInitWithSingleFileAndExtension() {
-    // write input file
-    writeSimpleFiles(BASE_DIR, "txt", 1);
-
-    // init with path to input file
-    streamSource.init(config, BASE_DIR + "/1.txt", "txt");
-
-    // assertions
-    assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0);
-    String fn = streamSource.getFilePathAt(0);
-    assertTrue("Incorrect file in filePaths.",
-        fn.equals(BASE_DIR + "/1.txt") || fn.equals(hdfsURI + BASE_DIR + "1.txt"));
-  }
-
-  @Test
-  public void testInitWithSingleFileAndNullExtension() {
-    // write input file
-    writeSimpleFiles(BASE_DIR, "txt", 1);
-
-    // init with path to input file
-    streamSource.init(config, BASE_DIR + "/1.txt", null);
-
-    // assertions
-    assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0);
-    String fn = streamSource.getFilePathAt(0);
-    assertTrue("Incorrect file in filePaths.",
-        fn.equals(BASE_DIR + "/1.txt") || fn.equals(hdfsURI + BASE_DIR + "1.txt"));
-  }
-
-  @Test
-  public void testInitWithFolderAndExtension() {
-    // write input files & noise files
-    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
-    writeSimpleFiles(BASE_DIR, null, NUM_NOISE_FILES_IN_DIR);
-
-    // init with path to input dir
-    streamSource.init(config, BASE_DIR, "txt");
-
-    // assertions
-    assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0);
-    Set<String> filenames = new HashSet<String>();
-    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
-      String targetFn = BASE_DIR + "/" + Integer.toString(i) + ".txt";
-      filenames.add(targetFn);
-      filenames.add(hdfsURI + targetFn);
-    }
-    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
-      String fn = streamSource.getFilePathAt(i);
-      assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn));
-    }
-  }
-
-  @Test
-  public void testInitWithFolderAndNullExtension() {
-    // write input file
-    writeSimpleFiles(BASE_DIR, null, NUM_FILES_IN_DIR);
-
-    // init with path to input dir
-    streamSource.init(config, BASE_DIR, null);
-
-    // assertions
-    assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0);
-    Set<String> filenames = new HashSet<String>();
-    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
-      String targetFn = BASE_DIR + "/" + Integer.toString(i);
-      filenames.add(targetFn);
-      filenames.add(hdfsURI + targetFn);
-    }
-    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
-      String fn = streamSource.getFilePathAt(i);
-      assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn));
-    }
-  }
-
-  /*
-   * getNextInputStream tests
-   */
-  @Test
-  public void testGetNextInputStream() {
-    // write input files & noise files
-    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
-
-    // init with path to input dir
-    streamSource.init(config, BASE_DIR, "txt");
-
-    // call getNextInputStream & assertions
-    Set<String> contents = new HashSet<String>();
-    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
-      contents.add(Integer.toString(i));
-    }
-    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
-      InputStream inStream = streamSource.getNextInputStream();
-      assertNotNull("Unexpected end of input stream list.", inStream);
-
-      BufferedReader rd = new BufferedReader(new InputStreamReader(inStream));
-      String inputRead = null;
-      try {
-        inputRead = rd.readLine();
-      } catch (IOException ioe) {
-        fail("Fail reading from stream at index:" + i + ioe.getMessage());
-      }
-      assertTrue("File content is incorrect.", contents.contains(inputRead));
-      Iterator<String> it = contents.iterator();
-      while (it.hasNext()) {
-        if (it.next().equals(inputRead)) {
-          it.remove();
-          break;
-        }
-      }
-    }
-
-    // assert that another call to getNextInputStream will return null
-    assertNull("Call getNextInputStream after the last file did not return null.", streamSource.getNextInputStream());
-  }
-
-  /*
-   * getCurrentInputStream tests
-   */
-  public void testGetCurrentInputStream() {
-    // write input files & noise files
-    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
-
-    // init with path to input dir
-    streamSource.init(config, BASE_DIR, "txt");
-
-    // call getNextInputStream, getCurrentInputStream & assertions
-    for (int i = 0; i <= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list
-      InputStream inStream1 = streamSource.getNextInputStream();
-      InputStream inStream2 = streamSource.getCurrentInputStream();
-      assertSame("Incorrect current input stream.", inStream1, inStream2);
-    }
-  }
-
-  /*
-   * reset tests
-   */
-  public void testReset() {
-    // write input files & noise files
-    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
-
-    // init with path to input dir
-    streamSource.init(config, BASE_DIR, "txt");
-
-    // Get the first input string
-    InputStream firstInStream = streamSource.getNextInputStream();
-    String firstInput = null;
-    assertNotNull("Unexpected end of input stream list.", firstInStream);
-
-    BufferedReader rd1 = new BufferedReader(new InputStreamReader(firstInStream));
-    try {
-      firstInput = rd1.readLine();
-    } catch (IOException ioe) {
-      fail("Fail reading from stream at index:0" + ioe.getMessage());
-    }
-
-    // call getNextInputStream a few times
-    streamSource.getNextInputStream();
-
-    // call reset, call next, assert that output is 1 (the first file)
-    try {
-      streamSource.reset();
-    } catch (IOException ioe) {
-      fail("Fail resetting stream source." + ioe.getMessage());
-    }
-
-    InputStream inStream = streamSource.getNextInputStream();
-    assertNotNull("Unexpected end of input stream list.", inStream);
-
-    BufferedReader rd2 = new BufferedReader(new InputStreamReader(inStream));
-    String inputRead = null;
-    try {
-      inputRead = rd2.readLine();
-    } catch (IOException ioe) {
-      fail("Fail reading from stream at index:0" + ioe.getMessage());
-    }
-    assertEquals("File content is incorrect.", firstInput, inputRead);
-  }
-
-  private void writeSimpleFiles(String path, String ext, int numOfFiles) {
-    // get filesystem
-    FileSystem dfs;
-    try {
-      dfs = hdfsCluster.getFileSystem();
-    } catch (IOException ioe) {
-      fail("Could not access MiniDFSCluster" + ioe.getMessage());
-      return;
-    }
-
-    // create basedir
-    Path basedir = new Path(path);
-    try {
-      dfs.mkdirs(basedir);
-    } catch (IOException ioe) {
-      fail("Could not create DIR:" + path + "\n" + ioe.getMessage());
-      return;
-    }
-
-    // write files
-    for (int i = 1; i <= numOfFiles; i++) {
-      String fn = null;
-      if (ext != null) {
-        fn = Integer.toString(i) + "." + ext;
-      } else {
-        fn = Integer.toString(i);
-      }
-
-      try {
-        OutputStream fin = dfs.create(new Path(path, fn));
-        BufferedWriter wr = new BufferedWriter(new OutputStreamWriter(fin));
-        wr.write(Integer.toString(i));
-        wr.close();
-        fin.close();
-      } catch (IOException ioe) {
-        fail("Fail writing to input file: " + fn + " in directory: " + path + ioe.getMessage());
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java b/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java
deleted file mode 100644
index 5898421..0000000
--- a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java
+++ /dev/null
@@ -1,276 +0,0 @@
-package com.yahoo.labs.samoa.streams.fs;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.BufferedWriter;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.lang.SecurityException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.apache.commons.io.FileUtils;
-
-public class LocalFileStreamSourceTest {
-  private static final String BASE_DIR = "localfsTest";
-  private static final int NUM_FILES_IN_DIR = 4;
-  private static final int NUM_NOISE_FILES_IN_DIR = 2;
-
-  private LocalFileStreamSource streamSource;
-
-  @Before
-  public void setUp() throws Exception {
-    streamSource = new LocalFileStreamSource();
-
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    FileUtils.deleteDirectory(new File(BASE_DIR));
-  }
-
-  @Test
-  public void testInitWithSingleFileAndExtension() {
-    // write input file
-    writeSimpleFiles(BASE_DIR, "txt", 1);
-
-    // init with path to input file
-    File inFile = new File(BASE_DIR, "1.txt");
-    String inFilePath = inFile.getAbsolutePath();
-    streamSource.init(inFilePath, "txt");
-
-    // assertions
-    assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0);
-    String fn = streamSource.getFilePathAt(0);
-    assertEquals("Incorrect file in filePaths.", inFilePath, fn);
-  }
-
-  @Test
-  public void testInitWithSingleFileAndNullExtension() {
-    // write input file
-    writeSimpleFiles(BASE_DIR, "txt", 1);
-
-    // init with path to input file
-    File inFile = new File(BASE_DIR, "1.txt");
-    String inFilePath = inFile.getAbsolutePath();
-    streamSource.init(inFilePath, null);
-
-    // assertions
-    assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0);
-    String fn = streamSource.getFilePathAt(0);
-    assertEquals("Incorrect file in filePaths.", inFilePath, fn);
-  }
-
-  @Test
-  public void testInitWithFolderAndExtension() {
-    // write input file
-    writeSimpleFiles(BASE_DIR, null, NUM_NOISE_FILES_IN_DIR);
-    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
-
-    // init with path to input dir
-    File inDir = new File(BASE_DIR);
-    String inDirPath = inDir.getAbsolutePath();
-    streamSource.init(inDirPath, "txt");
-
-    // assertions
-    assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0);
-    Set<String> filenames = new HashSet<String>();
-    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
-      String expectedFn = (new File(inDirPath, Integer.toString(i) + ".txt")).getAbsolutePath();
-      filenames.add(expectedFn);
-    }
-    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
-      String fn = streamSource.getFilePathAt(i);
-      assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn));
-    }
-  }
-
-  @Test
-  public void testInitWithFolderAndNullExtension() {
-    // write input file
-    writeSimpleFiles(BASE_DIR, null, NUM_FILES_IN_DIR);
-
-    // init with path to input dir
-    File inDir = new File(BASE_DIR);
-    String inDirPath = inDir.getAbsolutePath();
-    streamSource.init(inDirPath, null);
-
-    // assertions
-    assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0);
-    Set<String> filenames = new HashSet<String>();
-    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
-      String expectedFn = (new File(inDirPath, Integer.toString(i))).getAbsolutePath();
-      filenames.add(expectedFn);
-    }
-    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
-      String fn = streamSource.getFilePathAt(i);
-      assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn));
-    }
-  }
-
-  /*
-   * getNextInputStream tests
-   */
-  @Test
-  public void testGetNextInputStream() {
-    // write input files & noise files
-    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
-
-    // init with path to input dir
-    streamSource.init(BASE_DIR, "txt");
-
-    // call getNextInputStream & assertions
-    Set<String> contents = new HashSet<String>();
-    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
-      contents.add(Integer.toString(i));
-    }
-    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
-      InputStream inStream = streamSource.getNextInputStream();
-      assertNotNull("Unexpected end of input stream list.", inStream);
-
-      BufferedReader rd = new BufferedReader(new InputStreamReader(inStream));
-      String inputRead = null;
-      try {
-        inputRead = rd.readLine();
-      } catch (IOException ioe) {
-        fail("Fail reading from stream at index:" + i + ioe.getMessage());
-      }
-      assertTrue("File content is incorrect.", contents.contains(inputRead));
-      Iterator<String> it = contents.iterator();
-      while (it.hasNext()) {
-        if (it.next().equals(inputRead)) {
-          it.remove();
-          break;
-        }
-      }
-    }
-
-    // assert that another call to getNextInputStream will return null
-    assertNull("Call getNextInputStream after the last file did not return null.", streamSource.getNextInputStream());
-  }
-
-  /*
-   * getCurrentInputStream tests
-   */
-  public void testGetCurrentInputStream() {
-    // write input files & noise files
-    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
-
-    // init with path to input dir
-    streamSource.init(BASE_DIR, "txt");
-
-    // call getNextInputStream, getCurrentInputStream & assertions
-    for (int i = 0; i <= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list
-      InputStream inStream1 = streamSource.getNextInputStream();
-      InputStream inStream2 = streamSource.getCurrentInputStream();
-      assertSame("Incorrect current input stream.", inStream1, inStream2);
-    }
-  }
-
-  /*
-   * reset tests
-   */
-  public void testReset() {
-    // write input files & noise files
-    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
-
-    // init with path to input dir
-    streamSource.init(BASE_DIR, "txt");
-
-    // Get the first input string
-    InputStream firstInStream = streamSource.getNextInputStream();
-    String firstInput = null;
-    assertNotNull("Unexpected end of input stream list.", firstInStream);
-
-    BufferedReader rd1 = new BufferedReader(new InputStreamReader(firstInStream));
-    try {
-      firstInput = rd1.readLine();
-    } catch (IOException ioe) {
-      fail("Fail reading from stream at index:0" + ioe.getMessage());
-    }
-
-    // call getNextInputStream a few times
-    streamSource.getNextInputStream();
-
-    // call reset, call next, assert that output is 1 (the first file)
-    try {
-      streamSource.reset();
-    } catch (IOException ioe) {
-      fail("Fail resetting stream source." + ioe.getMessage());
-    }
-
-    InputStream inStream = streamSource.getNextInputStream();
-    assertNotNull("Unexpected end of input stream list.", inStream);
-
-    BufferedReader rd2 = new BufferedReader(new InputStreamReader(inStream));
-    String inputRead = null;
-    try {
-      inputRead = rd2.readLine();
-    } catch (IOException ioe) {
-      fail("Fail reading from stream at index:0" + ioe.getMessage());
-    }
-    assertEquals("File content is incorrect.", firstInput, inputRead);
-  }
-
-  private void writeSimpleFiles(String path, String ext, int numOfFiles) {
-    // Create folder
-    File folder = new File(path);
-    if (!folder.exists()) {
-      try {
-        folder.mkdir();
-      } catch (SecurityException se) {
-        fail("Failed creating directory:" + path + se);
-      }
-    }
-
-    // Write files
-    for (int i = 1; i <= numOfFiles; i++) {
-      String fn = null;
-      if (ext != null) {
-        fn = Integer.toString(i) + "." + ext;
-      } else {
-        fn = Integer.toString(i);
-      }
-
-      try {
-        FileWriter fwr = new FileWriter(new File(path, fn));
-        fwr.write(Integer.toString(i));
-        fwr.close();
-      } catch (IOException ioe) {
-        fail("Fail writing to input file: " + fn + " in directory: " + path + ioe.getMessage());
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/test/java/org/apache/samoa/core/DoubleVectorTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/org/apache/samoa/core/DoubleVectorTest.java b/samoa-api/src/test/java/org/apache/samoa/core/DoubleVectorTest.java
new file mode 100644
index 0000000..71b8f1b
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/core/DoubleVectorTest.java
@@ -0,0 +1,98 @@
+package org.apache.samoa.core;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
+
+import org.apache.samoa.core.DoubleVector;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DoubleVectorTest {
+  private DoubleVector emptyVector, array5Vector;
+
+  @Before
+  public void setUp() {
+    emptyVector = new DoubleVector();
+    array5Vector = new DoubleVector(new double[] { 1.1, 2.5, 0, 4.7, 0 });
+  }
+
+  @Test
+  public void testGetArrayRef() {
+    assertThat(emptyVector.getArrayRef(), notNullValue());
+    assertTrue(emptyVector.getArrayRef() == emptyVector.getArrayRef());
+    assertEquals(5, array5Vector.getArrayRef().length);
+  }
+
+  @Test
+  public void testGetArrayCopy() {
+    double[] arrayRef;
+    arrayRef = emptyVector.getArrayRef();
+    assertTrue(arrayRef != emptyVector.getArrayCopy());
+    assertThat(arrayRef, is(equalTo(emptyVector.getArrayCopy())));
+
+    arrayRef = array5Vector.getArrayRef();
+    assertTrue(arrayRef != array5Vector.getArrayCopy());
+    assertThat(arrayRef, is(equalTo(array5Vector.getArrayCopy())));
+  }
+
+  @Test
+  public void testNumNonZeroEntries() {
+    assertEquals(0, emptyVector.numNonZeroEntries());
+    assertEquals(3, array5Vector.numNonZeroEntries());
+  }
+
+  @Test(expected = IndexOutOfBoundsException.class)
+  public void testGetValueOutOfBound() {
+    @SuppressWarnings("unused")
+    double value = emptyVector.getArrayRef()[0];
+  }
+
+  @Test()
+  public void testSetValue() {
+    // test automatic vector enlargement
+    emptyVector.setValue(0, 1.0);
+    assertEquals(1, emptyVector.getArrayRef().length);
+    assertEquals(1.0, emptyVector.getArrayRef()[0], 0.0); // should be exactly the same, so delta=0.0
+
+    emptyVector.setValue(5, 5.5);
+    assertEquals(6, emptyVector.getArrayRef().length);
+    assertEquals(2, emptyVector.numNonZeroEntries());
+    assertEquals(5.5, emptyVector.getArrayRef()[5], 0.0); // should be exactly the same, so delta=0.0
+  }
+
+  @Test
+  public void testAddToValue() {
+    array5Vector.addToValue(2, 5.0);
+    assertEquals(5, array5Vector.getArrayRef()[2], 0.0); // should be exactly the same, so delta=0.0
+
+    // test automatic vector enlargement
+    emptyVector.addToValue(0, 1.0);
+    assertEquals(1, emptyVector.getArrayRef()[0], 0.0); // should be exactly the same, so delta=0.0
+  }
+
+  @Test
+  public void testSumOfValues() {
+    assertEquals(1.1 + 2.5 + 4.7, array5Vector.sumOfValues(), Double.MIN_NORMAL);
+  }
+
+}

Reply via email to