http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldContentEvent.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldContentEvent.java
new file mode 100644
index 0000000..5e86cb0
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldContentEvent.java
@@ -0,0 +1,69 @@
+package com.yahoo.labs.samoa.examples;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+
+/**
+ * Example {@link ContentEvent} that contains a single integer.
+ */
+public class HelloWorldContentEvent implements ContentEvent {
+
+    private static final long serialVersionUID = -2406968925730298156L;
+    private final boolean isLastEvent;
+    private final int helloWorldData;
+
+    public HelloWorldContentEvent(int helloWorldData, boolean isLastEvent) {
+        this.isLastEvent = isLastEvent;
+        this.helloWorldData = helloWorldData;
+    }
+    
+    /*
+     * No-argument constructor for Kryo
+     */
+    public HelloWorldContentEvent() {
+       this(0,false);
+    }
+
+    @Override
+    public String getKey() {
+        return null;
+    }
+
+    @Override
+    public void setKey(String str) {
+        // do nothing, it's key-less content event
+    }
+
+    @Override
+    public boolean isLastEvent() {
+        return isLastEvent;
+    }
+
+    public int getHelloWorldData() {
+        return helloWorldData;
+    }
+
+    @Override
+    public String toString() {
+        return "HelloWorldContentEvent [helloWorldData=" + helloWorldData + 
"]";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldDestinationProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldDestinationProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldDestinationProcessor.java
new file mode 100644
index 0000000..e22c0fe
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldDestinationProcessor.java
@@ -0,0 +1,49 @@
+package com.yahoo.labs.samoa.examples;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+
+/**
+ * Example {@link Processor} that simply prints the received events to 
standard output.
+ */
+public class HelloWorldDestinationProcessor implements Processor {
+
+    private static final long serialVersionUID = -6042613438148776446L;
+    private int processorId;
+
+    @Override
+    public boolean process(ContentEvent event) {
+        System.out.println(processorId + ": " + event);
+        return true;
+    }
+
+    @Override
+    public void onCreate(int id) {
+        this.processorId = id;
+    }
+
+    @Override
+    public Processor newProcessor(Processor p) {
+        return new HelloWorldDestinationProcessor();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldSourceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldSourceProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldSourceProcessor.java
new file mode 100644
index 0000000..a37201f
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldSourceProcessor.java
@@ -0,0 +1,75 @@
+package com.yahoo.labs.samoa.examples;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.Random;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.core.Processor;
+
+/**
+ * Example {@link EntranceProcessor} that generates a stream of random integers.
+ * <p>
+ * The source emits at most {@code maxInst} events. A negative {@code maxInst} means
+ * "no limit", matching the {@code -1 = no limit} convention advertised by the
+ * instance-limit option of {@code HelloWorldTask}; in that case the source never
+ * reports itself as finished.
+ * <p>
+ * {@link #onCreate(int)} must be called before {@link #nextEvent()}, because it creates
+ * the random generator that {@code nextEvent()} draws from.
+ */
+public class HelloWorldSourceProcessor implements EntranceProcessor {
+
+    private static final long serialVersionUID = 6212296305865604747L;
+    private Random rnd;
+    private final long maxInst;
+    private long count;
+
+    /**
+     * @param maxInst maximum number of events to generate; a negative value disables the limit
+     */
+    public HelloWorldSourceProcessor(long maxInst) {
+        this.maxInst = maxInst;
+    }
+
+    /**
+     * Does nothing with the event.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean process(ContentEvent event) {
+        // do nothing, API will be refined further
+        return false;
+    }
+
+    /**
+     * Creates the random generator, seeded with the processor id.
+     *
+     * @param id the id assigned to this processor, used as the seed
+     */
+    @Override
+    public void onCreate(int id) {
+        rnd = new Random(id);
+    }
+
+    /**
+     * Creates a new source with the same limit as the given one; the event counter of
+     * the new source starts at zero.
+     *
+     * @param p the processor to copy the limit from; must be a {@code HelloWorldSourceProcessor}
+     * @return a new source processor
+     */
+    @Override
+    public Processor newProcessor(Processor p) {
+        HelloWorldSourceProcessor hwsp = (HelloWorldSourceProcessor) p;
+        return new HelloWorldSourceProcessor(hwsp.maxInst);
+    }
+
+    /**
+     * @return {@code true} once the limit has been reached; never {@code true} when unlimited
+     */
+    @Override
+    public boolean isFinished() {
+        return !hasNext();
+    }
+
+    /**
+     * @return {@code true} while more events may be generated
+     */
+    @Override
+    public boolean hasNext() {
+        // A negative limit means "unlimited"; without this check a limit of -1 would
+        // make the source finished before emitting a single event.
+        return maxInst < 0 || count < maxInst;
+    }
+
+    /**
+     * Generates the next event and counts it.
+     *
+     * @return a {@link HelloWorldContentEvent} holding a random integer, never flagged as last
+     */
+    @Override
+    public ContentEvent nextEvent() {
+        count++;
+        return new HelloWorldContentEvent(rnd.nextInt(), false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldTask.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldTask.java
new file mode 100644
index 0000000..e6658f1
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldTask.java
@@ -0,0 +1,98 @@
+package com.yahoo.labs.samoa.examples;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+import com.yahoo.labs.samoa.tasks.Task;
+import com.yahoo.labs.samoa.topology.ComponentFactory;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.Topology;
+import com.yahoo.labs.samoa.topology.TopologyBuilder;
+
+/**
+ * Example {@link Task} in SAMOA. This task simply sends events from a source
+ * {@link HelloWorldSourceProcessor} to a destination {@link HelloWorldDestinationProcessor}.
+ * The events are random integers generated by the source and encapsulated in a
+ * {@link HelloWorldContentEvent}. The destination prints the content of the event to
+ * standard output, prepended by the processor id.
+ * <p>
+ * Options: the number of events the source will generate (-i), the parallelism level of
+ * the destination (-p) and the evaluation name that is passed to the topology builder (-n).
+ * <p>
+ * {@link #setFactory(ComponentFactory)} must be called before {@link #init()}, because it
+ * creates the {@link TopologyBuilder} that {@code init()} uses.
+ */
+public class HelloWorldTask implements Task, Configurable {
+
+    private static final long serialVersionUID = -5134935141154021352L;
+    private static final Logger logger = LoggerFactory.getLogger(HelloWorldTask.class);
+
+    /** The topology builder for the task; created in {@link #setFactory(ComponentFactory)}. */
+    private TopologyBuilder builder;
+    /** The topology that will be created for the task; built in {@link #init()}. */
+    private Topology helloWorldTopology;
+
+    public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i',
+        "Maximum number of instances to generate (-1 = no limit).", 1000000, -1, Integer.MAX_VALUE);
+
+    public IntOption helloWorldParallelismOption = new IntOption("parallelismOption", 'p',
+        "Number of destination Processors", 1, 1, Integer.MAX_VALUE);
+
+    public StringOption evaluationNameOption = new StringOption("evaluationName", 'n',
+        "Identifier of the evaluation",
+        "HelloWorldTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
+
+    /**
+     * Builds the topology: one entrance processor, one stream leaving it, and one
+     * destination processor (with the configured parallelism) connected to that stream
+     * through a shuffle connection.
+     */
+    @Override
+    public void init() {
+        // create source EntranceProcessor
+        /* The event source for the topology. Implements EntranceProcessor */
+        HelloWorldSourceProcessor sourceProcessor =
+            new HelloWorldSourceProcessor(instanceLimitOption.getValue());
+        builder.addEntranceProcessor(sourceProcessor);
+
+        // create Stream
+        Stream stream = builder.createStream(sourceProcessor);
+
+        // create destination Processor
+        /* The event sink for the topology. Implements Processor */
+        HelloWorldDestinationProcessor destProcessor = new HelloWorldDestinationProcessor();
+        builder.addProcessor(destProcessor, helloWorldParallelismOption.getValue());
+        builder.connectInputShuffleStream(stream, destProcessor);
+
+        // build the topology
+        helloWorldTopology = builder.build();
+        logger.debug("Successfully built the topology");
+    }
+
+    /**
+     * @return the topology built by {@link #init()}, or {@code null} if {@code init()} has not run
+     */
+    @Override
+    public Topology getTopology() {
+        return helloWorldTopology;
+    }
+
+    /**
+     * Creates the topology builder from the given factory and initializes it with the
+     * evaluation name.
+     *
+     * @param factory the component factory handed to the {@link TopologyBuilder}
+     */
+    @Override
+    public void setFactory(ComponentFactory factory) {
+        // will be removed when dynamic binding is implemented
+        builder = new TopologyBuilder(factory);
+        logger.debug("Successfully instantiating TopologyBuilder");
+        builder.initTopology(evaluationNameOption.getValue());
+        logger.debug("Successfully initializing SAMOA topology with name {}",
+            evaluationNameOption.getValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/AdaptiveLearner.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/AdaptiveLearner.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/AdaptiveLearner.java
new file mode 100644
index 0000000..0986253
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/AdaptiveLearner.java
@@ -0,0 +1,53 @@
+package com.yahoo.labs.samoa.learners;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+
+import com.yahoo.labs.samoa.moa.classifiers.core.driftdetection.ChangeDetector;
+import com.yahoo.labs.samoa.topology.Stream;
+
+/**
+ * Contract for learners that hold a replaceable {@link ChangeDetector}.
+ * <p>
+ * The interface only exposes a getter and a setter for the detector; how and when an
+ * implementation consults it is not specified here.
+ */
+public interface AdaptiveLearner {
+
+    /**
+     * Gets the change detector.
+     *
+     * @return the change detector currently held by this learner
+     */
+    ChangeDetector getChangeDetector();
+
+    /**
+     * Sets the change detector.
+     *
+     * @param cd the change detector to use
+     */
+    void setChangeDetector(ChangeDetector cd);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/ClassificationLearner.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/ClassificationLearner.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/ClassificationLearner.java
new file mode 100644
index 0000000..7f1a6c9
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/ClassificationLearner.java
@@ -0,0 +1,27 @@
+package com.yahoo.labs.samoa.learners;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.learners.Learner;
+
+/**
+ * Marker sub-interface of {@link Learner}; it declares no members of its own.
+ * Presumably it tags learners that solve classification tasks (inferred from the name).
+ */
+public interface ClassificationLearner extends Learner {
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstanceContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstanceContentEvent.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstanceContentEvent.java
new file mode 100644
index 0000000..91b1b7b
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstanceContentEvent.java
@@ -0,0 +1,207 @@
+
+package com.yahoo.labs.samoa.learners;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.SerializableInstance;
+import net.jcip.annotations.Immutable;
+import com.yahoo.labs.samoa.instances.Instance;
+//import weka.core.Instance;
+
+
+/**
+ * {@link ContentEvent} wrapping a single {@link Instance} together with an instance
+ * index, a classifier index, an evaluation index and training/testing/last-event flags.
+ * <p>
+ * NOTE(review): the class is annotated {@code @Immutable} although it has non-final
+ * fields and public setters; the annotation is left untouched here. Confirm whether it
+ * should be dropped.
+ */
+@Immutable
+public final class InstanceContentEvent implements ContentEvent {
+
+    private static final long serialVersionUID = -8620668863064613845L;
+    private long instanceIndex;
+    private int classifierIndex;
+    private int evaluationIndex;
+    private SerializableInstance instance;
+    private boolean isTraining;
+    private boolean isTesting;
+    private boolean isLast = false;
+
+    /**
+     * Creates an empty event: indices are 0, flags are {@code false} and no instance is set.
+     */
+    public InstanceContentEvent() {
+    }
+
+    /**
+     * Instantiates a new instance event.
+     *
+     * @param index the instance index
+     * @param instance the instance to wrap; when {@code null} the event holds no instance
+     * @param isTraining whether the instance is training data
+     * @param isTesting whether the instance is testing data
+     */
+    public InstanceContentEvent(long index, Instance instance,
+            boolean isTraining, boolean isTesting) {
+        if (instance != null) {
+            this.instance = new SerializableInstance(instance);
+        }
+        this.instanceIndex = index;
+        this.isTraining = isTraining;
+        this.isTesting = isTesting;
+    }
+
+    /**
+     * Gets the wrapped instance.
+     *
+     * @return the instance, or {@code null} if none was set
+     */
+    public Instance getInstance() {
+        return instance;
+    }
+
+    /**
+     * Gets the instance index.
+     *
+     * @return the index of the data vector.
+     */
+    public long getInstanceIndex() {
+        return instanceIndex;
+    }
+
+    /**
+     * Gets the class id, i.e. the wrapped instance's class value cast to {@code int}.
+     *
+     * @return the true class of the vector.
+     * @throws NullPointerException if the event holds no instance
+     */
+    public int getClassId() {
+        return (int) instance.classValue();
+    }
+
+    /**
+     * Checks if is training.
+     *
+     * @return true if this is training data.
+     */
+    public boolean isTraining() {
+        return isTraining;
+    }
+
+    /**
+     * Set training flag.
+     *
+     * @param training the new training flag
+     */
+    public void setTraining(boolean training) {
+        this.isTraining = training;
+    }
+
+    /**
+     * Checks if is testing.
+     *
+     * @return true if this is testing data.
+     */
+    public boolean isTesting() {
+        return isTesting;
+    }
+
+    /**
+     * Set testing flag.
+     *
+     * @param testing the new testing flag
+     */
+    public void setTesting(boolean testing) {
+        this.isTesting = testing;
+    }
+
+    /**
+     * Gets the classifier index.
+     *
+     * @return the classifier index
+     */
+    public int getClassifierIndex() {
+        return classifierIndex;
+    }
+
+    /**
+     * Sets the classifier index.
+     *
+     * @param classifierIndex the new classifier index
+     */
+    public void setClassifierIndex(int classifierIndex) {
+        this.classifierIndex = classifierIndex;
+    }
+
+    /**
+     * Gets the evaluation index.
+     *
+     * @return the evaluation index
+     */
+    public int getEvaluationIndex() {
+        return evaluationIndex;
+    }
+
+    /**
+     * Sets the evaluation index.
+     *
+     * @param evaluationIndex the new evaluation index
+     */
+    public void setEvaluationIndex(int evaluationIndex) {
+        this.evaluationIndex = evaluationIndex;
+    }
+
+    /**
+     * Builds a key from the evaluation and classifier indices.
+     *
+     * @param key selector: {@code 0} yields the evaluation index alone; any other value
+     *        yields {@code 10000 * evaluationIndex + classifierIndex}
+     * @return the key as a decimal string
+     */
+    public String getKey(int key) {
+        if (key == 0) {
+            return Long.toString(getEvaluationIndex());
+        }
+        // 10000L forces 64-bit arithmetic; with int operands the product silently
+        // overflowed for large evaluation indices before being widened to long.
+        return Long.toString(10000L * getEvaluationIndex() + getClassifierIndex());
+    }
+
+    /**
+     * @return the classifier index as a decimal string
+     */
+    @Override
+    public String getKey() {
+        return Long.toString(getClassifierIndex());
+    }
+
+    /**
+     * Parses the given string and stores it as the instance index.
+     * <p>
+     * NOTE(review): this is not symmetric with {@link #getKey()}, which reports the
+     * classifier index. Confirm that this is intended.
+     *
+     * @param str decimal representation of the new instance index
+     * @throws NumberFormatException if {@code str} is not a parsable {@code long}
+     */
+    @Override
+    public void setKey(String str) {
+        this.instanceIndex = Long.parseLong(str);
+    }
+
+    /**
+     * @return the last-event flag, {@code false} unless changed through {@link #setLast(boolean)}
+     */
+    @Override
+    public boolean isLastEvent() {
+        return isLast;
+    }
+
+    /**
+     * Sets the last-event flag.
+     *
+     * @param isLast the new value reported by {@link #isLastEvent()}
+     */
+    public void setLast(boolean isLast) {
+        this.isLast = isLast;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstancesContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstancesContentEvent.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstancesContentEvent.java
new file mode 100644
index 0000000..ff005b6
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstancesContentEvent.java
@@ -0,0 +1,193 @@
+
+package com.yahoo.labs.samoa.learners;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.SerializableInstance;
+import net.jcip.annotations.Immutable;
+import com.yahoo.labs.samoa.instances.Instance;
+import java.util.LinkedList;
+import java.util.List;
+//import weka.core.Instance;
+
+
+/**
+ * {@link ContentEvent} holding a list of {@link Instance}s together with an instance
+ * index, a classifier index, an evaluation index and training/testing/last-event flags.
+ * <p>
+ * NOTE(review): the class is annotated {@code @Immutable} although it has non-final
+ * fields, public setters and a growable instance list; the annotation is left untouched
+ * here. Confirm whether it should be dropped.
+ */
+@Immutable
+public final class InstancesContentEvent implements ContentEvent {
+
+    private static final long serialVersionUID = -8620668863064613845L;
+    private long instanceIndex;
+    private int classifierIndex;
+    private int evaluationIndex;
+    private boolean isTraining;
+    private boolean isTesting;
+    private boolean isLast = false;
+
+    /** Instances added through {@link #add(Instance)}, in insertion order. */
+    protected List<Instance> instanceList = new LinkedList<Instance>();
+
+    /**
+     * Creates an empty event: indices are 0, flags are {@code false} and the list is empty.
+     */
+    public InstancesContentEvent() {
+    }
+
+    /**
+     * Instantiates a new event without instances.
+     *
+     * @param index the instance index
+     * @param isTraining whether the instances are training data
+     * @param isTesting whether the instances are testing data
+     */
+    public InstancesContentEvent(long index,
+            boolean isTraining, boolean isTesting) {
+        this.instanceIndex = index;
+        this.isTraining = isTraining;
+        this.isTesting = isTesting;
+    }
+
+    /**
+     * Creates an event that takes over the instance index and the training/testing flags
+     * of a single-instance event. The classifier index, the evaluation index, the
+     * last-event flag and the instance itself are not copied.
+     *
+     * @param event the event to copy the index and flags from
+     */
+    public InstancesContentEvent(InstanceContentEvent event) {
+        this.instanceIndex = event.getInstanceIndex();
+        this.isTraining = event.isTraining();
+        this.isTesting = event.isTesting();
+    }
+
+    /**
+     * Appends an instance, wrapped in a {@link SerializableInstance}.
+     *
+     * @param instance the instance to append
+     */
+    public void add(Instance instance) {
+        instanceList.add(new SerializableInstance(instance));
+    }
+
+    /**
+     * Gets the instances added so far.
+     *
+     * @return a new array with the instances in insertion order; empty if none were added
+     */
+    public Instance[] getInstances() {
+        return instanceList.toArray(new Instance[instanceList.size()]);
+    }
+
+    /**
+     * Gets the instance index.
+     *
+     * @return the index of the data vector.
+     */
+    public long getInstanceIndex() {
+        return instanceIndex;
+    }
+
+    /**
+     * Checks if is training.
+     *
+     * @return true if this is training data.
+     */
+    public boolean isTraining() {
+        return isTraining;
+    }
+
+    /**
+     * Checks if is testing.
+     *
+     * @return true if this is testing data.
+     */
+    public boolean isTesting() {
+        return isTesting;
+    }
+
+    /**
+     * Gets the classifier index.
+     *
+     * @return the classifier index
+     */
+    public int getClassifierIndex() {
+        return classifierIndex;
+    }
+
+    /**
+     * Sets the classifier index.
+     *
+     * @param classifierIndex the new classifier index
+     */
+    public void setClassifierIndex(int classifierIndex) {
+        this.classifierIndex = classifierIndex;
+    }
+
+    /**
+     * Gets the evaluation index.
+     *
+     * @return the evaluation index
+     */
+    public int getEvaluationIndex() {
+        return evaluationIndex;
+    }
+
+    /**
+     * Sets the evaluation index.
+     *
+     * @param evaluationIndex the new evaluation index
+     */
+    public void setEvaluationIndex(int evaluationIndex) {
+        this.evaluationIndex = evaluationIndex;
+    }
+
+    /**
+     * Builds a key from the evaluation and classifier indices.
+     *
+     * @param key selector: {@code 0} yields the evaluation index alone; any other value
+     *        yields {@code 10000 * evaluationIndex + classifierIndex}
+     * @return the key as a decimal string
+     */
+    public String getKey(int key) {
+        if (key == 0) {
+            return Long.toString(getEvaluationIndex());
+        }
+        // 10000L forces 64-bit arithmetic; with int operands the product silently
+        // overflowed for large evaluation indices before being widened to long.
+        return Long.toString(10000L * getEvaluationIndex() + getClassifierIndex());
+    }
+
+    /**
+     * @return the classifier index as a decimal string
+     */
+    @Override
+    public String getKey() {
+        return Long.toString(getClassifierIndex());
+    }
+
+    /**
+     * Parses the given string and stores it as the instance index.
+     * <p>
+     * NOTE(review): this is not symmetric with {@link #getKey()}, which reports the
+     * classifier index. Confirm that this is intended.
+     *
+     * @param str decimal representation of the new instance index
+     * @throws NumberFormatException if {@code str} is not a parsable {@code long}
+     */
+    @Override
+    public void setKey(String str) {
+        this.instanceIndex = Long.parseLong(str);
+    }
+
+    /**
+     * @return the last-event flag, {@code false} unless changed through {@link #setLast(boolean)}
+     */
+    @Override
+    public boolean isLastEvent() {
+        return isLast;
+    }
+
+    /**
+     * Sets the last-event flag.
+     *
+     * @param isLast the new value reported by {@link #isLastEvent()}
+     */
+    public void setLast(boolean isLast) {
+        this.isLast = isLast;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/Learner.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/Learner.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/Learner.java
new file mode 100644
index 0000000..993ca47
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/Learner.java
@@ -0,0 +1,62 @@
+package com.yahoo.labs.samoa.learners;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.instances.Instances;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.TopologyBuilder;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Contract for a learner that is wired into a topology through a {@link TopologyBuilder}.
+ * <p>
+ * Initializing a learner should initialize the processor that connects the learner with
+ * the input stream, and initialize the result stream(s) so that other processors can
+ * connect to the learner's output.
+ */
+public interface Learner extends Serializable {
+
+    /**
+     * Inits the Learner object.
+     *
+     * @param topologyBuilder the topology builder
+     * @param dataset the dataset
+     * @param parallelism the parallelism
+     */
+    void init(TopologyBuilder topologyBuilder, Instances dataset, int parallelism);
+
+    /**
+     * Gets the input processor.
+     *
+     * @return the processor that receives the learner's input
+     */
+    Processor getInputProcessor();
+
+    /**
+     * Gets the result streams.
+     *
+     * @return the set of result streams
+     */
+    Set<Stream> getResultStreams();
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/RegressionLearner.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/RegressionLearner.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/RegressionLearner.java
new file mode 100644
index 0000000..63f233e
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/RegressionLearner.java
@@ -0,0 +1,27 @@
+package com.yahoo.labs.samoa.learners;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.learners.Learner;
+
+/**
+ * Marker sub-interface of {@link Learner}; it declares no members of its own.
+ * Presumably it tags learners that solve regression tasks (inferred from the name).
+ */
+public interface RegressionLearner extends Learner {
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/ResultContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/ResultContentEvent.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/ResultContentEvent.java
new file mode 100644
index 0000000..0879872
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/ResultContentEvent.java
@@ -0,0 +1,212 @@
+package com.yahoo.labs.samoa.learners;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.SerializableInstance;
+import com.yahoo.labs.samoa.instances.Instance;
+
+/**
+ * License
+ */
+
+
+/**
+ * The Class ResultEvent.
+ */
+final public class ResultContentEvent implements ContentEvent {
+
+       /**
+        * 
+        */
+       private static final long serialVersionUID = -2650420235386873306L;
+       private long instanceIndex;
+       private int classifierIndex;
+       private int evaluationIndex;
+       private SerializableInstance instance;
+       
+       private int classId; 
+       private double[] classVotes;
+       
+       private final boolean isLast;
+       
+       public ResultContentEvent(){
+               this.isLast = false;
+       }
+
+
+       public ResultContentEvent(boolean isLast) {
+               this.isLast = isLast;
+       }
+
+       /**
+        * Instantiates a new result event.
+        * 
+        * @param instanceIndex
+        *            the instance index
+        * @param instance
+        *            the instance
+        * @param classId
+        *            the class id
+        * @param classVotes
+        *            the class votes
+        */
+       public ResultContentEvent(long instanceIndex, Instance instance, int 
classId,
+                       double[] classVotes, boolean isLast) {
+               if(instance != null){
+                       this.instance = new SerializableInstance(instance);
+               }
+               this.instanceIndex = instanceIndex;
+               this.classId = classId;
+               this.classVotes = classVotes;
+               this.isLast = isLast;
+       }
+
+       /**
+        * Gets the single instance of ResultEvent.
+        * 
+        * @return single instance of ResultEvent
+        */
+       public SerializableInstance getInstance() {
+               return instance;
+       }
+
+       /**
+        * Sets the instance.
+        * 
+        * @param instance
+        *            the new instance
+        */
+       public void setInstance(SerializableInstance instance) {
+               this.instance = instance;
+       }
+
+       /**
+        * Gets the num classes.
+        * 
+        * @return the num classes
+        */
+       public int getNumClasses() { // To remove
+               return instance.numClasses();
+       }
+
+       /**
+        * Gets the instance index.
+        * 
+        * @return the index of the data vector.
+        */
+       public long getInstanceIndex() {
+               return instanceIndex;
+       }
+
+       /**
+        * Gets the class id.
+        * 
+        * @return the true class of the vector.
+        */
+       public int getClassId() { // To remove
+               return classId;// (int) instance.classValue();//classId;
+       }
+
+       /**
+        * Gets the class votes.
+        * 
+        * @return the class votes
+        */
+       public double[] getClassVotes() {
+               return classVotes;
+       }
+
+       /**
+        * Sets the class votes.
+        * 
+        * @param classVotes
+        *            the new class votes
+        */
+       public void setClassVotes(double[] classVotes) {
+               this.classVotes = classVotes;
+       }
+
+       /**
+        * Gets the classifier index.
+        * 
+        * @return the classifier index
+        */
+       public int getClassifierIndex() {
+               return classifierIndex;
+       }
+
+       /**
+        * Sets the classifier index.
+        * 
+        * @param classifierIndex
+        *            the new classifier index
+        */
+       public void setClassifierIndex(int classifierIndex) {
+               this.classifierIndex = classifierIndex;
+       }
+
+       /**
+        * Gets the evaluation index.
+        * 
+        * @return the evaluation index
+        */
+       public int getEvaluationIndex() {
+               return evaluationIndex;
+       }
+
+       /**
+        * Sets the evaluation index.
+        * 
+        * @param evaluationIndex
+        *            the new evaluation index
+        */
+       public void setEvaluationIndex(int evaluationIndex) {
+               this.evaluationIndex = evaluationIndex;
+       }
+       
+       /* (non-Javadoc)
+        * @see samoa.core.ContentEvent#getKey(int)
+        */
+       //@Override
+       public String getKey(int key) {
+               if (key == 0) 
+                       return Long.toString(this.getEvaluationIndex());
+               else return Long.toString(this.getEvaluationIndex()
+                               + 1000 * this.getInstanceIndex());
+       }
+
+       @Override
+       public String getKey() {
+               return Long.toString(this.getEvaluationIndex()%100);
+       }
+
+       @Override
+       public void setKey(String str) {
+               this.evaluationIndex = Integer.parseInt(str);
+       }
+
+       @Override
+       public boolean isLastEvent() {
+               return isLast;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/LocalLearner.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/LocalLearner.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/LocalLearner.java
new file mode 100644
index 0000000..b5c30db
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/LocalLearner.java
@@ -0,0 +1,78 @@
+package com.yahoo.labs.samoa.learners.classifiers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+import java.util.Map;
+
+import com.yahoo.labs.samoa.instances.Instance;
+import com.yahoo.labs.samoa.instances.Instances;
+
+/**
+ * Contract for learners that are not distributed.
+ *
+ * @author abifet
+ */
+public interface LocalLearner extends Serializable {
+
+    /**
+     * Builds a new learner object.
+     *
+     * @return the newly created learner
+     */
+    LocalLearner create();
+
+    /**
+     * Predicts the class memberships of the given instance. When the instance
+     * is unclassified, every element of the returned array must be zero.
+     *
+     * @param inst
+     *            the instance to be classified
+     * @return an array with the estimated membership probability of the test
+     *         instance for each class
+     */
+    double[] getVotesForInstance(Instance inst);
+
+    /**
+     * Resets this classifier; afterwards it must behave like a classifier
+     * started from scratch.
+     */
+    void resetLearning();
+
+    /**
+     * Incrementally trains this classifier with the given instance.
+     *
+     * @param inst
+     *            the instance to be used for training
+     */
+    void trainOnInstance(Instance inst);
+
+    /**
+     * Sets where the attribute information of Instances is obtained from.
+     *
+     * @param dataset
+     *            the dataset that contains the information
+     */
+    @Deprecated
+    void setDataset(Instances dataset);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/LocalLearnerProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/LocalLearnerProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/LocalLearnerProcessor.java
new file mode 100755
index 0000000..ae897f0
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/LocalLearnerProcessor.java
@@ -0,0 +1,217 @@
+package com.yahoo.labs.samoa.learners.classifiers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.instances.Instance;
+import com.yahoo.labs.samoa.learners.InstanceContentEvent;
+import com.yahoo.labs.samoa.learners.ResultContentEvent;
+import com.yahoo.labs.samoa.moa.classifiers.core.driftdetection.ChangeDetector;
+import com.yahoo.labs.samoa.topology.Stream;
+
+import static com.yahoo.labs.samoa.moa.core.Utils.maxIndex;
+
+/**
+ * The Class LearnerProcessor.
+ */
+final public class LocalLearnerProcessor implements Processor {
+
+       /**
+        * 
+        */
+       private static final long serialVersionUID = -1577910988699148691L;
+
+       private static final Logger logger = 
LoggerFactory.getLogger(LocalLearnerProcessor.class);
+       
+       private LocalLearner model;
+       private Stream outputStream;
+       private int modelId;
+       private long instancesCount = 0;
+
+       /**
+        * Sets the learner.
+        *
+        * @param model the model to set
+        */
+       public void setLearner(LocalLearner model) {
+               this.model = model;
+       }
+
+       /**
+        * Gets the learner.
+        *
+        * @return the model
+        */
+       public LocalLearner getLearner() {
+               return model;
+       }
+
+       /**
+        * Set the output streams.
+        *
+        * @param outputStream the new output stream
+        */
+       public void setOutputStream(Stream outputStream) {
+               this.outputStream = outputStream;
+       }
+       
+       /**
+        * Gets the output stream.
+        *
+        * @return the output stream
+        */
+       public Stream getOutputStream() {
+               return outputStream;
+       }
+
+       /**
+        * Gets the instances count.
+        *
+        * @return number of observation vectors used in training iteration.
+        */
+       public long getInstancesCount() {
+               return instancesCount;
+       }
+
+       /**
+        * Update stats.
+        *
+        * @param event the event
+        */
+       private void updateStats(InstanceContentEvent event) {
+                Instance inst = event.getInstance();
+               this.model.trainOnInstance(inst);
+               this.instancesCount++;
+               if (this.changeDetector != null) {
+                               boolean correctlyClassifies = 
this.correctlyClassifies(inst);
+                               double oldEstimation = 
this.changeDetector.getEstimation();
+                               this.changeDetector.input(correctlyClassifies ? 
0 : 1);
+                               if (this.changeDetector.getChange() && 
this.changeDetector.getEstimation() > oldEstimation) {
+                                               //Start a new classifier
+                                               this.model.resetLearning();
+                                               
this.changeDetector.resetLearning();
+                               }
+               }
+       }
+
+     /**
+     * Gets whether this classifier correctly classifies an instance. Uses
+     * getVotesForInstance to obtain the prediction and the instance to obtain
+     * its true class.
+     *
+     *
+     * @param inst the instance to be classified
+     * @return true if the instance is correctly classified
+     */    
+     private boolean correctlyClassifies(Instance inst) {
+            return maxIndex(model.getVotesForInstance(inst)) == (int) 
inst.classValue();
+     }
+        
+       /** The test. */
+       protected int test; //to delete
+       
+       /**
+        * On event.
+        *
+        * @param event the event
+        * @return true, if successful
+        */
+    @Override
+       public boolean process(ContentEvent event) {
+
+       InstanceContentEvent inEvent = (InstanceContentEvent) event;
+               Instance instance = inEvent.getInstance();
+
+               if (inEvent.getInstanceIndex() < 0) {
+                       //end learning
+                       ResultContentEvent outContentEvent = new 
ResultContentEvent(-1, instance, 0,
+                                       new double[0], inEvent.isLastEvent());
+                       outContentEvent.setClassifierIndex(this.modelId);
+                       
outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
+                       outputStream.put(outContentEvent);
+                       return false;
+               }
+               
+               if (inEvent.isTesting()){
+                       double[] dist = model.getVotesForInstance(instance);
+                       ResultContentEvent outContentEvent = new 
ResultContentEvent(inEvent.getInstanceIndex(),
+                                       instance, inEvent.getClassId(), dist, 
inEvent.isLastEvent());
+                       outContentEvent.setClassifierIndex(this.modelId);
+                       
outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
+                       logger.trace(inEvent.getInstanceIndex() + " {} {}", 
modelId, dist);
+                       outputStream.put(outContentEvent);
+               }
+               
+               if (inEvent.isTraining()) {
+                       updateStats(inEvent);
+               } 
+               return false;
+       }
+
+       /* (non-Javadoc)
+        * @see samoa.core.Processor#onCreate(int)
+        */
+       @Override
+       public void onCreate(int id) {
+               this.modelId = id;
+               model = model.create();
+       }
+
+       /* (non-Javadoc)
+        * @see samoa.core.Processor#newProcessor(samoa.core.Processor)
+        */
+       @Override
+       public Processor newProcessor(Processor sourceProcessor) {
+               LocalLearnerProcessor newProcessor = new 
LocalLearnerProcessor();
+               LocalLearnerProcessor originProcessor = (LocalLearnerProcessor) 
sourceProcessor;
+
+               if (originProcessor.getLearner() != null){
+                       
newProcessor.setLearner(originProcessor.getLearner().create());
+               }
+
+               if (originProcessor.getChangeDetector() != null){
+                               
newProcessor.setChangeDetector(originProcessor.getChangeDetector());
+               }
+
+               newProcessor.setOutputStream(originProcessor.getOutputStream());
+               return newProcessor;
+       }
+        
+       protected ChangeDetector changeDetector;
+
+       public ChangeDetector getChangeDetector() {
+                       return this.changeDetector;
+       }
+
+       public void setChangeDetector(ChangeDetector cd) {
+                       this.changeDetector = cd;
+       }
+        
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java
new file mode 100644
index 0000000..7e9cb4a
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java
@@ -0,0 +1,269 @@
+package com.yahoo.labs.samoa.learners.classifiers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yahoo.labs.samoa.instances.Instance;
+import com.yahoo.labs.samoa.instances.Instances;
+import 
com.yahoo.labs.samoa.moa.classifiers.core.attributeclassobservers.GaussianNumericAttributeClassObserver;
+import com.yahoo.labs.samoa.moa.core.GaussianEstimator;
+
+/**
+ * Implementation of a non-distributed Naive Bayes classifier.
+ *
+ * At the moment, the implementation models all attributes as numeric
+ * attributes.
+ *
+ * @author Olivier Van Laere (vanlaere yahoo-inc dot com)
+ */
+public class NaiveBayes implements LocalLearner {
+
+       /**
+        * Default smoothing factor. For now fixed to 1E-20.
+        */
+       private static final double ADDITIVE_SMOOTHING_FACTOR = 1e-20;
+
+       /**
+        * serialVersionUID for serialization
+        */
+       private static final long serialVersionUID = 1325775209672996822L;
+
+       /**
+        * Instance of a logger for use in this class.
+        */
+       private static final Logger logger = LoggerFactory.getLogger(NaiveBayes.class);
+
+       /**
+        * The actual model: one observer per attribute id.
+        */
+       protected Map<Integer, GaussianNumericAttributeClassObserver> attributeObservers;
+
+       /**
+        * Class statistics: accumulated instance weight per class value.
+        */
+       protected Map<Integer, Double> classInstances;
+
+       /**
+        * Class zero-prototypes: per class value, 1 plus the sum over all
+        * attributes of the estimator's density at zero (maintained
+        * incrementally by {@link #trainOnInstance(Instance)}).
+        */
+       protected Map<Integer, Double> classPrototypes;
+
+       /**
+        * Track training instances seen.
+        */
+       protected long instancesSeen = 0L;
+
+       /**
+        * Explicit no-arg constructor.
+        */
+       public NaiveBayes() {
+               // Init the model
+               resetLearning();
+       }
+
+       /**
+        * Retrieve the number of classes currently known to this local model
+        *
+        * @return the number of distinct class values seen during training
+        */
+       protected int getNumberOfClasses() {
+               return this.classInstances.size();
+       }
+
+       /**
+        * Computes the length of the votes array. The array is indexed by class
+        * value, so it must reach past the largest class value seen; it is never
+        * shorter than {@link #getNumberOfClasses()}.
+        *
+        * @return the number of vote slots to allocate
+        */
+       private int getVoteArrayLength() {
+               int length = getNumberOfClasses();
+               for (Integer knownClass : this.classInstances.keySet()) {
+                       length = Math.max(length, knownClass + 1);
+               }
+               return length;
+       }
+
+       /**
+        * Create an instance of this LocalLearner implementation.
+        */
+       @Override
+       public LocalLearner create() {
+               return new NaiveBayes();
+       }
+
+       /**
+        * Computes a score for every class value known to the model.
+        *
+        * Scores are accumulated in log space to avoid underflow, replacing the
+        * multiplication with addition: log(prior), plus the log of the
+        * estimator output for every non-class attribute (or of the smoothing
+        * factor when no estimator exists for that attribute and class), plus
+        * the log of the class prototype when one is stored.
+        *
+        * The resulting scores are no longer probabilities, as a mixture of
+        * probability densities and probabilities can be used in the
+        * computation. A class value without training instances has prior 0 and
+        * therefore receives a score of negative infinity.
+        *
+        * @param inst
+        *            the instance to be classified
+        * @return an array, indexed by class value, with the score of the test
+        *         instance for each class, in log space
+        */
+       @Override
+       public double[] getVotesForInstance(Instance inst) {
+               // Sized by the largest class value seen, not by the number of classes,
+               // so classes observed out of order still get a slot.
+               double[] votes = new double[getVoteArrayLength()];
+               // Over all classes
+               for (int classIndex = 0; classIndex < votes.length; classIndex++) {
+                       // Get the prior for this class
+                       votes[classIndex] = Math.log(getPrior(classIndex));
+                       // Iterate over the instance attributes
+                       for (int index = 0; index < inst.numAttributes(); index++) {
+                               int attributeID = inst.index(index);
+                               // Skip class attribute
+                               if (attributeID == inst.classIndex()) {
+                                       continue;
+                               }
+                               double value = inst.value(attributeID);
+                               // Get the observer, and through it the estimator, for this attribute
+                               GaussianNumericAttributeClassObserver obs = attributeObservers.get(attributeID);
+                               GaussianEstimator estimator = (obs == null) ? null : obs.getEstimator(classIndex);
+                               // Without an estimator, assign a very small probability that we
+                               // do see this value
+                               double valueNonZero = (estimator != null)
+                                               ? estimator.probabilityDensity(value)
+                                               : ADDITIVE_SMOOTHING_FACTOR;
+                               votes[classIndex] += Math.log(valueNonZero);
+                       }
+                       // Check for null in the case of prequential evaluation
+                       Double classPrototype = this.classPrototypes.get(classIndex);
+                       if (classPrototype != null) {
+                               // The prototype is stored as a plain value; move it to log space here
+                               votes[classIndex] += Math.log(classPrototype);
+                       }
+               }
+               return votes;
+       }
+
+       /**
+        * Compute the prior for the given classIndex.
+        *
+        * Implemented by maximum likelihood at the moment: the accumulated
+        * weight of the class divided by the number of instances seen.
+        *
+        * @param classIndex
+        *            Id of the class for which we want to compute the prior.
+        * @return Prior for the requested class, 0 when the class is unknown
+        */
+       private double getPrior(int classIndex) {
+               // Maximum likelihood
+               Double currentCount = this.classInstances.get(classIndex);
+               if (currentCount == null || currentCount == 0) {
+                       return 0;
+               }
+               return currentCount / this.instancesSeen;
+       }
+
+       /**
+        * Resets this classifier. It must be similar to starting a new
+        * classifier from scratch.
+        */
+       @Override
+       public void resetLearning() {
+               // Reset priors
+               this.instancesSeen = 0L;
+               this.classInstances = new HashMap<>();
+               this.classPrototypes = new HashMap<>();
+               // Init the attribute observers
+               this.attributeObservers = new HashMap<>();
+       }
+
+       /**
+        * Trains this classifier incrementally using the given instance: adds
+        * the instance weight to the class statistics, lets every non-class
+        * attribute observer learn the attribute value, and keeps the class
+        * prototype in sync by swapping the old density at zero for the new one.
+        *
+        * @param inst
+        *            the instance to be used for training
+        */
+       @Override
+       public void trainOnInstance(Instance inst) {
+               // Update class statistics with weights
+               int classIndex = (int) inst.classValue();
+               Double weight = this.classInstances.get(classIndex);
+               if (weight == null) {
+                       weight = 0.;
+               }
+               this.classInstances.put(classIndex, weight + inst.weight());
+
+               // Get the class prototype
+               Double classPrototype = this.classPrototypes.get(classIndex);
+               if (classPrototype == null) {
+                       classPrototype = 1.;
+               }
+
+               // Iterate over the attributes of the given instance
+               for (int attributePosition = 0; attributePosition < inst.numAttributes(); attributePosition++) {
+                       // Get the attribute index - Dense -> 1:1, Sparse is remapped
+                       int attributeID = inst.index(attributePosition);
+                       // Skip class attribute
+                       if (attributeID == inst.classIndex()) {
+                               continue;
+                       }
+                       // Lazy init of observers, if null, instantiate a new one
+                       GaussianNumericAttributeClassObserver obs = this.attributeObservers.get(attributeID);
+                       if (obs == null) {
+                               // FIXME: At this point, we model everything as a numeric
+                               // attribute
+                               obs = new GaussianNumericAttributeClassObserver();
+                               this.attributeObservers.put(attributeID, obs);
+                       }
+
+                       // Remove the density at zero under the current model, if any
+                       GaussianEstimator currentEstimator = obs.getEstimator(classIndex);
+                       if (currentEstimator != null) {
+                               classPrototype -= currentEstimator.probabilityDensity(0);
+                       }
+
+                       // FIXME: Sanity check on data values, for now just learn
+                       // Learn attribute value for given class
+                       obs.observeAttributeClass(inst.valueSparse(attributePosition), classIndex, inst.weight());
+
+                       // Add the density at zero under the updated model. The estimator is
+                       // null-checked here just like before the observation, in case the
+                       // observer did not create one.
+                       GaussianEstimator updatedEstimator = obs.getEstimator(classIndex);
+                       if (updatedEstimator != null) {
+                               classPrototype += updatedEstimator.probabilityDensity(0);
+                       }
+               }
+               // Store the class prototype
+               this.classPrototypes.put(classIndex, classPrototype);
+               // Count another training instance
+               this.instancesSeen++;
+       }
+
+       /**
+        * Does nothing; this learner does not use the dataset header.
+        */
+       @Override
+       public void setDataset(Instances dataset) {
+               // Do nothing
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java
new file mode 100644
index 0000000..a3fb89f
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java
@@ -0,0 +1,150 @@
+package com.yahoo.labs.samoa.learners.classifiers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+/**
+ * License
+ */
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.Configurable;
+import com.yahoo.labs.samoa.instances.Instance;
+import com.yahoo.labs.samoa.instances.Instances;
+import com.yahoo.labs.samoa.instances.InstancesHeader;
+import com.yahoo.labs.samoa.moa.classifiers.functions.MajorityClass;
+
+/**
+ * Base class for adapting external classifiers: wraps a
+ * {@code com.yahoo.labs.samoa.moa.classifiers.Classifier} behind the
+ * {@link LocalLearner} interface.
+ */
+public class SimpleClassifierAdapter implements LocalLearner, Configurable {
+
+    /** Serialization version identifier. */
+    private static final long serialVersionUID = 4372366401338704353L;
+
+    /** Command-line option selecting the wrapped classifier. */
+    public ClassOption learnerOption = new ClassOption("learner", 'l',
+            "Classifier to train.", com.yahoo.labs.samoa.moa.classifiers.Classifier.class,
+            MajorityClass.class.getName());
+
+    /**
+     * The wrapped classifier.
+     */
+    protected com.yahoo.labs.samoa.moa.classifiers.Classifier learner;
+
+    /**
+     * Whether the wrapped classifier has received its model context yet.
+     */
+    protected Boolean isInit;
+
+    /**
+     * The dataset describing the instances.
+     */
+    protected Instances dataset;
+
+    /**
+     * Creates an adapter around a copy of the given classifier.
+     *
+     * @param learner the classifier to copy and wrap
+     * @param dataset the dataset
+     */
+    public SimpleClassifierAdapter(com.yahoo.labs.samoa.moa.classifiers.Classifier learner,
+            Instances dataset) {
+        this.learner = learner.copy();
+        this.isInit = false;
+        this.dataset = dataset;
+    }
+
+    /**
+     * Creates an adapter around a copy of the classifier currently selected
+     * by {@link #learnerOption}; no dataset is set.
+     */
+    public SimpleClassifierAdapter() {
+        com.yahoo.labs.samoa.moa.classifiers.Classifier configured =
+                (com.yahoo.labs.samoa.moa.classifiers.Classifier) this.learnerOption.getValue();
+        this.learner = configured.copy();
+        this.isInit = false;
+    }
+
+    @Override
+    public void setDataset(Instances dataset) {
+        this.dataset = dataset;
+    }
+
+    /**
+     * Creates a new adapter around a copy of this adapter's classifier, using
+     * the same dataset. A message is printed to standard output when the
+     * dataset is null.
+     *
+     * @return the new adapter
+     */
+    @Override
+    public SimpleClassifierAdapter create() {
+        SimpleClassifierAdapter fresh = new SimpleClassifierAdapter(learner, dataset);
+        if (dataset == null) {
+            System.out.println("dataset null while creating");
+        }
+        return fresh;
+    }
+
+    /**
+     * Marks the adapter as initialised and hands the wrapped classifier its
+     * model context, built from the dataset, before preparing it for use.
+     */
+    private void initialiseLearner() {
+        this.isInit = true;
+        InstancesHeader header = new InstancesHeader(dataset);
+        this.learner.setModelContext(header);
+        this.learner.prepareForUse();
+    }
+
+    /**
+     * Trains the wrapped classifier incrementally with the given instance.
+     * The classifier is initialised on the first call; instances whose weight
+     * is not greater than zero are ignored.
+     *
+     * @param inst the instance to be used for training
+     */
+    @Override
+    public void trainOnInstance(Instance inst) {
+        if (!this.isInit) {
+            initialiseLearner();
+        }
+        if (!(inst.weight() > 0)) {
+            return;
+        }
+        inst.setDataset(dataset);
+        learner.trainOnInstance(inst);
+    }
+
+    /**
+     * Predicts the class memberships of the given instance. Until the first
+     * training call has initialised the classifier, an all-zero array with one
+     * element per class of the dataset is returned.
+     *
+     * @param inst the instance to be classified
+     * @return the votes of the wrapped classifier, or zeros before
+     * initialisation
+     */
+    @Override
+    public double[] getVotesForInstance(Instance inst) {
+        inst.setDataset(dataset);
+        if (this.isInit) {
+            return learner.getVotesForInstance(inst);
+        }
+        return new double[dataset.numClasses()];
+    }
+
+    /**
+     * Resets the wrapped classifier by delegating to its own
+     * {@code resetLearning()}.
+     */
+    @Override
+    public void resetLearning() {
+        learner.resetLearning();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java
new file mode 100644
index 0000000..affc935
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java
@@ -0,0 +1,109 @@
+package com.yahoo.labs.samoa.learners.classifiers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.Configurable;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.instances.Instances;
+import com.yahoo.labs.samoa.learners.AdaptiveLearner;
+import com.yahoo.labs.samoa.learners.Learner;
+import com.yahoo.labs.samoa.moa.classifiers.core.driftdetection.ChangeDetector;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.TopologyBuilder;
+/**
+ * Learner built around a single {@link LocalLearner}.
+ * <p>
+ * The configured local learner is wrapped in one {@link LocalLearnerProcessor};
+ * that processor is both the input processor of this learner and the source of
+ * its only result stream.
+ */
+public final class SingleClassifier implements Learner, AdaptiveLearner, Configurable {
+
+    private static final long serialVersionUID = 684111382631697031L;
+
+    /** Option naming the {@link LocalLearner} implementation to train. */
+    public ClassOption learnerOption = new ClassOption("learner", 'l',
+            "Classifier to train.", LocalLearner.class, SimpleClassifierAdapter.class.getName());
+
+    /** Change detector passed to the learner processor when the layout is built. */
+    protected ChangeDetector changeDetector;
+
+    /** Processor that hosts the local learner; created in {@link #setLayout()}. */
+    private LocalLearnerProcessor learnerP;
+
+    /** Output stream of {@link #learnerP}; created in {@link #setLayout()}. */
+    private Stream resultStream;
+
+    /** Dataset handed to the local learner. */
+    private Instances dataset;
+
+    /** Builder used to register the processor and create its stream. */
+    private TopologyBuilder builder;
+
+    /** Parallelism with which the learner processor is added to the topology. */
+    private int parallelism;
+
+    /**
+     * Stores the arguments and immediately builds the layout.
+     *
+     * @param builder     topology builder to add the processor and stream to
+     * @param dataset     dataset handed to the local learner
+     * @param parallelism parallelism used when adding the learner processor
+     */
+    @Override
+    public void init(TopologyBuilder builder, Instances dataset, int parallelism) {
+        this.builder = builder;
+        this.dataset = dataset;
+        this.parallelism = parallelism;
+        setLayout();
+    }
+
+    /**
+     * Creates the learner processor, configures it with the current change
+     * detector and the selected local learner, adds it to the topology and
+     * connects its output stream.
+     */
+    protected void setLayout() {
+        LocalLearnerProcessor processor = new LocalLearnerProcessor();
+        learnerP = processor;
+        processor.setChangeDetector(getChangeDetector());
+
+        LocalLearner localLearner = learnerOption.getValue();
+        localLearner.setDataset(dataset);
+        processor.setLearner(localLearner);
+
+        builder.addProcessor(processor, parallelism);
+        Stream output = builder.createStream(processor);
+        resultStream = output;
+        processor.setOutputStream(output);
+    }
+
+    /**
+     * @return the learner processor created by {@link #setLayout()}
+     */
+    @Override
+    public Processor getInputProcessor() {
+        return learnerP;
+    }
+
+    /**
+     * @return an immutable set holding only the learner processor's output stream
+     */
+    @Override
+    public Set<Stream> getResultStreams() {
+        return ImmutableSet.of(resultStream);
+    }
+
+    /**
+     * @return the change detector currently stored in this learner
+     */
+    @Override
+    public ChangeDetector getChangeDetector() {
+        return changeDetector;
+    }
+
+    /**
+     * Stores the change detector. It is passed to the learner processor the
+     * next time {@link #setLayout()} runs.
+     *
+     * @param cd the change detector to store
+     */
+    @Override
+    public void setChangeDetector(ChangeDetector cd) {
+        changeDetector = cd;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java
new file mode 100644
index 0000000..aba3d1d
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java
@@ -0,0 +1,149 @@
+package com.yahoo.labs.samoa.learners.classifiers.ensemble;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.IntOption;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.instances.Instances;
+import com.yahoo.labs.samoa.learners.AdaptiveLearner;
+import com.yahoo.labs.samoa.learners.Learner;
+import com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree;
+import 
com.yahoo.labs.samoa.moa.classifiers.core.driftdetection.ADWINChangeDetector;
+import com.yahoo.labs.samoa.moa.classifiers.core.driftdetection.ChangeDetector;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.TopologyBuilder;
+
+/**
+ * Variant of the Bagging classifier by Oza and Russell in which a base learner
+ * that implements {@link AdaptiveLearner} is given the configured change
+ * detector before it is initialised.
+ * <p>
+ * Layout built by {@link #setLayout()}: a {@link BaggingDistributorProcessor}
+ * feeds two streams into the base learner's input processor, and every result
+ * stream of the base learner is connected to a
+ * {@link PredictionCombinerProcessor} whose output is this learner's result
+ * stream.
+ */
+public class AdaptiveBagging implements Learner, Configurable {
+
+    /** Logger. */
+    private static final Logger logger = LoggerFactory.getLogger(AdaptiveBagging.class);
+
+    /** The Constant serialVersionUID. */
+    private static final long serialVersionUID = -2971850264864952099L;
+
+    /** The base learner option. */
+    public ClassOption baseLearnerOption = new ClassOption("baseLearner", 'l',
+            "Classifier to train.", Learner.class, VerticalHoeffdingTree.class.getName());
+
+    /** The ensemble size option. */
+    public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's',
+            "The number of models in the bag.", 10, 1, Integer.MAX_VALUE);
+
+    /** The drift detection method option. */
+    public ClassOption driftDetectionMethodOption = new ClassOption("driftDetectionMethod", 'd',
+            "Drift detection method to use.", ChangeDetector.class,
+            ADWINChangeDetector.class.getName());
+
+    /** The distributor processor. */
+    private BaggingDistributorProcessor distributorP;
+
+    /** The result stream (output of the prediction combiner). */
+    protected Stream resultStream;
+
+    /** The dataset. */
+    private Instances dataset;
+
+    /** The base learner instantiated from {@link #baseLearnerOption}. */
+    protected Learner classifier;
+
+    /** Parallelism received in {@code init}; stored but not read by this class. */
+    protected int parallelism;
+
+    /** The builder. */
+    private TopologyBuilder builder;
+
+    /**
+     * Builds the ensemble topology: distributor, base learner, prediction
+     * combiner and the streams between them. The base learner is initialised
+     * with the ensemble size as its parallelism.
+     */
+    protected void setLayout() {
+        final int ensembleSize = ensembleSizeOption.getValue();
+
+        distributorP = new BaggingDistributorProcessor();
+        distributorP.setSizeEnsemble(ensembleSize);
+        builder.addProcessor(distributorP, 1);
+
+        // Instantiate the base learner; adaptive learners get the change detector
+        // before init() so that it is available while they build their layout.
+        classifier = baseLearnerOption.getValue();
+        attachChangeDetectorIfAdaptive();
+        classifier.init(builder, dataset, ensembleSize);
+
+        PredictionCombinerProcessor combiner = new PredictionCombinerProcessor();
+        combiner.setSizeEnsemble(ensembleSize);
+        builder.addProcessor(combiner, 1);
+
+        // Combiner output is the result stream of the whole ensemble.
+        resultStream = builder.createStream(combiner);
+        combiner.setOutputStream(resultStream);
+
+        // Every result stream of the base learner feeds the combiner.
+        for (Stream learnerOutput : classifier.getResultStreams()) {
+            builder.connectInputKeyStream(learnerOutput, combiner);
+        }
+
+        // Training stream: registered below as the distributor's output stream.
+        Stream trainingStream = builder.createStream(distributorP);
+        builder.connectInputKeyStream(trainingStream, classifier.getInputProcessor());
+
+        // Prediction stream.
+        Stream predictionStream = builder.createStream(distributorP);
+        builder.connectInputKeyStream(predictionStream, classifier.getInputProcessor());
+
+        distributorP.setOutputStream(trainingStream);
+        distributorP.setPredictionStream(predictionStream);
+    }
+
+    /**
+     * Hands the configured change detector to the base learner when, and only
+     * when, the base learner implements {@link AdaptiveLearner}.
+     */
+    private void attachChangeDetectorIfAdaptive() {
+        if (!(classifier instanceof AdaptiveLearner)) {
+            return;
+        }
+        AdaptiveLearner adaptive = (AdaptiveLearner) classifier;
+        adaptive.setChangeDetector((ChangeDetector) driftDetectionMethodOption.getValue());
+    }
+
+    /**
+     * Stores the arguments and immediately builds the layout.
+     *
+     * @param builder     topology builder to add processors and streams to
+     * @param dataset     dataset passed on to the base learner
+     * @param parallelism stored in {@link #parallelism}
+     */
+    @Override
+    public void init(TopologyBuilder builder, Instances dataset, int parallelism) {
+        this.builder = builder;
+        this.dataset = dataset;
+        this.parallelism = parallelism;
+        setLayout();
+    }
+
+    /**
+     * @return the distributor processor created by {@link #setLayout()}
+     */
+    @Override
+    public Processor getInputProcessor() {
+        return distributorP;
+    }
+
+    /**
+     * @return an immutable set holding only the prediction combiner's output stream
+     */
+    @Override
+    public Set<Stream> getResultStreams() {
+        return ImmutableSet.of(resultStream);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java
new file mode 100644
index 0000000..9f99ff1
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java
@@ -0,0 +1,138 @@
+package com.yahoo.labs.samoa.learners.classifiers.ensemble;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+
+import com.yahoo.labs.samoa.instances.Instances;
+import com.yahoo.labs.samoa.learners.Learner;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.TopologyBuilder;
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.IntOption;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree;
+
+/**
+ * The Bagging Classifier by Oza and Russell.
+ * <p>
+ * Layout built by {@link #setLayout()}: a {@link BaggingDistributorProcessor}
+ * feeds two streams into the base learner's input processor, and every result
+ * stream of the base learner is connected to a
+ * {@link PredictionCombinerProcessor} whose output is this learner's result
+ * stream.
+ */
+public class Bagging implements Learner, Configurable {
+
+    /** The Constant serialVersionUID. */
+    private static final long serialVersionUID = -2971850264864952099L;
+
+    /** The base learner option. */
+    public ClassOption baseLearnerOption = new ClassOption("baseLearner", 'l',
+            "Classifier to train.", Learner.class, VerticalHoeffdingTree.class.getName());
+
+    /** The ensemble size option. */
+    public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's',
+            "The number of models in the bag.", 10, 1, Integer.MAX_VALUE);
+
+    /** The distributor processor. */
+    private BaggingDistributorProcessor distributorP;
+
+    /**
+     * Stream registered as the distributor's output stream. Despite the field
+     * name, {@link BaggingDistributorProcessor#setOutputStream(Stream)} stores
+     * it as the distributor's training stream.
+     */
+    private Stream testingStream;
+
+    /** The prediction stream. */
+    private Stream predictionStream;
+
+    /** The result stream (output of the prediction combiner). */
+    protected Stream resultStream;
+
+    /** The dataset. */
+    private Instances dataset;
+
+    /** The base learner instantiated from {@link #baseLearnerOption}. */
+    protected Learner classifier;
+
+    /** Parallelism received in {@code init}; stored but not read by this class. */
+    protected int parallelism;
+
+    /** The builder. */
+    private TopologyBuilder builder;
+
+    /**
+     * Builds the ensemble topology: distributor, base learner, prediction
+     * combiner and the streams between them. The base learner is initialised
+     * with the ensemble size as its parallelism.
+     */
+    protected void setLayout() {
+        final int ensembleSize = ensembleSizeOption.getValue();
+
+        // Distributor: single instance in front of the ensemble.
+        BaggingDistributorProcessor distributor = new BaggingDistributorProcessor();
+        distributorP = distributor;
+        distributor.setSizeEnsemble(ensembleSize);
+        builder.addProcessor(distributor, 1);
+
+        // Base learner.
+        classifier = baseLearnerOption.getValue();
+        classifier.init(builder, dataset, ensembleSize);
+
+        // Combiner: its output is the result stream of the whole ensemble.
+        PredictionCombinerProcessor combiner = new PredictionCombinerProcessor();
+        combiner.setSizeEnsemble(ensembleSize);
+        builder.addProcessor(combiner, 1);
+        resultStream = builder.createStream(combiner);
+        combiner.setOutputStream(resultStream);
+
+        // Every result stream of the base learner feeds the combiner.
+        for (Stream learnerOutput : classifier.getResultStreams()) {
+            builder.connectInputKeyStream(learnerOutput, combiner);
+        }
+
+        // Both distributor streams end at the base learner's input processor.
+        testingStream = builder.createStream(distributor);
+        builder.connectInputKeyStream(testingStream, classifier.getInputProcessor());
+
+        predictionStream = builder.createStream(distributor);
+        builder.connectInputKeyStream(predictionStream, classifier.getInputProcessor());
+
+        distributor.setOutputStream(testingStream);
+        distributor.setPredictionStream(predictionStream);
+    }
+
+    /**
+     * Stores the arguments and immediately builds the layout.
+     *
+     * @param builder     topology builder to add processors and streams to
+     * @param dataset     dataset passed on to the base learner
+     * @param parallelism stored in {@link #parallelism}
+     */
+    @Override
+    public void init(TopologyBuilder builder, Instances dataset, int parallelism) {
+        this.builder = builder;
+        this.dataset = dataset;
+        this.parallelism = parallelism;
+        setLayout();
+    }
+
+    /**
+     * @return the distributor processor created by {@link #setLayout()}
+     */
+    @Override
+    public Processor getInputProcessor() {
+        return distributorP;
+    }
+
+    /**
+     * @return an immutable set holding only the prediction combiner's output stream
+     */
+    @Override
+    public Set<Stream> getResultStreams() {
+        return ImmutableSet.of(resultStream);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java
new file mode 100644
index 0000000..65c782b
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java
@@ -0,0 +1,201 @@
+package com.yahoo.labs.samoa.learners.classifiers.ensemble;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.learners.InstanceContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.instances.Instance;
+import com.yahoo.labs.samoa.moa.core.MiscUtils;
+import com.yahoo.labs.samoa.topology.Stream;
+import java.util.Random;
+
+/**
+ * The Class BaggingDistributorPE.
+ */
+public class BaggingDistributorProcessor implements Processor{
+
+       /**
+        * 
+        */
+       private static final long serialVersionUID = -1550901409625192730L;
+
+       /** The size ensemble. */
+       private int sizeEnsemble;
+       
+       /** The training stream. */
+       private Stream trainingStream;
+       
+       /** The prediction stream. */
+       private Stream predictionStream;
+
+       /**
+        * On event.
+        *
+        * @param event the event
+        * @return true, if successful
+        */
+       public boolean process(ContentEvent event) {
+               InstanceContentEvent inEvent = (InstanceContentEvent) event; 
//((s4Event) event).getContentEvent();
+               //InstanceEvent inEvent = (InstanceEvent) event;
+
+               if (inEvent.getInstanceIndex() < 0) {
+                       // End learning
+                       predictionStream.put(event);
+                       return false;
+               }
+
+
+                if (inEvent.isTesting()){ 
+                       Instance trainInst = inEvent.getInstance();
+                       for (int i = 0; i < sizeEnsemble; i++) {
+                               Instance weightedInst = trainInst.copy();
+                               //weightedInst.setWeight(trainInst.weight() * 
k);
+                               InstanceContentEvent instanceContentEvent = new 
InstanceContentEvent(
+                                               inEvent.getInstanceIndex(), 
weightedInst, false, true);
+                               instanceContentEvent.setClassifierIndex(i);
+                               
instanceContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());  
+                               predictionStream.put(instanceContentEvent);
+                       }
+               }
+                
+                               /* Estimate model parameters using the training 
data. */
+               if (inEvent.isTraining()) {
+                       train(inEvent);
+               } 
+               return false;
+       }
+
+       /** The random. */
+       protected Random random = new Random();
+
+       /**
+        * Train.
+        *
+        * @param inEvent the in event
+        */
+       protected void train(InstanceContentEvent inEvent) {
+               Instance trainInst = inEvent.getInstance();
+               for (int i = 0; i < sizeEnsemble; i++) {
+                       int k = MiscUtils.poisson(1.0, this.random);
+                       if (k > 0) {
+                               Instance weightedInst = trainInst.copy();
+                               weightedInst.setWeight(trainInst.weight() * k);
+                               InstanceContentEvent instanceContentEvent = new 
InstanceContentEvent(
+                                               inEvent.getInstanceIndex(), 
weightedInst, true, false);
+                               instanceContentEvent.setClassifierIndex(i);
+                               
instanceContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());  
+                               trainingStream.put(instanceContentEvent);
+                       }
+               }
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see org.apache.s4.core.ProcessingElement#onCreate()
+        */
+       @Override
+       public void onCreate(int id) {
+               //do nothing
+       }
+
+
+       /**
+        * Gets the training stream.
+        *
+        * @return the training stream
+        */
+       public Stream getTrainingStream() {
+               return trainingStream;
+       }
+
+       /**
+        * Sets the training stream.
+        *
+        * @param trainingStream the new training stream
+        */
+       public void setOutputStream(Stream trainingStream) {
+               this.trainingStream = trainingStream;
+       }
+
+       /**
+        * Gets the prediction stream.
+        *
+        * @return the prediction stream
+        */
+       public Stream getPredictionStream() {
+               return predictionStream;
+       }
+
+       /**
+        * Sets the prediction stream.
+        *
+        * @param predictionStream the new prediction stream
+        */
+       public void setPredictionStream(Stream predictionStream) {
+               this.predictionStream = predictionStream;
+       }
+
+       /**
+        * Gets the size ensemble.
+        *
+        * @return the size ensemble
+        */
+       public int getSizeEnsemble() {
+               return sizeEnsemble;
+       }
+
+       /**
+        * Sets the size ensemble.
+        *
+        * @param sizeEnsemble the new size ensemble
+        */
+       public void setSizeEnsemble(int sizeEnsemble) {
+               this.sizeEnsemble = sizeEnsemble;
+       }
+       
+       
+       /* (non-Javadoc)
+        * @see samoa.core.Processor#newProcessor(samoa.core.Processor)
+        */
+       @Override
+       public Processor newProcessor(Processor sourceProcessor) {
+               BaggingDistributorProcessor newProcessor = new 
BaggingDistributorProcessor();
+               BaggingDistributorProcessor originProcessor = 
(BaggingDistributorProcessor) sourceProcessor;
+               if (originProcessor.getPredictionStream() != null){
+                       
newProcessor.setPredictionStream(originProcessor.getPredictionStream());
+               }
+               if (originProcessor.getTrainingStream() != null){
+                       
newProcessor.setOutputStream(originProcessor.getTrainingStream());
+               }
+               newProcessor.setSizeEnsemble(originProcessor.getSizeEnsemble());
+               /*if (originProcessor.getLearningCurve() != null){
+                       newProcessor.setLearningCurve((LearningCurve) 
originProcessor.getLearningCurve().copy());
+               }*/
+               return newProcessor;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java
new file mode 100644
index 0000000..06723e2
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java
@@ -0,0 +1,142 @@
+package com.yahoo.labs.samoa.learners.classifiers.ensemble;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.IntOption;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.instances.Instances;
+import com.yahoo.labs.samoa.learners.Learner;
+import com.yahoo.labs.samoa.learners.classifiers.SingleClassifier;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.TopologyBuilder;
+
+/**
+ * Boosting ensemble learner.
+ * <p>
+ * The layout mirrors {@link Bagging} (distributor, base learner, prediction
+ * combiner) and adds one stream: the {@link BoostingPredictionCombinerProcessor}
+ * gets a training stream that leads back to the base learner's input processor.
+ * <p>
+ * NOTE(review): presumably the online boosting of Oza and Russell - confirm
+ * before citing.
+ */
+public class Boosting implements Learner, Configurable {
+
+    /** The Constant serialVersionUID. */
+    private static final long serialVersionUID = -2971850264864952099L;
+
+    /** The base learner option. */
+    public ClassOption baseLearnerOption = new ClassOption("baseLearner", 'l',
+            "Classifier to train.", Learner.class, SingleClassifier.class.getName());
+
+    /** The ensemble size option. */
+    public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's',
+            "The number of models in the bag.", 10, 1, Integer.MAX_VALUE);
+
+    /** The distributor processor. */
+    private BoostingDistributorProcessor distributorP;
+
+    /** The result stream (output of the prediction combiner). */
+    protected Stream resultStream;
+
+    /** The dataset. */
+    private Instances dataset;
+
+    /** The base learner instantiated from {@link #baseLearnerOption}. */
+    protected Learner classifier;
+
+    /** Parallelism received in {@code init}; stored but not read by this class. */
+    protected int parallelism;
+
+    /** The builder. */
+    private TopologyBuilder builder;
+
+    /**
+     * Builds the ensemble topology: distributor, base learner, prediction
+     * combiner and the streams between them. The base learner is initialised
+     * with the ensemble size as its parallelism.
+     */
+    protected void setLayout() {
+        final int ensembleSize = ensembleSizeOption.getValue();
+
+        distributorP = new BoostingDistributorProcessor();
+        distributorP.setSizeEnsemble(ensembleSize);
+        builder.addProcessor(distributorP, 1);
+
+        // Base learner.
+        classifier = baseLearnerOption.getValue();
+        classifier.init(builder, dataset, ensembleSize);
+
+        BoostingPredictionCombinerProcessor combiner = new BoostingPredictionCombinerProcessor();
+        combiner.setSizeEnsemble(ensembleSize);
+        builder.addProcessor(combiner, 1);
+
+        // Combiner output is the result stream of the whole ensemble.
+        resultStream = builder.createStream(combiner);
+        combiner.setOutputStream(resultStream);
+
+        // Every result stream of the base learner feeds the combiner.
+        for (Stream learnerOutput : classifier.getResultStreams()) {
+            builder.connectInputKeyStream(learnerOutput, combiner);
+        }
+
+        // Stream registered as the distributor's output stream. The parent
+        // distributor uses it for training events, but
+        // BoostingDistributorProcessor overrides train() as a no-op.
+        Stream distributorOutput = builder.createStream(distributorP);
+        builder.connectInputKeyStream(distributorOutput, classifier.getInputProcessor());
+
+        // Prediction stream.
+        Stream predictionStream = builder.createStream(distributorP);
+        builder.connectInputKeyStream(predictionStream, classifier.getInputProcessor());
+
+        distributorP.setOutputStream(distributorOutput);
+        distributorP.setPredictionStream(predictionStream);
+
+        // Addition to Bagging: the training stream starts at the prediction
+        // combiner and leads back to the base learner.
+        Stream trainingStream = builder.createStream(combiner);
+        combiner.setTrainingStream(trainingStream);
+        builder.connectInputKeyStream(trainingStream, classifier.getInputProcessor());
+    }
+
+    /**
+     * Stores the arguments and immediately builds the layout.
+     *
+     * @param builder     topology builder to add processors and streams to
+     * @param dataset     dataset passed on to the base learner
+     * @param parallelism stored in {@link #parallelism}
+     */
+    @Override
+    public void init(TopologyBuilder builder, Instances dataset, int parallelism) {
+        this.builder = builder;
+        this.dataset = dataset;
+        this.parallelism = parallelism;
+        setLayout();
+    }
+
+    /**
+     * @return the distributor processor created by {@link #setLayout()}
+     */
+    @Override
+    public Processor getInputProcessor() {
+        return distributorP;
+    }
+
+    /**
+     * @return an immutable set holding only the prediction combiner's output stream
+     */
+    @Override
+    public Set<Stream> getResultStreams() {
+        return ImmutableSet.of(resultStream);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java
new file mode 100644
index 0000000..7100e7e
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java
@@ -0,0 +1,36 @@
+package com.yahoo.labs.samoa.learners.classifiers.ensemble;
+
+import com.yahoo.labs.samoa.learners.InstanceContentEvent;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+/**
+ * The Class BoostingDistributorProcessor.
+ */
+public class BoostingDistributorProcessor extends BaggingDistributorProcessor{
+    
+        @Override
+       protected void train(InstanceContentEvent inEvent) {
+            // Boosting is trained from the prediction combiner, not from the 
input
+        }
+    
+}

Reply via email to