http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java
deleted file mode 100644
index 6de0b87..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java
+++ /dev/null
@@ -1,220 +0,0 @@
-package com.yahoo.labs.samoa.tasks;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.javacliparser.ClassOption;
-import com.github.javacliparser.Configurable;
-import com.github.javacliparser.FileOption;
-import com.github.javacliparser.IntOption;
-import com.github.javacliparser.StringOption;
-import com.yahoo.labs.samoa.evaluation.BasicClassificationPerformanceEvaluator;
-import com.yahoo.labs.samoa.evaluation.BasicRegressionPerformanceEvaluator;
-import com.yahoo.labs.samoa.evaluation.ClassificationPerformanceEvaluator;
-import com.yahoo.labs.samoa.evaluation.PerformanceEvaluator;
-import com.yahoo.labs.samoa.evaluation.EvaluatorProcessor;
-import com.yahoo.labs.samoa.evaluation.RegressionPerformanceEvaluator;
-import com.yahoo.labs.samoa.learners.ClassificationLearner;
-import com.yahoo.labs.samoa.learners.Learner;
-import com.yahoo.labs.samoa.learners.RegressionLearner;
-import com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree;
-import com.yahoo.labs.samoa.moa.streams.InstanceStream;
-import com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator;
-import com.yahoo.labs.samoa.streams.PrequentialSourceProcessor;
-import com.yahoo.labs.samoa.topology.ComponentFactory;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.topology.Topology;
-import com.yahoo.labs.samoa.topology.TopologyBuilder;
-
-/**
- * Prequential Evaluation task is a scheme in evaluating performance of online classifiers which uses each instance for
- * testing online classifiers model and then it further uses the same instance for training the model(Test-then-train)
- * 
- * @author Arinto Murdopo
- * 
- */
-public class PrequentialEvaluation implements Task, Configurable {
-
-  private static final long serialVersionUID = -8246537378371580550L;
-
-  private static Logger logger = LoggerFactory.getLogger(PrequentialEvaluation.class);
-
-  public ClassOption learnerOption = new ClassOption("learner", 'l', "Classifier to train.", Learner.class,
-      VerticalHoeffdingTree.class.getName());
-
-  public ClassOption streamTrainOption = new ClassOption("trainStream", 's', "Stream to learn from.",
-      InstanceStream.class,
-      RandomTreeGenerator.class.getName());
-
-  public ClassOption evaluatorOption = new ClassOption("evaluator", 'e',
-      "Classification performance evaluation method.",
-      PerformanceEvaluator.class, BasicClassificationPerformanceEvaluator.class.getName());
-
-  public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i',
-      "Maximum number of instances to test/train on  (-1 = no limit).", 1000000, -1,
-      Integer.MAX_VALUE);
-
-  public IntOption timeLimitOption = new IntOption("timeLimit", 't',
-      "Maximum number of seconds to test/train for (-1 = no limit).", -1, -1,
-      Integer.MAX_VALUE);
-
-  public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 'f',
-      "How many instances between samples of the learning performance.", 100000,
-      0, Integer.MAX_VALUE);
-
-  public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation",
-      "Prequential_"
-          + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
-
-  public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File to append intermediate csv results to",
-      null, "csv", true);
-
-  // Default=0: no delay/waiting
-  public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w',
-      "How many microseconds between injections of two instances.", 0, 0, Integer.MAX_VALUE);
-  // Batch size to delay the incoming stream: delay of x milliseconds after each
-  // batch
-  public IntOption batchDelayOption = new IntOption("delayBatchSize", 'b',
-      "The delay batch size: delay of x milliseconds after each batch ", 1, 1, Integer.MAX_VALUE);
-
-  private PrequentialSourceProcessor preqSource;
-
-  // private PrequentialSourceTopologyStarter preqStarter;
-
-  // private EntranceProcessingItem sourcePi;
-
-  private Stream sourcePiOutputStream;
-
-  private Learner classifier;
-
-  private EvaluatorProcessor evaluator;
-
-  // private ProcessingItem evaluatorPi;
-
-  // private Stream evaluatorPiInputStream;
-
-  private Topology prequentialTopology;
-
-  private TopologyBuilder builder;
-
-  public void getDescription(StringBuilder sb, int indent) {
-    sb.append("Prequential evaluation");
-  }
-
-  @Override
-  public void init() {
-    // TODO remove the if statement
-    // theoretically, dynamic binding will work here!
-    // test later!
-    // for now, the if statement is used by Storm
-
-    if (builder == null) {
-      builder = new TopologyBuilder();
-      logger.debug("Successfully instantiating TopologyBuilder");
-
-      builder.initTopology(evaluationNameOption.getValue());
-      logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue());
-    }
-
-    // instantiate PrequentialSourceProcessor and its output stream
-    // (sourcePiOutputStream)
-    preqSource = new PrequentialSourceProcessor();
-    preqSource.setStreamSource((InstanceStream) this.streamTrainOption.getValue());
-    preqSource.setMaxNumInstances(instanceLimitOption.getValue());
-    preqSource.setSourceDelay(sourceDelayOption.getValue());
-    preqSource.setDelayBatchSize(batchDelayOption.getValue());
-    builder.addEntranceProcessor(preqSource);
-    logger.debug("Successfully instantiating PrequentialSourceProcessor");
-
-    // preqStarter = new PrequentialSourceTopologyStarter(preqSource,
-    // instanceLimitOption.getValue());
-    // sourcePi = builder.createEntrancePi(preqSource, preqStarter);
-    // sourcePiOutputStream = builder.createStream(sourcePi);
-
-    sourcePiOutputStream = builder.createStream(preqSource);
-    // preqStarter.setInputStream(sourcePiOutputStream);
-
-    // instantiate classifier and connect it to sourcePiOutputStream
-    classifier = this.learnerOption.getValue();
-    classifier.init(builder, preqSource.getDataset(), 1);
-    builder.connectInputShuffleStream(sourcePiOutputStream, classifier.getInputProcessor());
-    logger.debug("Successfully instantiating Classifier");
-
-    PerformanceEvaluator evaluatorOptionValue = this.evaluatorOption.getValue();
-    if (!PrequentialEvaluation.isLearnerAndEvaluatorCompatible(classifier, evaluatorOptionValue)) {
-      evaluatorOptionValue = getDefaultPerformanceEvaluatorForLearner(classifier);
-    }
-    evaluator = new EvaluatorProcessor.Builder(evaluatorOptionValue)
-        .samplingFrequency(sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile()).build();
-
-    // evaluatorPi = builder.createPi(evaluator);
-    // evaluatorPi.connectInputShuffleStream(evaluatorPiInputStream);
-    builder.addProcessor(evaluator);
-    for (Stream evaluatorPiInputStream : classifier.getResultStreams()) {
-      builder.connectInputShuffleStream(evaluatorPiInputStream, evaluator);
-    }
-
-    logger.debug("Successfully instantiating EvaluatorProcessor");
-
-    prequentialTopology = builder.build();
-    logger.debug("Successfully building the topology");
-  }
-
-  @Override
-  public void setFactory(ComponentFactory factory) {
-    // TODO unify this code with init()
-    // for now, it's used by S4 App
-    // dynamic binding theoretically will solve this problem
-    builder = new TopologyBuilder(factory);
-    logger.debug("Successfully instantiating TopologyBuilder");
-
-    builder.initTopology(evaluationNameOption.getValue());
-    logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue());
-
-  }
-
-  public Topology getTopology() {
-    return prequentialTopology;
-  }
-
-  //
-  // @Override
-  // public TopologyStarter getTopologyStarter() {
-  // return this.preqStarter;
-  // }
-
-  private static boolean isLearnerAndEvaluatorCompatible(Learner learner, PerformanceEvaluator evaluator) {
-    return (learner instanceof RegressionLearner && evaluator instanceof RegressionPerformanceEvaluator) ||
-        (learner instanceof ClassificationLearner && evaluator instanceof ClassificationPerformanceEvaluator);
-  }
-
-  private static PerformanceEvaluator getDefaultPerformanceEvaluatorForLearner(Learner learner) {
-    if (learner instanceof RegressionLearner) {
-      return new BasicRegressionPerformanceEvaluator();
-    }
-    // Default to BasicClassificationPerformanceEvaluator for all other cases
-    return new BasicClassificationPerformanceEvaluator();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java
deleted file mode 100644
index 52e3485..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package com.yahoo.labs.samoa.tasks;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.topology.ComponentFactory;
-import com.yahoo.labs.samoa.topology.Topology;
-
-/**
- * Task interface, the mother of all SAMOA tasks!
- */
-public interface Task {
-
-  /**
-   * Initialize this SAMOA task, i.e. create and connect ProcessingItems and Streams and initialize the topology
-   */
-  public void init();
-
-  /**
-   * Return the final topology object to be executed in the cluster
-   * 
-   * @return topology object to be submitted to be executed in the cluster
-   */
-  public Topology getTopology();
-
-  // /**
-  // * Return the entrance processor to start SAMOA topology
-  // * The logic to start the topology should be implemented here
-  // * @return entrance processor to start the topology
-  // */
-  // public TopologyStarter getTopologyStarter();
-
-  /**
-   * Sets the factory. TODO: propose to hide factory from task, i.e. Task will only see TopologyBuilder, and factory
-   * creation will be handled by TopologyBuilder
-   * 
-   * @param factory
-   *          the new factory
-   */
-  public void setFactory(ComponentFactory factory);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java
deleted file mode 100644
index 6f6069b..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package com.yahoo.labs.samoa.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-
-/**
- * Helper class for EntranceProcessingItem implementation.
- * 
- * @author Anh Thu Vu
- * 
- */
-public abstract class AbstractEntranceProcessingItem implements EntranceProcessingItem {
-  private EntranceProcessor processor;
-  private String name;
-  private Stream outputStream;
-
-  /*
-   * Constructor
-   */
-  public AbstractEntranceProcessingItem() {
-    this(null);
-  }
-
-  public AbstractEntranceProcessingItem(EntranceProcessor processor) {
-    this.processor = processor;
-  }
-
-  /*
-   * Processor
-   */
-  /**
-   * Set the entrance processor for this EntranceProcessingItem
-   * 
-   * @param processor
-   *          the processor
-   */
-  protected void setProcessor(EntranceProcessor processor) {
-    this.processor = processor;
-  }
-
-  /**
-   * Get the EntranceProcessor of this EntranceProcessingItem.
-   * 
-   * @return the EntranceProcessor
-   */
-  public EntranceProcessor getProcessor() {
-    return this.processor;
-  }
-
-  /*
-   * Name/ID
-   */
-  /**
-   * Set the name (or ID) of this EntranceProcessingItem
-   * 
-   * @param name
-   */
-  public void setName(String name) {
-    this.name = name;
-  }
-
-  /**
-   * Get the name (or ID) of this EntranceProcessingItem
-   * 
-   * @return the name (or ID)
-   */
-  public String getName() {
-    return this.name;
-  }
-
-  /*
-   * Output Stream
-   */
-  /**
-   * Set the output stream of this EntranceProcessingItem. An EntranceProcessingItem should have only 1 single output
-   * stream and should not be re-assigned.
-   * 
-   * @return this EntranceProcessingItem
-   */
-  public EntranceProcessingItem setOutputStream(Stream outputStream) {
-    if (this.outputStream != null && this.outputStream != outputStream) {
-      throw new IllegalStateException("Cannot overwrite output stream of EntranceProcessingItem");
-    } else
-      this.outputStream = outputStream;
-    return this;
-  }
-
-  /**
-   * Get the output stream of this EntranceProcessingItem.
-   * 
-   * @return the output stream
-   */
-  public Stream getOutputStream() {
-    return this.outputStream;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java
deleted file mode 100644
index 58c4d7a..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java
+++ /dev/null
@@ -1,168 +0,0 @@
-package com.yahoo.labs.samoa.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.utils.PartitioningScheme;
-
-/**
- * Abstract ProcessingItem
- * 
- * Helper for implementation of ProcessingItem. It has basic information for a ProcessingItem: name, parallelismLevel
- * and a processor. Subclass of this class needs to implement {@link #addInputStream(Stream, PartitioningScheme)}.
- * 
- * @author Anh Thu Vu
- * 
- */
-public abstract class AbstractProcessingItem implements ProcessingItem {
-  private String name;
-  private int parallelism;
-  private Processor processor;
-
-  /*
-   * Constructor
-   */
-  public AbstractProcessingItem() {
-    this(null);
-  }
-
-  public AbstractProcessingItem(Processor processor) {
-    this(processor, 1);
-  }
-
-  public AbstractProcessingItem(Processor processor, int parallelism) {
-    this.processor = processor;
-    this.parallelism = parallelism;
-  }
-
-  /*
-   * Processor
-   */
-  /**
-   * Set the processor for this ProcessingItem
-   * 
-   * @param processor
-   *          the processor
-   */
-  protected void setProcessor(Processor processor) {
-    this.processor = processor;
-  }
-
-  /**
-   * Get the processor of this ProcessingItem
-   * 
-   * @return the processor
-   */
-  public Processor getProcessor() {
-    return this.processor;
-  }
-
-  /*
-   * Parallelism
-   */
-  /**
-   * Set the parallelism factor of this ProcessingItem
-   * 
-   * @param parallelism
-   */
-  protected void setParallelism(int parallelism) {
-    this.parallelism = parallelism;
-  }
-
-  /**
-   * Get the parallelism factor of this ProcessingItem
-   * 
-   * @return the parallelism factor
-   */
-  @Override
-  public int getParallelism() {
-    return this.parallelism;
-  }
-
-  /*
-   * Name/ID
-   */
-  /**
-   * Set the name (or ID) of this ProcessingItem
-   * 
-   * @param name
-   *          the name/ID
-   */
-  public void setName(String name) {
-    this.name = name;
-  }
-
-  /**
-   * Get the name (or ID) of this ProcessingItem
-   * 
-   * @return the name/ID
-   */
-  public String getName() {
-    return this.name;
-  }
-
-  /*
-   * Add input streams
-   */
-  /**
-   * Add an input stream to this ProcessingItem
-   * 
-   * @param inputStream
-   *          the input stream to add
-   * @param scheme
-   *          partitioning scheme associated with this ProcessingItem and the input stream
-   * @return this ProcessingItem
-   */
-  protected abstract ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme);
-
-  /**
-   * Add an input stream to this ProcessingItem with SHUFFLE scheme
-   * 
-   * @param inputStream
-   *          the input stream
-   * @return this ProcessingItem
-   */
-  public ProcessingItem connectInputShuffleStream(Stream inputStream) {
-    return this.addInputStream(inputStream, PartitioningScheme.SHUFFLE);
-  }
-
-  /**
-   * Add an input stream to this ProcessingItem with GROUP_BY_KEY scheme
-   * 
-   * @param inputStream
-   *          the input stream
-   * @return this ProcessingItem
-   */
-  public ProcessingItem connectInputKeyStream(Stream inputStream) {
-    return this.addInputStream(inputStream, PartitioningScheme.GROUP_BY_KEY);
-  }
-
-  /**
-   * Add an input stream to this ProcessingItem with BROADCAST scheme
-   * 
-   * @param inputStream
-   *          the input stream
-   * @return this ProcessingItem
-   */
-  public ProcessingItem connectInputAllStream(Stream inputStream) {
-    return this.addInputStream(inputStream, PartitioningScheme.BROADCAST);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java
deleted file mode 100644
index cab7d3a..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java
+++ /dev/null
@@ -1,118 +0,0 @@
-package com.yahoo.labs.samoa.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-
-/**
- * Abstract Stream
- * 
- * Helper for implementation of Stream. It has basic information for a Stream: streamID and source ProcessingItem.
- * Subclass of this class needs to implement {@link #put(ContentEvent)}.
- * 
- * @author Anh Thu Vu
- * 
- */
-
-public abstract class AbstractStream implements Stream {
-  private String streamID;
-  private IProcessingItem sourcePi;
-  private int batchSize;
-
-  /*
-   * Constructor
-   */
-  public AbstractStream() {
-    this(null);
-  }
-
-  public AbstractStream(IProcessingItem sourcePi) {
-    this.sourcePi = sourcePi;
-    this.batchSize = 1;
-  }
-
-  /**
-   * Get source processing item of this stream
-   * 
-   * @return
-   */
-  public IProcessingItem getSourceProcessingItem() {
-    return this.sourcePi;
-  }
-
-  /*
-   * Process event
-   */
-  @Override
-  /**
-   * Send a ContentEvent
-   * @param event
-   *                   the ContentEvent to be sent
-   */
-  public abstract void put(ContentEvent event);
-
-  /*
-   * Stream name
-   */
-  /**
-   * Get name (ID) of this stream
-   * 
-   * @return the name (ID)
-   */
-  @Override
-  public String getStreamId() {
-    return this.streamID;
-  }
-
-  /**
-   * Set the name (ID) of this stream
-   * 
-   * @param streamID
-   *          the name (ID)
-   */
-  public void setStreamId(String streamID) {
-    this.streamID = streamID;
-  }
-
-  /*
-   * Batch size
-   */
-  /**
-   * Set suggested batch size
-   * 
-   * @param batchSize
-   *          the suggested batch size
-   * 
-   */
-  @Override
-  public void setBatchSize(int batchSize) {
-    this.batchSize = batchSize;
-  }
-
-  /**
-   * Get suggested batch size
-   * 
-   * @return the suggested batch size
-   */
-  public int getBatchSize() {
-    return this.batchSize;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java
deleted file mode 100644
index fd59f26..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java
+++ /dev/null
@@ -1,133 +0,0 @@
-package com.yahoo.labs.samoa.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Topology abstract class.
- * 
- * It manages basic information of a topology: name, sets of Streams and ProcessingItems
- * 
- */
-public abstract class AbstractTopology implements Topology {
-
-  private String topoName;
-  private Set<Stream> streams;
-  private Set<IProcessingItem> processingItems;
-  private Set<EntranceProcessingItem> entranceProcessingItems;
-
-  protected AbstractTopology(String name) {
-    this.topoName = name;
-    this.streams = new HashSet<>();
-    this.processingItems = new HashSet<>();
-    this.entranceProcessingItems = new HashSet<>();
-  }
-
-  /**
-   * Gets the name of this topology
-   * 
-   * @return name of the topology
-   */
-  public String getTopologyName() {
-    return this.topoName;
-  }
-
-  /**
-   * Sets the name of this topology
-   * 
-   * @param topologyName
-   *          name of the topology
-   */
-  public void setTopologyName(String topologyName) {
-    this.topoName = topologyName;
-  }
-
-  /**
-   * Adds an Entrance processing item to the topology.
-   * 
-   * @param epi
-   *          Entrance processing item
-   */
-  public void addEntranceProcessingItem(EntranceProcessingItem epi) {
-    this.entranceProcessingItems.add(epi);
-    this.addProcessingItem(epi);
-  }
-
-  /**
-   * Gets entrance processing items in the topology
-   * 
-   * @return the set of processing items
-   */
-  public Set<EntranceProcessingItem> getEntranceProcessingItems() {
-    return this.entranceProcessingItems;
-  }
-
-  /**
-   * Add processing item to topology.
-   * 
-   * @param procItem
-   *          Processing item.
-   */
-  public void addProcessingItem(IProcessingItem procItem) {
-    addProcessingItem(procItem, 1);
-  }
-
-  /**
-   * Add processing item to topology.
-   * 
-   * @param procItem
-   *          Processing item.
-   * @param parallelismHint
-   *          Processing item parallelism level.
-   */
-  public void addProcessingItem(IProcessingItem procItem, int parallelismHint) {
-    this.processingItems.add(procItem);
-  }
-
-  /**
-   * Gets processing items in the topology (including entrance processing items)
-   * 
-   * @return the set of processing items
-   */
-  public Set<IProcessingItem> getProcessingItems() {
-    return this.processingItems;
-  }
-
-  /**
-   * Add stream to topology.
-   * 
-   * @param stream
-   */
-  public void addStream(Stream stream) {
-    this.streams.add(stream);
-  }
-
-  /**
-   * Gets streams in the topology
-   * 
-   * @return the set of streams
-   */
-  public Set<Stream> getStreams() {
-    return this.streams;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java
deleted file mode 100644
index 0482972..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package com.yahoo.labs.samoa.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.core.Processor;
-
-/**
- * ComponentFactory interface. Provides platform specific components.
- */
-public interface ComponentFactory {
-
-  /**
-   * Creates a platform specific processing item with the specified processor.
-   * 
-   * @param processor
-   *          contains the logic for this processing item.
-   * @return ProcessingItem
-   */
-  public ProcessingItem createPi(Processor processor);
-
-  /**
-   * Creates a platform specific processing item with the specified processor. Additionally sets the parallelism level.
-   * 
-   * @param processor
-   *          contains the logic for this processing item.
-   * @param parallelism
-   *          defines the amount of instances of this processing item will be created.
-   * @return ProcessingItem
-   */
-  public ProcessingItem createPi(Processor processor, int parallelism);
-
-  /**
-   * Creates a platform specific processing item with the specified processor that is the entrance point in the
-   * topology. This processing item can either generate a stream of data or connect to an external stream of data.
-   * 
-   * @param entranceProcessor
-   *          contains the logic for this processing item.
-   * @return EntranceProcessingItem
-   */
-  public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor);
-
-  /**
-   * Creates a platform specific stream.
-   * 
-   * @param sourcePi
-   *          source processing item which will provide the events for this stream.
-   * @return Stream
-   */
-  public Stream createStream(IProcessingItem sourcePi);
-
-  /**
-   * Creates a platform specific topology.
-   * 
-   * @param topoName
-   *          Topology name.
-   * @return Topology
-   */
-  public Topology createTopology(String topoName);
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java
deleted file mode 100644
index 04cde38..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.yahoo.labs.samoa.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-
-/**
- * Entrance processing item interface.
- */
-public interface EntranceProcessingItem extends IProcessingItem {
-
-  @Override
-  /**
-   * Gets the processing item processor.
-   * 
-   * @return the embedded EntranceProcessor. 
-   */
-  public EntranceProcessor getProcessor();
-
-  /**
-   * Set the single output stream for this EntranceProcessingItem.
-   * 
-   * @param stream
-   *          the stream
-   * @return the current instance of the EntranceProcessingItem for fluent 
interface.
-   */
-  public EntranceProcessingItem setOutputStream(Stream stream);
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java
deleted file mode 100644
index b93612d..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.yahoo.labs.samoa.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.core.Processor;
-
-/**
- * ProcessingItem interface specific for entrance processing items.
- * 
- * @author severien
- * 
- */
-public interface IProcessingItem {
-
-  /**
-   * Gets the processing item processor.
-   * 
-   * @return Processor
-   */
-  public Processor getProcessor();
-
-  /**
-   * Sets processing item name.
-   * 
-   * @param name
-   */
-  // public void setName(String name);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java
deleted file mode 100644
index 253ba30..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.yahoo.labs.samoa.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.tasks.Task;
-
-/**
- * Submitter interface for programatically deploying platform specific 
topologies.
- * 
- * @author severien
- * 
- */
-public interface ISubmitter {
-
-  /**
-   * Deploy a specific task to a platform.
-   * 
-   * @param task
-   */
-  public void deployTask(Task task);
-
-  /**
-   * Sets if the task should run locally or distributed.
-   * 
-   * @param bool
-   */
-  public void setLocal(boolean bool);
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java
deleted file mode 100644
index faeb0d3..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package com.yahoo.labs.samoa.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-
-/**
- * Implementation of EntranceProcessingItem for local engines (Simple, 
Multithreads)
- * 
- * @author Anh Thu Vu
- * 
- */
-public class LocalEntranceProcessingItem extends AbstractEntranceProcessingItem {
-  public LocalEntranceProcessingItem(EntranceProcessor processor) {
-    super(processor);
-  }
-
-  /**
-   * If there are available events, first event in the queue will be sent out 
on the output stream.
-   * 
-   * @return true if there is (at least) one available event and it was sent 
out false otherwise
-   */
-  public boolean injectNextEvent() {
-    if (this.getProcessor().hasNext()) {
-      ContentEvent event = this.getProcessor().nextEvent();
-      this.getOutputStream().put(event);
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Start sending events by calling {@link #injectNextEvent()}. If there are no available events, and that the stream
-   * is not entirely consumed, it will wait by calling {@link #waitForNewEvents()} before attempting to send again. </p>
-   * When the stream is entirely consumed, the last event is tagged accordingly and the processor gets the finished
-   * status.
-   * 
-   */
-  public void startSendingEvents() {
-    if (this.getOutputStream() == null)
-      throw new IllegalStateException("Try sending events from EntrancePI while outputStream is not set.");
-
-    while (!this.getProcessor().isFinished()) {
-      if (!this.injectNextEvent()) {
-        try {
-          waitForNewEvents();
-        } catch (Exception e) {
-          e.printStackTrace();
-          break;
-        }
-      }
-    }
-  }
-
-  /**
-   * Method to wait for an amount of time when there are no available events. 
Implementation of EntranceProcessingItem
-   * should override this method to implement non-blocking wait or to adjust 
the amount of time.
-   */
-  protected void waitForNewEvents() throws Exception {
-    Thread.sleep(100);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java
deleted file mode 100644
index c1903b7..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package com.yahoo.labs.samoa.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
- * Processing item interface.
- * 
- * @author severien
- * 
- */
-public interface ProcessingItem extends IProcessingItem {
-
-  /**
-   * Connects this processing item in a round robin fashion. The events will 
be distributed evenly between the
-   * instantiated processing items.
-   * 
-   * @param inputStream
-   *          Stream to connect this processing item.
-   * @return ProcessingItem
-   */
-  public ProcessingItem connectInputShuffleStream(Stream inputStream);
-
-  /**
-   * Connects this processing item taking the event key into account. Events 
will be routed to the processing item
-   * according to the modulus of its key and the paralellism level. Ex.: key = 
5 and paralellism = 2, 5 mod 2 = 1.
-   * Processing item responsible for 1 will receive this event.
-   * 
-   * @param inputStream
-   *          Stream to connect this processing item.
-   * @return ProcessingItem
-   */
-  public ProcessingItem connectInputKeyStream(Stream inputStream);
-
-  /**
-   * Connects this processing item to the stream in a broadcast fashion. All 
processing items of this type will receive
-   * copy of the original event.
-   * 
-   * @param inputStream
-   *          Stream to connect this processing item.
-   * @return ProcessingItem
-   */
-  public ProcessingItem connectInputAllStream(Stream inputStream);
-
-  /**
-   * Gets processing item parallelism level.
-   * 
-   * @return int
-   */
-  public int getParallelism();
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java
deleted file mode 100644
index 49d4e7f..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package com.yahoo.labs.samoa.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-
-/**
- * Stream interface.
- * 
- * @author severien
- * 
- */
-public interface Stream {
-
-  /**
-   * Puts events into a platform specific data stream.
-   * 
-   * @param event
-   */
-  public void put(ContentEvent event);
-
-  /**
-   * Sets the stream id which is represented by a name.
-   * 
-   * @param stream
-   */
-  // public void setStreamId(String stream);
-
-  /**
-   * Gets stream id.
-   * 
-   * @return id
-   */
-  public String getStreamId();
-
-  /**
-   * Set batch size
-   * 
-   * @param batchSize
-   *          the suggested size for batching messages on this stream
-   */
-  public void setBatchSize(int batchsize);
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java
deleted file mode 100644
index 7084377..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package com.yahoo.labs.samoa.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-public interface Topology {
-  /*
-   * Name
-   */
-  /**
-   * Get the topology's name
-   * 
-   * @return the name of the topology
-   */
-  public String getTopologyName();
-
-  /**
-   * Set the topology's name
-   * 
-   * @param topologyName
-   *          the name of the topology
-   */
-  public void setTopologyName(String topologyName);
-
-  /*
-   * Entrance Processing Items
-   */
-  /**
-   * Add an EntranceProcessingItem to this topology
-   * 
-   * @param epi
-   *          the EntranceProcessingItem to be added
-   */
-  void addEntranceProcessingItem(EntranceProcessingItem epi);
-
-  /*
-   * Processing Items
-   */
-  /**
-   * Add a ProcessingItem to this topology with default parallelism level 
(i.e. 1)
-   * 
-   * @param procItem
-   *          the ProcessingItem to be added
-   */
-  void addProcessingItem(IProcessingItem procItem);
-
-  /**
-   * Add a ProcessingItem to this topology with an associated parallelism level
-   * 
-   * @param procItem
-   *          the ProcessingItem to be added
-   * @param parallelismHint
-   *          the parallelism level
-   */
-  void addProcessingItem(IProcessingItem procItem, int parallelismHint);
-
-  /*
-   * Streams
-   */
-  /**
-   * 
-   * @param stream
-   */
-  void addStream(Stream stream);
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java
deleted file mode 100644
index dd747cf..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java
+++ /dev/null
@@ -1,227 +0,0 @@
-package com.yahoo.labs.samoa.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.common.base.Preconditions;
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.core.Processor;
-
-/**
- * Builder class that creates topology components and assemble them together.
- * 
- */
-public class TopologyBuilder {
-
-  // TODO:
-  // Possible options:
-  // 1. we may convert this as interface and platform dependent builder will
-  // inherit this method
-  // 2. refactor by combining TopologyBuilder, ComponentFactory and Topology
-  // -ve -> fat class where it has capabilities to instantiate specific
-  // component and connecting them
-  // +ve -> easy abstraction for SAMOA developer
-  // "you just implement your builder logic here!"
-  private ComponentFactory componentFactory;
-  private Topology topology;
-  private Map<Processor, IProcessingItem> mapProcessorToProcessingItem;
-
-  // TODO: refactor, temporary constructor used by Storm code
-  public TopologyBuilder() {
-    // TODO: initialize _componentFactory using dynamic binding
-    // for now, use StormComponentFactory
-    // should the factory be Singleton (?)
-    // ans: at the moment, no, i.e. each builder will has its associated
-    // factory!
-    // and the factory will be instantiated using dynamic binding
-    // this.componentFactory = new StormComponentFactory();
-  }
-
-  // TODO: refactor, temporary constructor used by S4 code
-  public TopologyBuilder(ComponentFactory theFactory) {
-    this.componentFactory = theFactory;
-  }
-
-  /**
-   * Initiates topology with a specific name.
-   * 
-   * @param topologyName
-   */
-  public void initTopology(String topologyName) {
-    this.initTopology(topologyName, 0);
-  }
-
-  /**
-   * Initiates topology with a specific name and a delay between consecutive 
instances.
-   * 
-   * @param topologyName
-   * @param delay
-   *          delay between injections of two instances from source (in 
milliseconds)
-   */
-  public void initTopology(String topologyName, int delay) {
-    if (this.topology != null) {
-      // TODO: possible refactor this code later
-      System.out.println("Topology has been initialized before!");
-      return;
-    }
-    this.topology = componentFactory.createTopology(topologyName);
-  }
-
-  /**
-   * Returns the platform specific topology.
-   * 
-   * @return
-   */
-  public Topology build() {
-    return topology;
-  }
-
-  public ProcessingItem addProcessor(Processor processor, int parallelism) {
-    ProcessingItem pi = createPi(processor, parallelism);
-    if (this.mapProcessorToProcessingItem == null)
-      this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>();
-    this.mapProcessorToProcessingItem.put(processor, pi);
-    return pi;
-  }
-
-  public ProcessingItem addProcessor(Processor processor) {
-    return addProcessor(processor, 1);
-  }
-
-  public ProcessingItem connectInputShuffleStream(Stream inputStream, Processor processor) {
-    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
-    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
-    return pi.connectInputShuffleStream(inputStream);
-  }
-
-  public ProcessingItem connectInputKeyStream(Stream inputStream, Processor processor) {
-    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
-    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
-    return pi.connectInputKeyStream(inputStream);
-  }
-
-  public ProcessingItem connectInputAllStream(Stream inputStream, Processor processor) {
-    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
-    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
-    return pi.connectInputAllStream(inputStream);
-  }
-
-  public Stream createInputShuffleStream(Processor processor, Processor dest) {
-    Stream inputStream = this.createStream(dest);
-    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
-    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
-    pi.connectInputShuffleStream(inputStream);
-    return inputStream;
-  }
-
-  public Stream createInputKeyStream(Processor processor, Processor dest) {
-    Stream inputStream = this.createStream(dest);
-    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
-    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
-    pi.connectInputKeyStream(inputStream);
-    return inputStream;
-  }
-
-  public Stream createInputAllStream(Processor processor, Processor dest) {
-    Stream inputStream = this.createStream(dest);
-    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
-    Preconditions.checkNotNull(pi, "Trying to connect to null PI");
-    pi.connectInputAllStream(inputStream);
-    return inputStream;
-  }
-
-  public Stream createStream(Processor processor) {
-    IProcessingItem pi = mapProcessorToProcessingItem.get(processor);
-    Stream ret = null;
-    Preconditions.checkNotNull(pi, "Trying to create stream from null PI");
-    ret = this.createStream(pi);
-    if (pi instanceof EntranceProcessingItem)
-      ((EntranceProcessingItem) pi).setOutputStream(ret);
-    return ret;
-  }
-
-  public EntranceProcessingItem addEntranceProcessor(EntranceProcessor entranceProcessor) {
-    EntranceProcessingItem pi = createEntrancePi(entranceProcessor);
-    if (this.mapProcessorToProcessingItem == null)
-      this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>();
-    mapProcessorToProcessingItem.put(entranceProcessor, pi);
-    return pi;
-  }
-
-  public ProcessingItem getProcessingItem(Processor processor) {
-    ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor);
-    Preconditions.checkNotNull(pi, "Trying to retrieve null PI");
-    return pi;
-  }
-
-  /**
-   * Creates a processing item with a specific processor and paralellism level 
of 1.
-   * 
-   * @param processor
-   * @return ProcessingItem
-   */
-  @SuppressWarnings("unused")
-  private ProcessingItem createPi(Processor processor) {
-    return createPi(processor, 1);
-  }
-
-  /**
-   * Creates a processing item with a specific processor and paralellism level.
-   * 
-   * @param processor
-   * @param parallelism
-   * @return ProcessingItem
-   */
-  private ProcessingItem createPi(Processor processor, int parallelism) {
-    ProcessingItem pi = this.componentFactory.createPi(processor, parallelism);
-    this.topology.addProcessingItem(pi, parallelism);
-    return pi;
-  }
-
-  /**
-   * Creates a platform specific entrance processing item.
-   * 
-   * @param processor
-   * @return
-   */
-  private EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
-    EntranceProcessingItem epi = this.componentFactory.createEntrancePi(processor);
-    this.topology.addEntranceProcessingItem(epi);
-    if (this.mapProcessorToProcessingItem == null)
-      this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>();
-    this.mapProcessorToProcessingItem.put(processor, epi);
-    return epi;
-  }
-
-  /**
-   * Creates a platform specific stream.
-   * 
-   * @param sourcePi
-   *          source processing item.
-   * @return
-   */
-  private Stream createStream(IProcessingItem sourcePi) {
-    Stream stream = this.componentFactory.createStream(sourcePi);
-    this.topology.addStream(stream);
-    return stream;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java
deleted file mode 100644
index 61dc55c..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package com.yahoo.labs.samoa.utils;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
- * Represents the 3 schemes to partition the streams
- * 
- * @author Anh Thu Vu
- * 
- */
-public enum PartitioningScheme {
-  SHUFFLE, GROUP_BY_KEY, BROADCAST
-}
-// TODO: use this enum in S4
-// Storm doesn't seem to need this

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java
deleted file mode 100644
index b5e3e0e..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package com.yahoo.labs.samoa.utils;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.topology.IProcessingItem;
-
-/**
- * Represents one destination for streams. It has the info of: the 
ProcessingItem, parallelismHint, and partitioning
- * scheme. Usage: - When ProcessingItem connects to a stream, it will pass a 
StreamDestination to the stream. - Stream
- * manages a set of StreamDestination. - Used in single-threaded and 
multi-threaded local mode.
- * 
- * @author Anh Thu Vu
- * 
- */
-public class StreamDestination {
-  private IProcessingItem pi;
-  private int parallelism;
-  private PartitioningScheme type;
-
-  /*
-   * Constructor
-   */
-  public StreamDestination(IProcessingItem pi, int parallelismHint, PartitioningScheme type) {
-    this.pi = pi;
-    this.parallelism = parallelismHint;
-    this.type = type;
-  }
-
-  /*
-   * Getters
-   */
-  public IProcessingItem getProcessingItem() {
-    return this.pi;
-  }
-
-  public int getParallelism() {
-    return this.parallelism;
-  }
-
-  public PartitioningScheme getPartitioningScheme() {
-    return this.type;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java
deleted file mode 100644
index 7c9212e..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java
+++ /dev/null
@@ -1,183 +0,0 @@
-package com.yahoo.labs.samoa.utils;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.jar.Attributes;
-import java.util.jar.JarOutputStream;
-import java.util.jar.Manifest;
-import java.util.zip.ZipEntry;
-
-/**
- * Utils class for building and deploying applications programmatically.
- * 
- * @author severien
- * 
- */
-public class Utils {
-
-  public static void buildSamoaPackage() {
-    try {
-      String output = "/tmp/samoa/samoa.jar";// System.getProperty("user.home") + "/samoa.jar";
-      Manifest manifest = createManifest();
-
-      BufferedOutputStream bo;
-
-      bo = new BufferedOutputStream(new FileOutputStream(output));
-      JarOutputStream jo = new JarOutputStream(bo, manifest);
-
-      String baseDir = System.getProperty("user.dir");
-      System.out.println(baseDir);
-
-      File samoaJar = new File(baseDir + "/target/samoa-0.0.1-SNAPSHOT.jar");
-      addEntry(jo, samoaJar, baseDir + "/target/", "/app/");
-      addLibraries(jo);
-
-      jo.close();
-      bo.close();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-  }
-
-  // TODO should get the modules file from the parameters
-  public static void buildModulesPackage(List<String> modulesNames) {
-    System.out.println(System.getProperty("user.dir"));
-    try {
-      String baseDir = System.getProperty("user.dir");
-      List<File> filesArray = new ArrayList<>();
-      for (String module : modulesNames) {
-        module = "/" + module.replace(".", "/") + ".class";
-        filesArray.add(new File(baseDir + module));
-      }
-      String output = System.getProperty("user.home") + "/modules.jar";
-
-      Manifest manifest = new Manifest();
-      manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION,
-          "1.0");
-      manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_URL,
-          "http://samoa.yahoo.com");
-      manifest.getMainAttributes().put(
-          Attributes.Name.IMPLEMENTATION_VERSION, "0.1");
-      manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR,
-          "Yahoo");
-      manifest.getMainAttributes().put(
-          Attributes.Name.IMPLEMENTATION_VENDOR_ID, "SAMOA");
-
-      BufferedOutputStream bo;
-
-      bo = new BufferedOutputStream(new FileOutputStream(output));
-      JarOutputStream jo = new JarOutputStream(bo, manifest);
-
-      File[] files = filesArray.toArray(new File[filesArray.size()]);
-      addEntries(jo, files, baseDir, "");
-
-      jo.close();
-      bo.close();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-  }
-
-  private static void addLibraries(JarOutputStream jo) {
-    try {
-      String baseDir = System.getProperty("user.dir");
-      String libDir = baseDir + "/target/lib";
-      File inputFile = new File(libDir);
-
-      File[] files = inputFile.listFiles();
-      for (File file : files) {
-        addEntry(jo, file, baseDir, "lib");
-      }
-      jo.close();
-
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-  }
-
-  private static void addEntries(JarOutputStream jo, File[] files, String baseDir, String rootDir) {
-    for (File file : files) {
-
-      if (!file.isDirectory()) {
-        addEntry(jo, file, baseDir, rootDir);
-      } else {
-        File dir = new File(file.getAbsolutePath());
-        addEntries(jo, dir.listFiles(), baseDir, rootDir);
-      }
-    }
-  }
-
-  private static void addEntry(JarOutputStream jo, File file, String baseDir, String rootDir) {
-    try {
-      BufferedInputStream bi = new BufferedInputStream(new FileInputStream(file));
-
-      String path = file.getAbsolutePath().replaceFirst(baseDir, rootDir);
-      jo.putNextEntry(new ZipEntry(path));
-
-      byte[] buf = new byte[1024];
-      int anz;
-      while ((anz = bi.read(buf)) != -1) {
-        jo.write(buf, 0, anz);
-      }
-      bi.close();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-  }
-
-  public static Manifest createManifest() {
-    Manifest manifest = new Manifest();
-    manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
-    manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_URL, "http://samoa.yahoo.com");
-    manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VERSION, "0.1");
-    manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR, "Yahoo");
-    manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR_ID, "SAMOA");
-    Attributes s4Attributes = new Attributes();
-    s4Attributes.putValue("S4-App-Class", "path.to.Class");
-    Attributes.Name name = new Attributes.Name("S4-App-Class");
-    Attributes.Name S4Version = new Attributes.Name("S4-Version");
-    manifest.getMainAttributes().put(name, "samoa.topology.impl.DoTaskApp");
-    manifest.getMainAttributes().put(S4Version, "0.6.0-incubating");
-    return manifest;
-  }
-
-  public static Object getInstance(String className) {
-    Class<?> cls;
-    Object obj = null;
-    try {
-      cls = Class.forName(className);
-      obj = cls.newInstance();
-    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
-      e.printStackTrace();
-    }
-    return obj;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/core/ContentEvent.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/core/ContentEvent.java 
b/samoa-api/src/main/java/org/apache/samoa/core/ContentEvent.java
new file mode 100644
index 0000000..b523f48
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/core/ContentEvent.java
@@ -0,0 +1,44 @@
+package org.apache.samoa.core;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * The Interface ContentEvent.
+ */
+public interface ContentEvent extends java.io.Serializable {
+
+  /**
+   * Gets the content event key.
+   * 
+   * @return the key
+   */
+  public String getKey();
+
+  /**
+   * Sets the content event key.
+   * 
+   * @param key
+   *          string
+   */
+  public void setKey(String key);
+
+  public boolean isLastEvent();
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/core/DoubleVector.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/core/DoubleVector.java 
b/samoa-api/src/main/java/org/apache/samoa/core/DoubleVector.java
new file mode 100644
index 0000000..b079cfd
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/core/DoubleVector.java
@@ -0,0 +1,119 @@
+package org.apache.samoa.core;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.Arrays;
+
+import com.google.common.primitives.Doubles;
+
+public class DoubleVector implements java.io.Serializable {
+
+  /**
+   * Serialization version identifier.
+   */
+  private static final long serialVersionUID = 8243012708860261398L;
+
+  private double[] doubleArray;
+
+  public DoubleVector() {
+    this.doubleArray = new double[0];
+  }
+
+  public DoubleVector(double[] toCopy) {
+    this.doubleArray = new double[toCopy.length];
+    System.arraycopy(toCopy, 0, this.doubleArray, 0, toCopy.length);
+  }
+
+  public DoubleVector(DoubleVector toCopy) {
+    this(toCopy.getArrayRef());
+  }
+
+  public double[] getArrayRef() {
+    return this.doubleArray;
+  }
+
+  public double[] getArrayCopy() {
+    return Doubles.concat(this.doubleArray);
+  }
+
+  public int numNonZeroEntries() {
+    int count = 0;
+    for (double element : this.doubleArray) {
+      if (Double.compare(element, 0.0) != 0) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  public void setValue(int index, double value) {
+    if (index >= doubleArray.length) {
+      this.doubleArray = Doubles.ensureCapacity(this.doubleArray, index + 1, 
0);
+    }
+    this.doubleArray[index] = value;
+  }
+
+  public void addToValue(int index, double value) {
+    if (index >= doubleArray.length) {
+      this.doubleArray = Doubles.ensureCapacity(this.doubleArray, index + 1, 
0);
+    }
+    this.doubleArray[index] += value;
+  }
+
+  public double sumOfValues() {
+    double sum = 0.0;
+    for (double element : this.doubleArray) {
+      sum += element;
+    }
+    return sum;
+  }
+
+  public void getSingleLineDescription(StringBuilder out) {
+    out.append("{");
+    out.append(Doubles.join("|", this.doubleArray));
+    out.append("}");
+  }
+
+  @Override
+  public String toString() {
+    return "DoubleVector [doubleArray=" + Arrays.toString(doubleArray) + "]";
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + Arrays.hashCode(doubleArray);
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (!(obj instanceof DoubleVector))
+      return false;
+    DoubleVector other = (DoubleVector) obj;
+    return Arrays.equals(doubleArray, other.doubleArray);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/core/EntranceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/core/EntranceProcessor.java 
b/samoa-api/src/main/java/org/apache/samoa/core/EntranceProcessor.java
new file mode 100644
index 0000000..d92e19b
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/core/EntranceProcessor.java
@@ -0,0 +1,62 @@
+package org.apache.samoa.core;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+
+import com.github.javacliparser.Configurable;
+
+/**
+ * An EntranceProcessor is a specific kind of processor dedicated to providing 
events to inject in the topology. It can
+ * be connected to a single output stream.
+ */
+public interface EntranceProcessor extends Serializable, Configurable, 
Processor {
+
+  /**
+   * Initializes the Processor. This method is called once after the topology 
is set up and before any call to the
+   * {@link #nextEvent()} method.
+   * 
+   * @param id
+   *          the identifier of the processor.
+   */
+  public void onCreate(int id);
+
+  /**
+   * Checks whether the source stream is finished/exhausted.
+   */
+  public boolean isFinished();
+
+  /**
+   * Checks whether a new event is ready to be processed.
+   * 
+   * @return true if the EntranceProcessor is ready to provide the next event, 
false otherwise.
+   */
+  public boolean hasNext();
+
+  /**
+   * Provides the next tuple to be processed by the topology. This method is 
the entry point for external events into
+   * the topology.
+   * 
+   * @return the next event to be processed.
+   */
+  public ContentEvent nextEvent();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/core/Globals.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/core/Globals.java 
b/samoa-api/src/main/java/org/apache/samoa/core/Globals.java
new file mode 100644
index 0000000..6402e73
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/core/Globals.java
@@ -0,0 +1,59 @@
+package org.apache.samoa.core;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import com.github.javacliparser.StringUtils;
+
+/**
+ * Class for storing global information about current version of SAMOA.
+ * 
+ * @author Albert Bifet
+ * @version $Revision: 7 $
+ */
+public class Globals {
+
+  public static final String workbenchTitle = "SAMOA: Scalable Advanced 
Massive Online Analysis Platform ";
+
+  public static final String versionString = "0.0.1";
+
+  public static final String copyrightNotice = "Copyright Yahoo! Inc 2013";
+
+  public static final String webAddress = "http://github.com/yahoo/samoa";
+
+  public static String getWorkbenchInfoString() {
+    StringBuilder result = new StringBuilder();
+    result.append(workbenchTitle);
+    StringUtils.appendNewline(result);
+    result.append("Version: ");
+    result.append(versionString);
+    StringUtils.appendNewline(result);
+    result.append("Copyright: ");
+    result.append(copyrightNotice);
+    StringUtils.appendNewline(result);
+    result.append("Web: ");
+    result.append(webAddress);
+    return result.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/core/Processor.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/core/Processor.java 
b/samoa-api/src/main/java/org/apache/samoa/core/Processor.java
new file mode 100644
index 0000000..abed308
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/core/Processor.java
@@ -0,0 +1,63 @@
+package org.apache.samoa.core;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+
+import com.github.javacliparser.Configurable;
+
+/**
+ * The Interface Processor.
+ */
+public interface Processor extends Serializable, Configurable {
+
+  /**
+   * Entry point for the {@link Processor} code. This method is called once 
for every event received.
+   * 
+   * @param event
+   *          the event to be processed.
+   * @return true if successful, false otherwise.
+   */
+  boolean process(ContentEvent event);
+
+  /**
+   * Initializes the Processor. This method is called once after the topology 
is set up and before any call to the
+   * {@link #process(ContentEvent)} method.
+   * 
+   * @param id
+   *          the identifier of the processor.
+   */
+  void onCreate(int id);
+
+  /**
+   * Creates a copy of a processor. This method is used to instantiate 
multiple instances of the same {@link Processor}
+   * .
+   * 
+   * @param processor
+   *          the processor to be copied.
+   * 
+   * @return a new instance of the {@link Processor}.
+   * */
+  Processor newProcessor(Processor processor); // FIXME there should be no need
+                                               // for the processor as a
+                                               // parameter
+  // TODO can we substitute this with Cloneable?
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/core/SerializableInstance.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/core/SerializableInstance.java 
b/samoa-api/src/main/java/org/apache/samoa/core/SerializableInstance.java
new file mode 100644
index 0000000..92ef464
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/core/SerializableInstance.java
@@ -0,0 +1,68 @@
+package org.apache.samoa.core;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.instances.DenseInstance;
+import org.apache.samoa.instances.Instance;
+
+/**
+ * License
+ */
+
+//import weka.core.DenseInstance;
+//import weka.core.Instance;
+
+/**
+ * The Class SerializableInstance. This class is needed for serialization with 
Kryo.
+ */
+public class SerializableInstance extends DenseInstance {
+
+  /** The Constant serialVersionUID. */
+  private static final long serialVersionUID = -3659459626274566468L;
+
+  /**
+   * Instantiates a new serializable instance.
+   */
+  public SerializableInstance() {
+    super(0);
+  }
+
+  /**
+   * Instantiates a new serializable instance.
+   * 
+   * @param arg0
+   *          the arg0
+   */
+  public SerializableInstance(int arg0) {
+    super(arg0);
+  }
+
+  /**
+   * Instantiates a new serializable instance.
+   * 
+   * @param inst
+   *          the inst
+   */
+  public SerializableInstance(Instance inst) {
+    super(inst);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/evaluation/BasicClassificationPerformanceEvaluator.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/evaluation/BasicClassificationPerformanceEvaluator.java
 
b/samoa-api/src/main/java/org/apache/samoa/evaluation/BasicClassificationPerformanceEvaluator.java
new file mode 100644
index 0000000..24abe3e
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/evaluation/BasicClassificationPerformanceEvaluator.java
@@ -0,0 +1,157 @@
+package org.apache.samoa.evaluation;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Utils;
+import org.apache.samoa.moa.AbstractMOAObject;
+import org.apache.samoa.moa.core.Measurement;
+
+/**
+ * Classification evaluator that performs basic incremental evaluation.
+ * 
+ * @author Richard Kirkby ([email protected])
+ * @author Albert Bifet (abifet at cs dot waikato dot ac dot nz)
+ * @version $Revision: 7 $
+ */
+public class BasicClassificationPerformanceEvaluator extends AbstractMOAObject 
implements
+    ClassificationPerformanceEvaluator {
+
+  private static final long serialVersionUID = 1L;
+
+  protected double weightObserved;
+
+  protected double weightCorrect;
+
+  protected double[] columnKappa;
+
+  protected double[] rowKappa;
+
+  protected int numClasses;
+
+  private double weightCorrectNoChangeClassifier;
+
+  private int lastSeenClass;
+
+  @Override
+  public void reset() {
+    reset(this.numClasses);
+  }
+
+  public void reset(int numClasses) {
+    this.numClasses = numClasses;
+    this.rowKappa = new double[numClasses];
+    this.columnKappa = new double[numClasses];
+    for (int i = 0; i < this.numClasses; i++) {
+      this.rowKappa[i] = 0.0;
+      this.columnKappa[i] = 0.0;
+    }
+    this.weightObserved = 0.0;
+    this.weightCorrect = 0.0;
+    this.weightCorrectNoChangeClassifier = 0.0;
+    this.lastSeenClass = 0;
+  }
+
+  @Override
+  public void addResult(Instance inst, double[] classVotes) {
+    double weight = inst.weight();
+    int trueClass = (int) inst.classValue();
+    if (weight > 0.0) {
+      if (this.weightObserved == 0) {
+        reset(inst.numClasses());
+      }
+      this.weightObserved += weight;
+      int predictedClass = Utils.maxIndex(classVotes);
+      if (predictedClass == trueClass) {
+        this.weightCorrect += weight;
+      }
+      if (rowKappa.length > 0) {
+        this.rowKappa[predictedClass] += weight;
+      }
+      if (columnKappa.length > 0) {
+        this.columnKappa[trueClass] += weight;
+      }
+    }
+    if (this.lastSeenClass == trueClass) {
+      this.weightCorrectNoChangeClassifier += weight;
+    }
+    this.lastSeenClass = trueClass;
+  }
+
+  @Override
+  public Measurement[] getPerformanceMeasurements() {
+    return new Measurement[] {
+        new Measurement("classified instances",
+            getTotalWeightObserved()),
+        new Measurement("classifications correct (percent)",
+            getFractionCorrectlyClassified() * 100.0),
+        new Measurement("Kappa Statistic (percent)",
+            getKappaStatistic() * 100.0),
+        new Measurement("Kappa Temporal Statistic (percent)",
+            getKappaTemporalStatistic() * 100.0)
+    };
+
+  }
+
+  public double getTotalWeightObserved() {
+    return this.weightObserved;
+  }
+
+  public double getFractionCorrectlyClassified() {
+    return this.weightObserved > 0.0 ? this.weightCorrect
+        / this.weightObserved : 0.0;
+  }
+
+  public double getFractionIncorrectlyClassified() {
+    return 1.0 - getFractionCorrectlyClassified();
+  }
+
+  public double getKappaStatistic() {
+    if (this.weightObserved > 0.0) {
+      double p0 = getFractionCorrectlyClassified();
+      double pc = 0.0;
+      for (int i = 0; i < this.numClasses; i++) {
+        pc += (this.rowKappa[i] / this.weightObserved)
+            * (this.columnKappa[i] / this.weightObserved);
+      }
+      return (p0 - pc) / (1.0 - pc);
+    } else {
+      return 0;
+    }
+  }
+
+  public double getKappaTemporalStatistic() {
+    if (this.weightObserved > 0.0) {
+      double p0 = this.weightCorrect / this.weightObserved;
+      double pc = this.weightCorrectNoChangeClassifier / this.weightObserved;
+
+      return (p0 - pc) / (1.0 - pc);
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public void getDescription(StringBuilder sb, int indent) {
+    Measurement.getMeasurementsDescription(getPerformanceMeasurements(),
+        sb, indent);
+  }
+}

Reply via email to