http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java
new file mode 100644
index 0000000..e9a5aa1
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java
@@ -0,0 +1,229 @@
+package com.yahoo.labs.samoa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.instances.Instance;
+import com.yahoo.labs.samoa.instances.Instances;
+import com.yahoo.labs.samoa.learners.InstanceContentEvent;
+import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler;
+import com.yahoo.labs.samoa.moa.streams.InstanceStream;
+
+/**
+ * Prequential Source Processor is the processor for Prequential Evaluation 
Task.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+public final class PrequentialSourceProcessor implements EntranceProcessor {
+
+    private static final long serialVersionUID = 4169053337917578558L;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(PrequentialSourceProcessor.class);
+    private boolean isInited = false;
+    private StreamSource streamSource;
+    private Instance firstInstance;
+    private int numberInstances;
+    private int numInstanceSent = 0;
+
+    protected InstanceStream sourceStream;
+    
+    /*
+        * ScheduledExecutorService to schedule sending events after each delay 
interval.
+        * It is expected to have only one event in the queue at a time, so we 
need only 
+        * one thread in the pool.
+        */
+       private transient ScheduledExecutorService timer;
+       private transient ScheduledFuture<?> schedule = null;
+       private int readyEventIndex = 1; // No waiting for the first event
+       private int delay = 0;
+       private int batchSize = 1;
+    private boolean finished = false;
+
+    @Override
+    public boolean process(ContentEvent event) {
+        // TODO: possible refactor of the super-interface implementation
+        // of source processor does not need this method
+        return false;
+    }
+    
+    @Override
+    public boolean isFinished() {
+       return finished;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return !isFinished() && (delay <= 0 || numInstanceSent < 
readyEventIndex);
+    }
+
+    private boolean hasReachedEndOfStream() {
+        return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && 
numInstanceSent >= numberInstances));
+    }
+
+    @Override
+    public ContentEvent nextEvent() {
+        InstanceContentEvent contentEvent = null;
+        if (hasReachedEndOfStream()) {
+               contentEvent = new InstanceContentEvent(-1, firstInstance, 
false, true);
+            contentEvent.setLast(true);
+            // set finished status _after_ tagging last event
+            finished = true;
+        }
+        else if (hasNext()) {
+            numInstanceSent++;
+            contentEvent = new InstanceContentEvent(numInstanceSent, 
nextInstance(), true, true);
+            
+            // first call to this method will trigger the timer
+            if (schedule == null && delay > 0) {
+                schedule = timer.scheduleWithFixedDelay(new 
DelayTimeoutHandler(this), delay, delay,
+                        TimeUnit.MICROSECONDS);
+            }
+        }
+        return contentEvent;
+    }
+    
+       private void increaseReadyEventIndex() {
+               readyEventIndex+=batchSize;
+               // if we exceed the max, cancel the timer
+               if (schedule != null && isFinished()) {
+                       schedule.cancel(false);
+               }
+       }
+
+    @Override
+    public void onCreate(int id) {
+        initStreamSource(sourceStream);
+        timer = Executors.newScheduledThreadPool(1);
+        logger.debug("Creating PrequentialSourceProcessor with id {}", id);
+    }
+
+    @Override
+    public Processor newProcessor(Processor p) {
+        PrequentialSourceProcessor newProcessor = new 
PrequentialSourceProcessor();
+        PrequentialSourceProcessor originProcessor = 
(PrequentialSourceProcessor) p;
+        if (originProcessor.getStreamSource() != null) {
+            
newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
+        }
+        return newProcessor;
+    }
+
+//    /**
+//     * Method to send instances via input stream
+//     * 
+//     * @param inputStream
+//     * @param numberInstances
+//     */
+//    public void sendInstances(Stream inputStream, int numberInstances) {
+//        int numInstanceSent = 0;
+//        initStreamSource(sourceStream);
+//
+//        while (streamSource.hasMoreInstances() && numInstanceSent < 
numberInstances) {
+//            numInstanceSent++;
+//            InstanceContentEvent contentEvent = new 
InstanceContentEvent(numInstanceSent, nextInstance(), true, true);
+//            inputStream.put(contentEvent);
+//        }
+//
+//        sendEndEvaluationInstance(inputStream);
+//    }
+
+    public StreamSource getStreamSource() {
+        return streamSource;
+    }
+
+    public void setStreamSource(InstanceStream stream) {
+        this.sourceStream = stream;
+    }
+
+    public Instances getDataset() {
+        if (firstInstance == null) {
+            initStreamSource(sourceStream);
+        }
+        return firstInstance.dataset();
+    }
+
+    private Instance nextInstance() {
+        if (this.isInited) {
+            return streamSource.nextInstance().getData();
+        } else {
+            this.isInited = true;
+            return firstInstance;
+        }
+    }
+
+//    private void sendEndEvaluationInstance(Stream inputStream) {
+//        InstanceContentEvent contentEvent = new InstanceContentEvent(-1, 
firstInstance, false, true);
+//        contentEvent.setLast(true);
+//        inputStream.put(contentEvent);
+//    }
+
+    private void initStreamSource(InstanceStream stream) {
+        if (stream instanceof AbstractOptionHandler) {
+            ((AbstractOptionHandler) (stream)).prepareForUse();
+        }
+
+        this.streamSource = new StreamSource(stream);
+        firstInstance = streamSource.nextInstance().getData();
+    }
+
+    public void setMaxNumInstances(int value) {
+        numberInstances = value;
+    }
+    
+    public int getMaxNumInstances() {
+       return this.numberInstances;
+    }
+    
+       public void setSourceDelay(int delay) {
+               this.delay = delay;
+       }
+
+       public int getSourceDelay() {
+               return this.delay;
+       }
+       
+       public void setDelayBatchSize(int batch) {
+               this.batchSize = batch;
+       }
+       
+       private class DelayTimeoutHandler implements Runnable {
+       
+       private PrequentialSourceProcessor processor;
+       
+       public DelayTimeoutHandler(PrequentialSourceProcessor processor) {
+               this.processor = processor;
+       }
+       
+       public void run() {
+               processor.increaseReadyEventIndex();
+       }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java
new file mode 100644
index 0000000..453d02d
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java
@@ -0,0 +1,90 @@
+package com.yahoo.labs.samoa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import com.yahoo.labs.samoa.moa.core.Example;
+import com.yahoo.labs.samoa.moa.streams.InstanceStream;
+import com.yahoo.labs.samoa.instances.Instance;
+
+/**
+ * The Class StreamSource.
+ */
+public class StreamSource implements java.io.Serializable{
+
+       /**
+        * 
+        */
+       private static final long serialVersionUID = 3974668694861231236L;
+
+       /**
+        * Instantiates a new stream source.
+        *
+        * @param stream the stream
+        */
+       public StreamSource(InstanceStream stream) {
+               super();
+               this.stream = stream;
+       }
+
+       /** The stream. */
+       protected InstanceStream stream;
+
+       /**
+        * Gets the stream.
+        *
+        * @return the stream
+        */
+       public InstanceStream getStream() {
+               return stream;
+       }
+
+       /**
+        * Next instance.
+        *
+        * @return the instance
+        */
+       public Example<Instance> nextInstance() {
+               return stream.nextInstance();
+       }
+
+       /**
+        * Sets the stream.
+        *
+        * @param stream the new stream
+        */
+       public void setStream(InstanceStream stream) {
+               this.stream = stream;
+       }
+
+       /**
+        * Checks for more instances.
+        *
+        * @return true, if successful
+        */
+       public boolean hasMoreInstances() {
+               return this.stream.hasMoreInstances();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java
new file mode 100644
index 0000000..2e66e4b
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java
@@ -0,0 +1,185 @@
+package com.yahoo.labs.samoa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.instances.Instance;
+import com.yahoo.labs.samoa.instances.Instances;
+import com.yahoo.labs.samoa.learners.InstanceContentEvent;
+import com.yahoo.labs.samoa.moa.streams.InstanceStream;
+import com.yahoo.labs.samoa.topology.Stream;
+
+/**
+ * The Class StreamSourceProcessor.
+ */
+public class StreamSourceProcessor implements Processor {
+       
+       /** The Constant logger. */
+       private static final Logger logger = LoggerFactory
+                       .getLogger(StreamSourceProcessor.class);
+
+       /**
+        * 
+        */
+       private static final long serialVersionUID = -204182279475432739L;
+
+       /** The stream source. */
+       private StreamSource streamSource;
+
+       /**
+        * Gets the stream source.
+        *
+        * @return the stream source
+        */
+       public StreamSource getStreamSource() {
+               return streamSource;
+       }
+
+       /**
+        * Sets the stream source.
+        *
+        * @param stream the new stream source
+        */
+       public void setStreamSource(InstanceStream stream) {
+               this.streamSource = new StreamSource(stream);
+               firstInstance = streamSource.nextInstance().getData();
+       }
+
+       /** The number instances sent. */
+       private long numberInstancesSent = 0;
+
+       /**
+        * Send instances.
+        *  @param inputStream the input stream
+        * @param numberInstances the number instances
+        * @param isTraining the is training
+        */
+       public void sendInstances(Stream inputStream,
+                                                                               
                                int numberInstances, boolean isTraining, 
boolean isTesting) {
+               int numberSamples = 0;
+
+               while (streamSource.hasMoreInstances()
+                               && numberSamples < numberInstances) {
+                       
+                       numberSamples++;
+                       numberInstancesSent++;
+                       InstanceContentEvent instanceContentEvent = new 
InstanceContentEvent(
+                                       numberInstancesSent, nextInstance(), 
isTraining, isTesting);
+               
+                       
+                       inputStream.put(instanceContentEvent);
+               }
+
+               InstanceContentEvent instanceContentEvent = new 
InstanceContentEvent(
+                               numberInstancesSent, null, isTraining, 
isTesting);
+               instanceContentEvent.setLast(true);
+               inputStream.put(instanceContentEvent);
+       }
+
+       /**
+        * Send end evaluation instance.
+        *
+        * @param inputStream the input stream
+        */
+       public void sendEndEvaluationInstance(Stream inputStream) {
+               InstanceContentEvent instanceContentEvent = new 
InstanceContentEvent(-1, firstInstance,false, true);
+               inputStream.put(instanceContentEvent);
+       }
+
+       /**
+        * Next instance.
+        *
+        * @return the instance
+        */
+       protected Instance nextInstance() {
+               if (this.isInited) {
+                       return streamSource.nextInstance().getData();
+               } else {
+                       this.isInited = true;
+                       return firstInstance;
+               }
+       }
+
+       /** The is inited. */
+       protected boolean isInited = false;
+       
+       /** The first instance. */
+       protected Instance firstInstance;
+
+       //@Override
+       /**
+        * On remove.
+        */
+       protected void onRemove() {
+       }
+
+       /* (non-Javadoc)
+        * @see samoa.core.Processor#onCreate(int)
+        */
+       @Override
+       public void onCreate(int id) {
+               // TODO Auto-generated method stub
+       }
+
+       /* (non-Javadoc)
+        * @see samoa.core.Processor#newProcessor(samoa.core.Processor)
+        */
+       @Override
+       public Processor newProcessor(Processor sourceProcessor) {
+//             StreamSourceProcessor newProcessor = new 
StreamSourceProcessor();
+//             StreamSourceProcessor originProcessor = (StreamSourceProcessor) 
sourceProcessor;
+//             if (originProcessor.getStreamSource() != null){
+//                     
newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
+//             }
+               //return newProcessor;
+               return null;
+       }
+
+       /**
+        * On event.
+        *
+        * @param event the event
+        * @return true, if successful
+        */
+       @Override
+       public boolean process(ContentEvent event) {
+               return false;
+       }
+       
+       
+       /**
+        * Gets the dataset.
+        *
+        * @return the dataset
+        */
+       public Instances getDataset() {
+               return firstInstance.dataset();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java
new file mode 100644
index 0000000..25541e2
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java
@@ -0,0 +1,67 @@
+package com.yahoo.labs.samoa.streams.fs;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
/**
 * Abstraction over the place a FileStream reads its files from (local file system, HDFS, ...).
 * Implementations hand out the selected files one after another as {@link InputStream}s.
 *
 * @author Casey
 */
public interface FileStreamSource extends Serializable {

    /**
     * Initializes the source with a file or directory path.
     *
     * @param path a single file, or a directory whose files should be read
     * @param ext extension used to pick files inside a directory;
     *            {@code null} accepts every file in the directory
     */
    void init(String path, String ext);

    /**
     * Rewinds the source so that reading starts over.
     *
     * @throws IOException if the implementation fails while resetting
     */
    void reset() throws IOException;

    /**
     * Advances to the next file and returns a stream over it.
     *
     * @return a stream for the next file, or {@code null} once the last file
     *         in the list has already been reached
     */
    InputStream getNextInputStream();

    /**
     * Returns the stream of the file selected by the latest {@link #getNextInputStream()} call.
     * The "current" position only moves forward through getNextInputStream.
     *
     * @return the current file's stream, or {@code null} if getNextInputStream
     *         has never been invoked
     */
    InputStream getCurrentInputStream();
}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java
new file mode 100644
index 0000000..079423c
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java
@@ -0,0 +1,150 @@
+package com.yahoo.labs.samoa.streams.fs;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileSystems;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * Source for FileStream for HDFS files
+ * @author Casey
+ */
+public class HDFSFileStreamSource implements FileStreamSource {
+       
+       /**
+        * 
+        */
+       private static final long serialVersionUID = -3887354805787167400L;
+       
+       private transient InputStream fileStream;
+    private transient Configuration config;
+    private List<String> filePaths;
+    private int currentIndex;
+       
+       public HDFSFileStreamSource(){
+               this.currentIndex = -1;
+       }
+       
+       public void init(String path, String ext) {
+               this.init(this.getDefaultConfig(), path, ext);
+       }
+       
+       public void init(Configuration config, String path, String ext) {
+               this.config = config;
+               this.filePaths = new ArrayList<String>();
+               Path hdfsPath = new Path(path);
+        FileSystem fs;
+        try {
+               fs = FileSystem.get(config);
+               FileStatus fileStat = fs.getFileStatus(hdfsPath);
+               if (fileStat.isDirectory()) {
+                       Path filterPath = hdfsPath;
+                       if (ext != null) {
+                               filterPath = new Path(path.toString(),"*."+ext);
+                       }
+                       else {
+                               filterPath = new Path(path.toString(),"*");
+                       }
+                       FileStatus[] filesInDir = fs.globStatus(filterPath);
+                       for (int i=0; i<filesInDir.length; i++) {
+                               if (filesInDir[i].isFile()) {
+                                       
filePaths.add(filesInDir[i].getPath().toString());
+                               }
+                       }
+               }
+               else {
+                       this.filePaths.add(path);
+               }
+        }
+        catch(IOException ioe) {
+            throw new RuntimeException("Failed getting list of files 
at:"+path,ioe);
+        }
+        
+               this.currentIndex = -1;
+       }
+       
+       private Configuration getDefaultConfig() {
+               String hadoopHome = System.getenv("HADOOP_HOME");
+        Configuration conf = new Configuration();
+        if (hadoopHome != null) {
+               java.nio.file.Path coreSitePath = 
FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/core-site.xml");
+               java.nio.file.Path hdfsSitePath = 
FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/hdfs-site.xml");
+            conf.addResource(new 
Path(coreSitePath.toAbsolutePath().toString()));
+            conf.addResource(new 
Path(hdfsSitePath.toAbsolutePath().toString()));
+        }
+        return conf;
+       }
+       
+       public void reset() throws IOException {
+               this.currentIndex = -1;
+               this.closeFileStream();
+       }
+
+       private void closeFileStream() {
+        IOUtils.closeStream(fileStream);
+       }
+
+       public InputStream getNextInputStream() {
+               this.closeFileStream();
+               if (this.currentIndex >= (this.filePaths.size()-1)) return null;
+               
+               this.currentIndex++;
+               String filePath = this.filePaths.get(currentIndex);
+               
+               Path hdfsPath = new Path(filePath);
+        FileSystem fs;
+        try {
+               fs = FileSystem.get(config);
+            fileStream = fs.open(hdfsPath);
+        }
+        catch(IOException ioe) {
+            this.closeFileStream();
+            throw new RuntimeException("Failed opening file:"+filePath,ioe);
+        }
+        
+        return fileStream;
+       }
+
+       public InputStream getCurrentInputStream() {
+               return fileStream;
+       }
+       
+       protected int getFilePathListSize() {
+               if (filePaths != null)
+                       return filePaths.size();
+               return 0;
+       }
+       
+       protected String getFilePathAt(int index) {
+               if (filePaths != null && filePaths.size() > index)
+                       return filePaths.get(index);
+               return null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java
new file mode 100644
index 0000000..c0ab44f
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java
@@ -0,0 +1,131 @@
+package com.yahoo.labs.samoa.streams.fs;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileSystems;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Source for FileStream for local files
+ * @author Casey
+ */
+public class LocalFileStreamSource implements FileStreamSource {
+       /**
+        * 
+        */
+       private static final long serialVersionUID = 3986511547525870698L;
+       
+       private transient InputStream fileStream;
+    private List<String> filePaths;
+    private int currentIndex;
+       
+       public LocalFileStreamSource(){
+               this.currentIndex = -1;
+       }
+       
+       public void init(String path, String ext) {
+               this.filePaths = new ArrayList<String>();
+               File fileAtPath = new File(path);
+               if (fileAtPath.isDirectory()) {
+                       File[] filesInDir = fileAtPath.listFiles(new 
FileExtensionFilter(ext));
+                       for (int i=0; i<filesInDir.length; i++) {
+                               filePaths.add(filesInDir[i].getAbsolutePath());
+                       }
+               }
+               else {
+                       this.filePaths.add(path);
+               }
+               this.currentIndex = -1;
+       }
+       
+       public void reset() throws IOException {
+               this.currentIndex = -1;
+               this.closeFileStream();
+       }
+       
+       private void closeFileStream() {
+               if (fileStream != null) {
+                       try {
+                               fileStream.close();
+                       } catch (IOException ioe) {
+                               ioe.printStackTrace();
+                       }
+               }
+       }
+
+       public InputStream getNextInputStream() {
+               this.closeFileStream();
+               
+               if (this.currentIndex >= (this.filePaths.size()-1)) return null;
+               
+               this.currentIndex++;
+               String filePath = this.filePaths.get(currentIndex);
+               
+               File file = new File(filePath);
+        try {
+               fileStream = new FileInputStream(file);
+        }
+        catch(IOException ioe) {
+            this.closeFileStream();
+            throw new RuntimeException("Failed opening file:"+filePath,ioe);
+        }
+        
+        return fileStream;
+       }
+
+       public InputStream getCurrentInputStream() {
+               return fileStream;
+       }
+       
+       protected int getFilePathListSize() {
+               if (filePaths != null)
+                       return filePaths.size();
+               return 0;
+       }
+       
+       protected String getFilePathAt(int index) {
+               if (filePaths != null && filePaths.size() > index)
+                       return filePaths.get(index);
+               return null;
+       }
+       
+       private class FileExtensionFilter implements FilenameFilter {
+               private String extension;
+               FileExtensionFilter(String ext) {
+                       extension = ext;
+               }
+               
+               @Override
+               public boolean accept(File dir, String name) {
+                       File f = new File(dir,name);
+                       if (extension == null)
+                               return f.isFile();
+                       else
+                               return f.isFile() && 
name.toLowerCase().endsWith("."+extension);
+           }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java
new file mode 100644
index 0000000..4af3764
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java
@@ -0,0 +1,174 @@
+package com.yahoo.labs.samoa.tasks;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.FileOption;
+import com.github.javacliparser.FloatOption;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+import com.yahoo.labs.samoa.evaluation.ClusteringEvaluatorProcessor;
+import com.yahoo.labs.samoa.learners.Learner;
+import 
com.yahoo.labs.samoa.learners.clusterers.simple.ClusteringDistributorProcessor;
+import com.yahoo.labs.samoa.learners.clusterers.simple.DistributedClusterer;
+import com.yahoo.labs.samoa.moa.streams.InstanceStream;
+import com.yahoo.labs.samoa.moa.streams.clustering.ClusteringStream;
+import com.yahoo.labs.samoa.moa.streams.clustering.RandomRBFGeneratorEvents;
+import com.yahoo.labs.samoa.streams.ClusteringEntranceProcessor;
+import com.yahoo.labs.samoa.topology.ComponentFactory;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.Topology;
+import com.yahoo.labs.samoa.topology.TopologyBuilder;
+
+/**
+ * A task that runs and evaluates a distributed clustering algorithm.
+ * 
+ */
+public class ClusteringEvaluation implements Task, Configurable {
+
+    private static final long serialVersionUID = -8246537378371580550L;
+
+    private static final int DISTRIBUTOR_PARALLELISM = 1;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ClusteringEvaluation.class);
+
+    public ClassOption learnerOption = new ClassOption("learner", 'l', 
"Clustering to run.", Learner.class, DistributedClusterer.class.getName());
+
+    public ClassOption streamTrainOption = new ClassOption("streamTrain", 's', 
"Input stream.", InstanceStream.class,
+            RandomRBFGeneratorEvents.class.getName());
+
+    public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i', 
"Maximum number of instances to test/train on  (-1 = no limit).", 100000, -1,
+            Integer.MAX_VALUE);
+
+    public IntOption measureCollectionTypeOption = new 
IntOption("measureCollectionType", 'm', "Type of measure collection", 0, 0, 
Integer.MAX_VALUE);
+
+    public IntOption timeLimitOption = new IntOption("timeLimit", 't', 
"Maximum number of seconds to test/train for (-1 = no limit).", -1, -1,
+            Integer.MAX_VALUE);
+
+    public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 
'f', "How many instances between samples of the learning performance.", 1000, 0,
+            Integer.MAX_VALUE);
+
+    public StringOption evaluationNameOption = new 
StringOption("evaluationName", 'n', "Identifier of the evaluation", 
"Clustering__"
+            + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
+
+    public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File 
to append intermediate csv results to", null, "csv", true);
+
+    public FloatOption samplingThresholdOption = new 
FloatOption("samplingThreshold", 'a', "Ratio of instances sampled that will be 
used for evaluation.", 0.5,
+            0.0, 1.0);
+
+    private ClusteringEntranceProcessor source;
+    private InstanceStream streamTrain;
+    private ClusteringDistributorProcessor distributor;
+    private Stream distributorStream;
+    private Stream evaluationStream;
+    
+    // Default=0: no delay/waiting
+    public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w', 
"How many miliseconds between injections of two instances.", 0, 0, 
Integer.MAX_VALUE);
+    
+    private Learner learner;
+    private ClusteringEvaluatorProcessor evaluator;
+
+    private Topology topology;
+    private TopologyBuilder builder;
+
+    public void getDescription(StringBuilder sb) {
+        sb.append("Clustering evaluation");
+    }
+
+    @Override
+    public void init() {
+        // TODO remove the if statement theoretically, dynamic binding will 
work here! for now, the if statement is used by Storm
+
+        if (builder == null) {
+            logger.warn("Builder was not initialized, initializing it from the 
Task");
+
+            builder = new TopologyBuilder();
+            logger.debug("Successfully instantiating TopologyBuilder");
+
+            builder.initTopology(evaluationNameOption.getValue(), 
sourceDelayOption.getValue());
+            logger.debug("Successfully initializing SAMOA topology with name 
{}", evaluationNameOption.getValue());
+        }
+
+        // instantiate ClusteringEntranceProcessor and its output stream 
(sourceStream)
+        source = new ClusteringEntranceProcessor();
+        streamTrain = this.streamTrainOption.getValue();
+        source.setStreamSource(streamTrain);
+        builder.addEntranceProcessor(source);
+        source.setSamplingThreshold(samplingThresholdOption.getValue());
+        source.setMaxNumInstances(instanceLimitOption.getValue());
+        logger.debug("Successfully instantiated ClusteringEntranceProcessor");
+
+        Stream sourceStream = builder.createStream(source);
+        // starter.setInputStream(sourcePiOutputStream); // FIXME set stream 
in the EntrancePI
+
+        // distribution of instances and sampling for evaluation
+        distributor = new ClusteringDistributorProcessor();
+        builder.addProcessor(distributor, DISTRIBUTOR_PARALLELISM);
+        builder.connectInputShuffleStream(sourceStream, distributor);
+        distributorStream = builder.createStream(distributor);
+        distributor.setOutputStream(distributorStream);
+        evaluationStream = builder.createStream(distributor);
+        distributor.setEvaluationStream(evaluationStream); // passes 
evaluation events along
+        logger.debug("Successfully instantiated Distributor");
+       
+        // instantiate learner and connect it to distributorStream
+        learner = this.learnerOption.getValue();
+        learner.init(builder, source.getDataset(), 1);
+        builder.connectInputShuffleStream(distributorStream, 
learner.getInputProcessor());
+        logger.debug("Successfully instantiated Learner");
+
+        evaluator = new ClusteringEvaluatorProcessor.Builder(
+        sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile())
+            .decayHorizon(((ClusteringStream) 
streamTrain).getDecayHorizon()).build();
+
+        builder.addProcessor(evaluator);
+        for (Stream evaluatorPiInputStream:learner.getResultStreams()) {
+               builder.connectInputShuffleStream(evaluatorPiInputStream, 
evaluator);
+        }
+        builder.connectInputAllStream(evaluationStream, evaluator);
+        logger.debug("Successfully instantiated EvaluatorProcessor");
+
+        topology = builder.build();
+        logger.debug("Successfully built the topology");
+    }
+
+    @Override
+    public void setFactory(ComponentFactory factory) {
+        // TODO unify this code with init() for now, it's used by S4 App
+        // dynamic binding theoretically will solve this problem
+        builder = new TopologyBuilder(factory);
+        logger.debug("Successfully instantiated TopologyBuilder");
+
+        builder.initTopology(evaluationNameOption.getValue());
+        logger.debug("Successfully initialized SAMOA topology with name {}", 
evaluationNameOption.getValue());
+
+    }
+
+    public Topology getTopology() {
+        return topology;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java
new file mode 100644
index 0000000..70c44a1
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java
@@ -0,0 +1,206 @@
+package com.yahoo.labs.samoa.tasks;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.FileOption;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+import com.yahoo.labs.samoa.evaluation.BasicClassificationPerformanceEvaluator;
+import com.yahoo.labs.samoa.evaluation.BasicRegressionPerformanceEvaluator;
+import com.yahoo.labs.samoa.evaluation.ClassificationPerformanceEvaluator;
+import com.yahoo.labs.samoa.evaluation.PerformanceEvaluator;
+import com.yahoo.labs.samoa.evaluation.EvaluatorProcessor;
+import com.yahoo.labs.samoa.evaluation.RegressionPerformanceEvaluator;
+import com.yahoo.labs.samoa.learners.ClassificationLearner;
+import com.yahoo.labs.samoa.learners.Learner;
+import com.yahoo.labs.samoa.learners.RegressionLearner;
+import com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree;
+import com.yahoo.labs.samoa.moa.streams.InstanceStream;
+import com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator;
+import com.yahoo.labs.samoa.streams.PrequentialSourceProcessor;
+import com.yahoo.labs.samoa.topology.ComponentFactory;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.Topology;
+import com.yahoo.labs.samoa.topology.TopologyBuilder;
+
+/**
+ * Prequential Evaluation task is a scheme in evaluating performance of online 
classifiers which uses each instance for testing online classifiers model and
+ * then it further uses the same instance for training the 
model(Test-then-train)
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+public class PrequentialEvaluation implements Task, Configurable {
+
+    private static final long serialVersionUID = -8246537378371580550L;
+
+    private static Logger logger = 
LoggerFactory.getLogger(PrequentialEvaluation.class);
+
+    public ClassOption learnerOption = new ClassOption("learner", 'l', 
"Classifier to train.", Learner.class, VerticalHoeffdingTree.class.getName());
+
+    public ClassOption streamTrainOption = new ClassOption("trainStream", 's', 
"Stream to learn from.", InstanceStream.class,
+            RandomTreeGenerator.class.getName());
+
+    public ClassOption evaluatorOption = new ClassOption("evaluator", 'e', 
"Classification performance evaluation method.",
+            PerformanceEvaluator.class, 
BasicClassificationPerformanceEvaluator.class.getName());
+
+    public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i', 
"Maximum number of instances to test/train on  (-1 = no limit).", 1000000, -1,
+            Integer.MAX_VALUE);
+
+    public IntOption timeLimitOption = new IntOption("timeLimit", 't', 
"Maximum number of seconds to test/train for (-1 = no limit).", -1, -1,
+            Integer.MAX_VALUE);
+
+    public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 
'f', "How many instances between samples of the learning performance.", 100000,
+            0, Integer.MAX_VALUE);
+
+    public StringOption evaluationNameOption = new 
StringOption("evaluationName", 'n', "Identifier of the evaluation", 
"Prequential_"
+            + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
+
+    public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File 
to append intermediate csv results to", null, "csv", true);
+
+    // Default=0: no delay/waiting
+    public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w', 
"How many microseconds between injections of two instances.", 0, 0, 
Integer.MAX_VALUE);
+    // Batch size to delay the incoming stream: delay of x milliseconds after 
each batch
+    public IntOption batchDelayOption = new IntOption("delayBatchSize", 'b', 
"The delay batch size: delay of x milliseconds after each batch ", 1, 1, 
Integer.MAX_VALUE);
+    
+    private PrequentialSourceProcessor preqSource;
+
+    // private PrequentialSourceTopologyStarter preqStarter;
+
+    // private EntranceProcessingItem sourcePi;
+
+    private Stream sourcePiOutputStream;
+
+    private Learner classifier;
+
+    private EvaluatorProcessor evaluator;
+
+    // private ProcessingItem evaluatorPi;
+
+    // private Stream evaluatorPiInputStream;
+
+    private Topology prequentialTopology;
+
+    private TopologyBuilder builder;
+
+    public void getDescription(StringBuilder sb, int indent) {
+        sb.append("Prequential evaluation");
+    }
+
+    @Override
+    public void init() {
+        // TODO remove the if statement
+        // theoretically, dynamic binding will work here!
+        // test later!
+        // for now, the if statement is used by Storm
+
+        if (builder == null) {
+            builder = new TopologyBuilder();
+            logger.debug("Successfully instantiating TopologyBuilder");
+
+            builder.initTopology(evaluationNameOption.getValue());
+            logger.debug("Successfully initializing SAMOA topology with name 
{}", evaluationNameOption.getValue());
+        }
+
+        // instantiate PrequentialSourceProcessor and its output stream 
(sourcePiOutputStream)
+        preqSource = new PrequentialSourceProcessor();
+        preqSource.setStreamSource((InstanceStream) 
this.streamTrainOption.getValue());
+        preqSource.setMaxNumInstances(instanceLimitOption.getValue());
+        preqSource.setSourceDelay(sourceDelayOption.getValue());
+        preqSource.setDelayBatchSize(batchDelayOption.getValue());
+        builder.addEntranceProcessor(preqSource);
+        logger.debug("Successfully instantiating PrequentialSourceProcessor");
+
+        // preqStarter = new PrequentialSourceTopologyStarter(preqSource, 
instanceLimitOption.getValue());
+        // sourcePi = builder.createEntrancePi(preqSource, preqStarter);
+        // sourcePiOutputStream = builder.createStream(sourcePi);
+
+        sourcePiOutputStream = builder.createStream(preqSource);
+        // preqStarter.setInputStream(sourcePiOutputStream);
+
+        // instantiate classifier and connect it to sourcePiOutputStream
+        classifier = this.learnerOption.getValue();
+        classifier.init(builder, preqSource.getDataset(), 1);
+        builder.connectInputShuffleStream(sourcePiOutputStream, 
classifier.getInputProcessor());
+        logger.debug("Successfully instantiating Classifier");
+
+        PerformanceEvaluator evaluatorOptionValue = 
this.evaluatorOption.getValue();
+        if (!PrequentialEvaluation.isLearnerAndEvaluatorCompatible(classifier, 
evaluatorOptionValue)) {
+               evaluatorOptionValue = 
getDefaultPerformanceEvaluatorForLearner(classifier);
+        }
+        evaluator = new EvaluatorProcessor.Builder(evaluatorOptionValue)
+                
.samplingFrequency(sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile()).build();
+
+        // evaluatorPi = builder.createPi(evaluator);
+        // evaluatorPi.connectInputShuffleStream(evaluatorPiInputStream);
+        builder.addProcessor(evaluator);
+        for (Stream evaluatorPiInputStream:classifier.getResultStreams()) {
+               builder.connectInputShuffleStream(evaluatorPiInputStream, 
evaluator);
+        }
+        
+        logger.debug("Successfully instantiating EvaluatorProcessor");
+
+        prequentialTopology = builder.build();
+        logger.debug("Successfully building the topology");
+    }
+
+    @Override
+    public void setFactory(ComponentFactory factory) {
+        // TODO unify this code with init()
+        // for now, it's used by S4 App
+        // dynamic binding theoretically will solve this problem
+        builder = new TopologyBuilder(factory);
+        logger.debug("Successfully instantiating TopologyBuilder");
+
+        builder.initTopology(evaluationNameOption.getValue());
+        logger.debug("Successfully initializing SAMOA topology with name {}", 
evaluationNameOption.getValue());
+
+    }
+
+    public Topology getTopology() {
+        return prequentialTopology;
+    }
+    //
+    // @Override
+    // public TopologyStarter getTopologyStarter() {
+    // return this.preqStarter;
+    // }
+    
+    private static boolean isLearnerAndEvaluatorCompatible(Learner learner, 
PerformanceEvaluator evaluator) {
+        return (learner instanceof RegressionLearner && evaluator instanceof 
RegressionPerformanceEvaluator) ||
+            (learner instanceof ClassificationLearner && evaluator instanceof 
ClassificationPerformanceEvaluator);
+    }
+    
+    private static PerformanceEvaluator 
getDefaultPerformanceEvaluatorForLearner(Learner learner) {
+       if (learner instanceof RegressionLearner) {
+               return new BasicRegressionPerformanceEvaluator();
+       }
+       // Default to BasicClassificationPerformanceEvaluator for all other 
cases
+       return new BasicClassificationPerformanceEvaluator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java
new file mode 100644
index 0000000..41b47e4
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java
@@ -0,0 +1,61 @@
+package com.yahoo.labs.samoa.tasks;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.topology.ComponentFactory;
+import com.yahoo.labs.samoa.topology.Topology;
+
+/**
+ * Task interface, the mother of all SAMOA tasks!
+ */
+public interface Task {
+
+       /**
+        * Initialize this SAMOA task, 
+        * i.e. create and connect ProcessingItems and Streams
+        * and initialize the topology
+        */
+       public void init();     
+       
+       /**
+        * Return the final topology object to be executed in the cluster
+        * @return topology object to be submitted to be executed in the cluster
+        */
+       public Topology getTopology();
+       
+    // /**
+    // * Return the entrance processor to start SAMOA topology
+    // * The logic to start the topology should be implemented here
+    // * @return entrance processor to start the topology
+    // */
+    // public TopologyStarter getTopologyStarter();
+       
+       /**
+        * Sets the factory.
+        * TODO: propose to hide factory from task, 
+        * i.e. Task will only see TopologyBuilder, 
+        * and factory creation will be handled by TopologyBuilder
+        *
+        * @param factory the new factory
+        */
+       public void setFactory(ComponentFactory factory) ;
+       
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java
new file mode 100644
index 0000000..c0f0cc3
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java
@@ -0,0 +1,108 @@
+package com.yahoo.labs.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+
+/**
+ * Helper class for EntranceProcessingItem implementation.
+ * @author Anh Thu Vu
+ *
+ */
+public abstract class AbstractEntranceProcessingItem implements 
EntranceProcessingItem {
+       private EntranceProcessor processor;
+       private String name;
+       private Stream outputStream;
+       
+       /*
+        * Constructor
+        */
+       public AbstractEntranceProcessingItem() {
+               this(null);
+       }
+       public AbstractEntranceProcessingItem(EntranceProcessor processor) {
+               this.processor = processor;
+       }
+       
+       /*
+        * Processor
+        */
+       /**
+        * Set the entrance processor for this EntranceProcessingItem
+        * @param processor
+        *                      the processor
+        */
+       protected void setProcessor(EntranceProcessor processor) {
+               this.processor = processor;
+       }
+       
+       /**
+        * Get the EntranceProcessor of this EntranceProcessingItem.
+        * @return the EntranceProcessor
+        */
+       public EntranceProcessor getProcessor() {
+               return this.processor;
+       }
+       
+       /*
+        * Name/ID
+        */
+       /**
+        * Set the name (or ID) of this EntranceProcessingItem
+        * @param name
+        */
+       public void setName(String name) {
+               this.name = name;
+       }
+       
+       /**
+        * Get the name (or ID) of this EntranceProcessingItem
+        * @return the name (or ID)
+        */
+       public String getName() {
+               return this.name;
+       }
+       
+       /*
+        * Output Stream
+        */
+       /**
+        * Set the output stream of this EntranceProcessingItem.
+        * An EntranceProcessingItem should have only 1 single output stream and
+        * should not be re-assigned.
+        * @return this EntranceProcessingItem
+        */
+       public EntranceProcessingItem setOutputStream(Stream outputStream) {
+               if (this.outputStream != null && this.outputStream != 
outputStream) {
+                       throw new IllegalStateException("Cannot overwrite 
output stream of EntranceProcessingItem");
+               } else 
+                       this.outputStream = outputStream;
+               return this;
+       }
+       
+       /**
+        * Get the output stream of this EntranceProcessingItem.
+        * @return the output stream
+        */
+       public Stream getOutputStream() {
+               return this.outputStream;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java
new file mode 100644
index 0000000..d0f04f7
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java
@@ -0,0 +1,161 @@
+package com.yahoo.labs.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+
+/**
+ * Abstract ProcessingItem
+ * 
+ * Helper for implementation of ProcessingItem. It has basic information
+ * for a ProcessingItem: name, parallelismLevel and a processor.
+ * Subclass of this class needs to implement {@link #addInputStream(Stream, 
PartitioningScheme)}.
+ * 
+ * @author Anh Thu Vu
+ *
+ */
+public abstract class AbstractProcessingItem implements ProcessingItem {
+       private String name;
+       private int parallelism;
+       private Processor processor;
+       
+       /*
+        * Constructor
+        */
+       public AbstractProcessingItem() {
+               this(null);
+       }
+       public AbstractProcessingItem(Processor processor) {
+               this(processor,1);
+       }
+       public AbstractProcessingItem(Processor processor, int parallelism) {
+               this.processor = processor;
+               this.parallelism = parallelism;
+       }
+       
+       /*
+        * Processor
+        */
+       /**
+        * Set the processor for this ProcessingItem
+        * @param processor
+        *                      the processor
+        */
+       protected void setProcessor(Processor processor) {
+               this.processor = processor;
+       }
+       
+       /**
+        * Get the processor of this ProcessingItem
+        * @return the processor
+        */
+       public Processor getProcessor() {
+               return this.processor;
+       }
+       
+       /*
+        * Parallelism 
+        */
+       /**
+        * Set the parallelism factor of this ProcessingItem
+        * @param parallelism
+        */
+       protected void setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+       }
+       
+       /**
+        * Get the parallelism factor of this ProcessingItem
+        * @return the parallelism factor
+        */
+       @Override
+       public int getParallelism() {
+               return this.parallelism;
+       }
+       
+       /*
+        * Name/ID
+        */
+       /**
+        * Set the name (or ID) of this ProcessingItem
+        * @param name
+        *                      the name/ID
+        */
+       public void setName(String name) {
+               this.name = name;
+       }
+       
+       /**
+        * Get the name (or ID) of this ProcessingItem
+        * @return the name/ID
+        */
+       public String getName() {
+               return this.name;
+       }
+       
+       /*
+        * Add input streams
+        */
+       /**
+        * Add an input stream to this ProcessingItem
+        * 
+        * @param inputStream
+        *                      the input stream to add
+        * @param scheme
+        *                      partitioning scheme associated with this 
ProcessingItem and the input stream
+        * @return this ProcessingItem
+        */
+       protected abstract ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme);
+
+       /**
+        * Add an input stream to this ProcessingItem with SHUFFLE scheme
+        * 
+        * @param inputStream
+        *                      the input stream
+        * @return this ProcessingItem
+        */
+    public ProcessingItem connectInputShuffleStream(Stream inputStream) {
+       return this.addInputStream(inputStream, PartitioningScheme.SHUFFLE);
+    }
+
+    /**
+        * Add an input stream to this ProcessingItem with GROUP_BY_KEY scheme
+        * 
+        * @param inputStream
+        *                      the input stream
+        * @return this ProcessingItem
+        */
+    public ProcessingItem connectInputKeyStream(Stream inputStream) {
+       return this.addInputStream(inputStream, 
PartitioningScheme.GROUP_BY_KEY);
+    }
+
+    /**
+        * Add an input stream to this ProcessingItem with BROADCAST scheme
+        * 
+        * @param inputStream
+        *                      the input stream
+        * @return this ProcessingItem
+        */
+    public ProcessingItem connectInputAllStream(Stream inputStream) {
+       return this.addInputStream(inputStream, PartitioningScheme.BROADCAST);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java
new file mode 100644
index 0000000..b3544ed
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java
@@ -0,0 +1,115 @@
+package com.yahoo.labs.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+
+/**
+ * Abstract Stream
+ * 
+ * Helper base class for implementations of {@link Stream}. It keeps the basic
+ * information of a Stream: its ID, its source ProcessingItem and the
+ * suggested batch size.
+ * Subclasses need to implement {@link #put(ContentEvent)}.
+ * 
+ * @author Anh Thu Vu
+ *
+ */
+
+public abstract class AbstractStream implements Stream {
+    /** Name (ID) of this stream; {@code null} until {@link #setStreamId(String)} is called. */
+    private String streamID;
+    /** Processing item that produces the events of this stream; may be {@code null}. */
+    private final IProcessingItem sourcePi;
+    /** Suggested batch size for messages on this stream; starts at 1. */
+    private int batchSize;
+
+    /**
+     * Creates a stream without a source processing item.
+     */
+    public AbstractStream() {
+        this(null);
+    }
+
+    /**
+     * Creates a stream fed by the given processing item, with a suggested
+     * batch size of 1.
+     *
+     * @param sourcePi
+     *            the source processing item (may be {@code null})
+     */
+    public AbstractStream(IProcessingItem sourcePi) {
+        this.sourcePi = sourcePi;
+        this.batchSize = 1;
+    }
+
+    /**
+     * Gets the source processing item of this stream.
+     *
+     * @return the source processing item, or {@code null} if none was given
+     */
+    public IProcessingItem getSourceProcessingItem() {
+        return this.sourcePi;
+    }
+
+    /**
+     * Sends a ContentEvent on this stream.
+     *
+     * @param event
+     *            the ContentEvent to be sent
+     */
+    // The doc comment must precede the annotation, otherwise javadoc ignores it.
+    @Override
+    public abstract void put(ContentEvent event);
+
+    /**
+     * Gets the name (ID) of this stream.
+     *
+     * @return the name (ID), or {@code null} if it was never set
+     */
+    @Override
+    public String getStreamId() {
+        return this.streamID;
+    }
+
+    /**
+     * Sets the name (ID) of this stream.
+     *
+     * @param streamID
+     *            the name (ID)
+     */
+    public void setStreamId(String streamID) {
+        this.streamID = streamID;
+    }
+
+    /**
+     * Sets the suggested batch size.
+     *
+     * @param batchSize
+     *            the suggested batch size
+     */
+    @Override
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Gets the suggested batch size.
+     *
+     * @return the suggested batch size (1 unless changed via
+     *         {@link #setBatchSize(int)})
+     */
+    public int getBatchSize() {
+        return this.batchSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java
new file mode 100755
index 0000000..53385b1
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java
@@ -0,0 +1,133 @@
+package com.yahoo.labs.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Topology abstract class.
+ * 
+ * It manages the basic information of a topology: its name and the sets of
+ * Streams, ProcessingItems and EntranceProcessingItems it contains.
+ * 
+ */
+public abstract class AbstractTopology implements Topology {
+
+    private String topoName;
+    private final Set<Stream> streams;
+    private final Set<IProcessingItem> processingItems;
+    private final Set<EntranceProcessingItem> entranceProcessingItems;
+
+    /**
+     * Creates an empty topology with the given name.
+     *
+     * @param name
+     *            name of the topology
+     */
+    protected AbstractTopology(String name) {
+        this.topoName = name;
+        this.streams = new HashSet<>();
+        this.processingItems = new HashSet<>();
+        this.entranceProcessingItems = new HashSet<>();
+    }
+
+    /**
+     * Gets the name of this topology.
+     *
+     * @return name of the topology
+     */
+    @Override
+    public String getTopologyName() {
+        return this.topoName;
+    }
+
+    /**
+     * Sets the name of this topology.
+     *
+     * @param topologyName
+     *            name of the topology
+     */
+    @Override
+    public void setTopologyName(String topologyName) {
+        this.topoName = topologyName;
+    }
+
+    /**
+     * Adds an entrance processing item to the topology. The item is also
+     * registered as a regular processing item via
+     * {@link #addProcessingItem(IProcessingItem)}.
+     *
+     * @param epi
+     *            entrance processing item
+     */
+    @Override
+    public void addEntranceProcessingItem(EntranceProcessingItem epi) {
+        this.entranceProcessingItems.add(epi);
+        this.addProcessingItem(epi);
+    }
+
+    /**
+     * Gets the entrance processing items of the topology.
+     *
+     * @return the internal (mutable) set of entrance processing items
+     */
+    public Set<EntranceProcessingItem> getEntranceProcessingItems() {
+        return this.entranceProcessingItems;
+    }
+
+    /**
+     * Adds a processing item to the topology with the default parallelism
+     * hint of 1.
+     *
+     * @param procItem
+     *            processing item
+     */
+    @Override
+    public void addProcessingItem(IProcessingItem procItem) {
+        addProcessingItem(procItem, 1);
+    }
+
+    /**
+     * Adds a processing item to the topology.
+     * <p>
+     * This base implementation only records the item; {@code parallelismHint}
+     * is not used here. Subclasses may override this method to take it into
+     * account.
+     *
+     * @param procItem
+     *            processing item
+     * @param parallelismHint
+     *            processing item parallelism level
+     */
+    @Override
+    public void addProcessingItem(IProcessingItem procItem, int parallelismHint) {
+        this.processingItems.add(procItem);
+    }
+
+    /**
+     * Gets the processing items of the topology (including entrance
+     * processing items).
+     *
+     * @return the internal (mutable) set of processing items
+     */
+    public Set<IProcessingItem> getProcessingItems() {
+        return this.processingItems;
+    }
+
+    /**
+     * Adds a stream to the topology.
+     *
+     * @param stream
+     *            the stream to add
+     */
+    @Override
+    public void addStream(Stream stream) {
+        this.streams.add(stream);
+    }
+
+    /**
+     * Gets the streams of the topology.
+     *
+     * @return the internal (mutable) set of streams
+     */
+    public Set<Stream> getStreams() {
+        return this.streams;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java
new file mode 100644
index 0000000..433f516
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java
@@ -0,0 +1,78 @@
+package com.yahoo.labs.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.core.Processor;
+
+/**
+ * Factory for platform specific components.
+ * <p>
+ * An implementation of this interface creates the platform specific
+ * processing items, streams and topologies.
+ */
+public interface ComponentFactory {
+
+    /**
+     * Creates a platform specific processing item that runs the given
+     * processor.
+     *
+     * @param processor
+     *            the logic executed by the processing item
+     * @return the created ProcessingItem
+     */
+    ProcessingItem createPi(Processor processor);
+
+    /**
+     * Creates a platform specific processing item that runs the given
+     * processor, and sets its parallelism level.
+     *
+     * @param processor
+     *            the logic executed by the processing item
+     * @param parallelism
+     *            the number of instances of this processing item to create
+     * @return the created ProcessingItem
+     */
+    ProcessingItem createPi(Processor processor, int parallelism);
+
+    /**
+     * Creates a platform specific entrance processing item, i.e. the entry
+     * point of the topology. It can either generate a stream of data or
+     * connect to an external stream of data.
+     *
+     * @param entranceProcessor
+     *            the logic executed by the entrance processing item
+     * @return the created EntranceProcessingItem
+     */
+    EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor);
+
+    /**
+     * Creates a platform specific stream.
+     *
+     * @param sourcePi
+     *            the processing item that provides the events of this stream
+     * @return the created Stream
+     */
+    Stream createStream(IProcessingItem sourcePi);
+
+    /**
+     * Creates a platform specific topology.
+     *
+     * @param topoName
+     *            the topology name
+     * @return the created Topology
+     */
+    Topology createTopology(String topoName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java
new file mode 100644
index 0000000..32ed109
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java
@@ -0,0 +1,46 @@
+package com.yahoo.labs.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+
+/**
+ * Entrance processing item interface.
+ * <p>
+ * An entrance processing item embeds an {@link EntranceProcessor} and has a
+ * single output stream, set through {@link #setOutputStream(Stream)}.
+ */
+public interface EntranceProcessingItem extends IProcessingItem {
+
+    /**
+     * Gets the processing item processor.
+     *
+     * @return the embedded EntranceProcessor
+     */
+    // The doc comment must precede the annotation, otherwise javadoc ignores it.
+    @Override
+    EntranceProcessor getProcessor();
+
+    /**
+     * Sets the single output stream for this EntranceProcessingItem.
+     *
+     * @param stream
+     *            the stream
+     * @return the current instance of the EntranceProcessingItem, for a fluent
+     *         interface
+     */
+    EntranceProcessingItem setOutputStream(Stream stream);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java
new file mode 100644
index 0000000..7a70dc4
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java
@@ -0,0 +1,47 @@
+package com.yahoo.labs.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.Processor;
+
+/**
+ * Base interface of all processing items.
+ * <p>
+ * Both {@link ProcessingItem} and {@link EntranceProcessingItem} extend this
+ * interface.
+ *
+ * @author severien
+ *
+ */
+public interface IProcessingItem {
+
+    /**
+     * Gets the processing item processor.
+     *
+     * @return the Processor embedded in this processing item
+     */
+    Processor getProcessor();
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java
new file mode 100644
index 0000000..8499f80
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java
@@ -0,0 +1,46 @@
+package com.yahoo.labs.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.tasks.Task;
+
+/**
+ * Submitter interface for programmatically deploying platform specific
+ * topologies.
+ *
+ * @author severien
+ *
+ */
+public interface ISubmitter {
+
+    /**
+     * Deploys the given task to the platform.
+     *
+     * @param task
+     *            the task to deploy
+     */
+    void deployTask(Task task);
+
+    /**
+     * Sets whether the task should run locally or distributed.
+     *
+     * @param bool
+     *            {@code true} if the task should run locally, {@code false}
+     *            if it should run distributed
+     */
+    void setLocal(boolean bool);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java
new file mode 100644
index 0000000..2e8758f
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java
@@ -0,0 +1,85 @@
+package com.yahoo.labs.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+
+/**
+ * Implementation of EntranceProcessingItem for local engines (Simple,
+ * Multithreads)
+ * 
+ * @author Anh Thu Vu
+ *
+ */
+public class LocalEntranceProcessingItem extends AbstractEntranceProcessingItem {
+
+    /**
+     * Creates a local entrance processing item wrapping the given processor.
+     *
+     * @param processor
+     *            the EntranceProcessor that produces the events
+     */
+    public LocalEntranceProcessingItem(EntranceProcessor processor) {
+        super(processor);
+    }
+
+    /**
+     * If the processor has an available event, takes the next one and sends
+     * it out on the output stream.
+     *
+     * @return {@code true} if there was (at least) one available event and it
+     *         was sent out, {@code false} otherwise
+     */
+    public boolean injectNextEvent() {
+        if (this.getProcessor().hasNext()) {
+            ContentEvent event = this.getProcessor().nextEvent();
+            this.getOutputStream().put(event);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Starts sending events by repeatedly calling {@link #injectNextEvent()}.
+     * When no event is available, it waits by calling
+     * {@link #waitForNewEvents()} before attempting to send again.
+     * <p>
+     * The loop stops when the processor reports {@code isFinished()}, when the
+     * thread is interrupted while waiting (the interrupt status is restored),
+     * or when {@link #waitForNewEvents()} throws any other exception.
+     *
+     * @throws IllegalStateException
+     *             if the output stream has not been set
+     */
+    public void startSendingEvents() {
+        if (this.getOutputStream() == null) {
+            throw new IllegalStateException("Try sending events from EntrancePI while outputStream is not set.");
+        }
+
+        while (!this.getProcessor().isFinished()) {
+            if (!this.injectNextEvent()) {
+                try {
+                    waitForNewEvents();
+                } catch (InterruptedException e) {
+                    // Restore the interrupt status so the thread's owner can observe it.
+                    Thread.currentThread().interrupt();
+                    break;
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * Waits for some time when there are no available events. The default
+     * implementation sleeps for 100 ms. Subclasses may override this method
+     * to implement a non-blocking wait or to adjust the amount of time.
+     *
+     * @throws Exception
+     *             if the wait fails; {@link InterruptedException} if the
+     *             thread is interrupted while sleeping
+     */
+    protected void waitForNewEvents() throws Exception {
+        Thread.sleep(100);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java
new file mode 100644
index 0000000..02fb84d
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java
@@ -0,0 +1,70 @@
+package com.yahoo.labs.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Processing item interface.
+ * <p>
+ * A processing item is connected to its input streams through one of the
+ * {@code connectInput*Stream} methods, each of which selects how events are
+ * distributed among the instances of the processing item.
+ *
+ * @author severien
+ *
+ */
+public interface ProcessingItem extends IProcessingItem {
+
+    /**
+     * Connects this processing item to a stream in a round-robin fashion.
+     * Events are distributed evenly among the instantiated processing items.
+     *
+     * @param inputStream
+     *            the stream to connect this processing item to
+     * @return ProcessingItem
+     */
+    ProcessingItem connectInputShuffleStream(Stream inputStream);
+
+    /**
+     * Connects this processing item to a stream taking the event key into
+     * account. Events are routed to a processing item according to the modulus
+     * of their key by the parallelism level. For example, with key = 5 and
+     * parallelism = 2, 5 mod 2 = 1, so the processing item responsible for 1
+     * receives the event.
+     *
+     * @param inputStream
+     *            the stream to connect this processing item to
+     * @return ProcessingItem
+     */
+    ProcessingItem connectInputKeyStream(Stream inputStream);
+
+    /**
+     * Connects this processing item to a stream in a broadcast fashion. All
+     * processing items of this type receive a copy of the original event.
+     *
+     * @param inputStream
+     *            the stream to connect this processing item to
+     * @return ProcessingItem
+     */
+    ProcessingItem connectInputAllStream(Stream inputStream);
+
+    /**
+     * Gets the parallelism level of this processing item.
+     *
+     * @return the parallelism level
+     */
+    int getParallelism();
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java
new file mode 100644
index 0000000..b496d35
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java
@@ -0,0 +1,62 @@
+package com.yahoo.labs.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+
+/**
+ * Stream interface.
+ *
+ * @author severien
+ *
+ */
+public interface Stream {
+
+    /**
+     * Puts events into a platform specific data stream.
+     *
+     * @param event
+     *            the event to put on the stream
+     */
+    void put(ContentEvent event);
+
+    /**
+     * Gets the stream id.
+     *
+     * @return the id (name) of this stream
+     */
+    String getStreamId();
+
+    /**
+     * Sets the suggested batch size.
+     *
+     * @param batchSize
+     *            the suggested size for batching messages on this stream
+     */
+    // Parameter renamed from "batchsize" so it matches the @param tag.
+    void setBatchSize(int batchSize);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java
new file mode 100755
index 0000000..6ad93ed
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java
@@ -0,0 +1,85 @@
+package com.yahoo.labs.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Topology interface.
+ * <p>
+ * A topology has a name and is made of processing items (including entrance
+ * processing items) and streams.
+ */
+public interface Topology {
+
+    /**
+     * Returns the name of this topology.
+     *
+     * @return the topology name
+     */
+    String getTopologyName();
+
+    /**
+     * Changes the name of this topology.
+     *
+     * @param topologyName
+     *            the new topology name
+     */
+    void setTopologyName(String topologyName);
+
+    /**
+     * Registers an EntranceProcessingItem in this topology.
+     *
+     * @param epi
+     *            the EntranceProcessingItem to register
+     */
+    void addEntranceProcessingItem(EntranceProcessingItem epi);
+
+    /**
+     * Registers a ProcessingItem in this topology with the default
+     * parallelism level (i.e. 1).
+     *
+     * @param procItem
+     *            the ProcessingItem to register
+     */
+    void addProcessingItem(IProcessingItem procItem);
+
+    /**
+     * Registers a ProcessingItem in this topology together with a parallelism
+     * level.
+     *
+     * @param procItem
+     *            the ProcessingItem to register
+     * @param parallelismHint
+     *            the parallelism level
+     */
+    void addProcessingItem(IProcessingItem procItem, int parallelismHint);
+
+    /**
+     * Registers a Stream in this topology.
+     *
+     * @param stream
+     *            the Stream to register
+     */
+    void addStream(Stream stream);
+}

Reply via email to