http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/AMRulesRegressor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/AMRulesRegressor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/AMRulesRegressor.java
index 268072b..a05772b 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/AMRulesRegressor.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/AMRulesRegressor.java
@@ -39,138 +39,139 @@ import com.yahoo.labs.samoa.topology.Stream;
 import com.yahoo.labs.samoa.topology.TopologyBuilder;
 
 /**
- * AMRules Regressor
- * is the task for the serialized implementation of AMRules algorithm for 
regression rule. 
- * It is adapted to SAMOA from the implementation of AMRules in MOA.
+ * AMRules Regressor is the task for the serialized implementation of AMRules
+ * algorithm for regression rule. It is adapted to SAMOA from the 
implementation
+ * of AMRules in MOA.
  * 
  * @author Anh Thu Vu
- *
+ * 
  */
 
 public class AMRulesRegressor implements RegressionLearner, Configurable {
 
-       /**
+  /**
         * 
         */
-       private static final long serialVersionUID = 1L;
-
-       // Options
-       public FloatOption splitConfidenceOption = new FloatOption(
-                       "splitConfidence",
-                       'c',
-                       "Hoeffding Bound Parameter. The allowable error in 
split decision, values closer to 0 will take longer to decide.",
-                       0.0000001, 0.0, 1.0);
-
-       public FloatOption tieThresholdOption = new FloatOption("tieThreshold",
-                       't', "Hoeffding Bound Parameter. Threshold below which 
a split will be forced to break ties.",
-                       0.05, 0.0, 1.0);
-
-       public IntOption gracePeriodOption = new IntOption("gracePeriod",
-                       'g', "Hoeffding Bound Parameter. The number of 
instances a leaf should observe between split attempts.",
-                       200, 1, Integer.MAX_VALUE);
-
-       public FlagOption DriftDetectionOption = new 
FlagOption("DoNotDetectChanges", 'H',
-                       "Drift Detection. Page-Hinkley.");
-
-       public FloatOption pageHinckleyAlphaOption = new FloatOption(
-                       "pageHinckleyAlpha",
-                       'a',
-                       "The alpha value to use in the Page Hinckley change 
detection tests.",
-                       0.005, 0.0, 1.0);
-
-       public IntOption pageHinckleyThresholdOption = new IntOption(
-                       "pageHinckleyThreshold",
-                       'l',
-                       "The threshold value (Lambda) to be used in the Page 
Hinckley change detection tests.",
-                       35, 0, Integer.MAX_VALUE);
-
-       public FlagOption noAnomalyDetectionOption = new 
FlagOption("noAnomalyDetection", 'A',
-                       "Disable anomaly Detection.");
-
-       public FloatOption multivariateAnomalyProbabilityThresholdOption = new 
FloatOption(
-                       "multivariateAnomalyProbabilityThresholdd",
-                       'm',
-                       "Multivariate anomaly threshold value.",
-                       0.99, 0.0, 1.0);
-
-       public FloatOption univariateAnomalyProbabilityThresholdOption = new 
FloatOption(
-                       "univariateAnomalyprobabilityThreshold",
-                       'u',
-                       "Univariate anomaly threshold value.",
-                       0.10, 0.0, 1.0);
-
-       public IntOption anomalyNumInstThresholdOption = new IntOption(
-                       "anomalyThreshold",
-                       'n',
-                       "The threshold value of anomalies to be used in the 
anomaly detection.",
-                       30, 0, Integer.MAX_VALUE); // num minimum of instances 
to detect anomalies. 15.
-
-       public FlagOption unorderedRulesOption = new 
FlagOption("setUnorderedRulesOn", 'U',
-            "unorderedRules.");
-       
-       public ClassOption numericObserverOption = new 
ClassOption("numericObserver",
-            'z', "Numeric observer.", 
-            FIMTDDNumericAttributeClassLimitObserver.class,
-            "FIMTDDNumericAttributeClassLimitObserver");
-
-       public MultiChoiceOption predictionFunctionOption = new 
MultiChoiceOption(
-                       "predictionFunctionOption", 'P', "The prediction 
function to use.", new String[]{
-                                       "Adaptative","Perceptron", "Target 
Mean"}, new String[]{
-                                       "Adaptative","Perceptron", "Target 
Mean"}, 0);
-
-       public FlagOption constantLearningRatioDecayOption = new FlagOption(
-                       "learningRatio_Decay_set_constant", 'd',
-                       "Learning Ratio Decay in Perceptron set to be constant. 
(The next parameter).");
-
-       public FloatOption learningRatioOption = new FloatOption(
-                       "learningRatio", 's', 
-                       "Constante Learning Ratio to use for training the 
Perceptrons in the leaves.", 0.025);
-       
-       public ClassOption votingTypeOption = new ClassOption("votingType",
-            'V', "Voting Type.", 
-            ErrorWeightedVote.class,
-            "InverseErrorWeightedVote");
-       
-       // Processor
-       private AMRulesRegressorProcessor processor;
-       
-       // Stream
-       private Stream resultStream;
-       
-       @Override
-       public void init(TopologyBuilder topologyBuilder, Instances dataset, 
int parallelism) {
-        this.processor = new AMRulesRegressorProcessor.Builder(dataset)
-               .threshold(pageHinckleyThresholdOption.getValue())
-               .alpha(pageHinckleyAlphaOption.getValue())
-               .changeDetection(this.DriftDetectionOption.isSet())
-               .predictionFunction(predictionFunctionOption.getChosenIndex())
-               
.constantLearningRatioDecay(constantLearningRatioDecayOption.isSet())
-               .learningRatio(learningRatioOption.getValue())
-               .splitConfidence(splitConfidenceOption.getValue())
-               .tieThreshold(tieThresholdOption.getValue())
-               .gracePeriod(gracePeriodOption.getValue())
-               .noAnomalyDetection(noAnomalyDetectionOption.isSet())
-               
.multivariateAnomalyProbabilityThreshold(multivariateAnomalyProbabilityThresholdOption.getValue())
-               
.univariateAnomalyProbabilityThreshold(univariateAnomalyProbabilityThresholdOption.getValue())
-               
.anomalyNumberOfInstancesThreshold(anomalyNumInstThresholdOption.getValue())
-               .unorderedRules(unorderedRulesOption.isSet())
-               
.numericObserver((FIMTDDNumericAttributeClassLimitObserver)numericObserverOption.getValue())
-               .voteType((ErrorWeightedVote)votingTypeOption.getValue())
-               .build();
-       
-        topologyBuilder.addProcessor(processor, parallelism);
-     
-        this.resultStream = topologyBuilder.createStream(processor);
-        this.processor.setResultStream(resultStream);
-       }
-       
-       @Override
-       public Processor getInputProcessor() {
-               return processor;
-       }
-
-       @Override
-       public Set<Stream> getResultStreams() {
-               return ImmutableSet.of(this.resultStream);
-       }
+  private static final long serialVersionUID = 1L;
+
+  // Options
+  public FloatOption splitConfidenceOption = new FloatOption(
+      "splitConfidence",
+      'c',
+      "Hoeffding Bound Parameter. The allowable error in split decision, 
values closer to 0 will take longer to decide.",
+      0.0000001, 0.0, 1.0);
+
+  public FloatOption tieThresholdOption = new FloatOption("tieThreshold",
+      't', "Hoeffding Bound Parameter. Threshold below which a split will be 
forced to break ties.",
+      0.05, 0.0, 1.0);
+
+  public IntOption gracePeriodOption = new IntOption("gracePeriod",
+      'g', "Hoeffding Bound Parameter. The number of instances a leaf should 
observe between split attempts.",
+      200, 1, Integer.MAX_VALUE);
+
+  public FlagOption DriftDetectionOption = new 
FlagOption("DoNotDetectChanges", 'H',
+      "Drift Detection. Page-Hinkley.");
+
+  public FloatOption pageHinckleyAlphaOption = new FloatOption(
+      "pageHinckleyAlpha",
+      'a',
+      "The alpha value to use in the Page Hinckley change detection tests.",
+      0.005, 0.0, 1.0);
+
+  public IntOption pageHinckleyThresholdOption = new IntOption(
+      "pageHinckleyThreshold",
+      'l',
+      "The threshold value (Lambda) to be used in the Page Hinckley change 
detection tests.",
+      35, 0, Integer.MAX_VALUE);
+
+  public FlagOption noAnomalyDetectionOption = new 
FlagOption("noAnomalyDetection", 'A',
+      "Disable anomaly Detection.");
+
+  public FloatOption multivariateAnomalyProbabilityThresholdOption = new 
FloatOption(
+      "multivariateAnomalyProbabilityThresholdd",
+      'm',
+      "Multivariate anomaly threshold value.",
+      0.99, 0.0, 1.0);
+
+  public FloatOption univariateAnomalyProbabilityThresholdOption = new 
FloatOption(
+      "univariateAnomalyprobabilityThreshold",
+      'u',
+      "Univariate anomaly threshold value.",
+      0.10, 0.0, 1.0);
+
+  public IntOption anomalyNumInstThresholdOption = new IntOption(
+      "anomalyThreshold",
+      'n',
+      "The threshold value of anomalies to be used in the anomaly detection.",
+      30, 0, Integer.MAX_VALUE); // num minimum of instances to detect
+                                 // anomalies. 15.
+
+  public FlagOption unorderedRulesOption = new 
FlagOption("setUnorderedRulesOn", 'U',
+      "unorderedRules.");
+
+  public ClassOption numericObserverOption = new ClassOption("numericObserver",
+      'z', "Numeric observer.",
+      FIMTDDNumericAttributeClassLimitObserver.class,
+      "FIMTDDNumericAttributeClassLimitObserver");
+
+  public MultiChoiceOption predictionFunctionOption = new MultiChoiceOption(
+      "predictionFunctionOption", 'P', "The prediction function to use.", new 
String[] {
+          "Adaptative", "Perceptron", "Target Mean" }, new String[] {
+          "Adaptative", "Perceptron", "Target Mean" }, 0);
+
+  public FlagOption constantLearningRatioDecayOption = new FlagOption(
+      "learningRatio_Decay_set_constant", 'd',
+      "Learning Ratio Decay in Perceptron set to be constant. (The next 
parameter).");
+
+  public FloatOption learningRatioOption = new FloatOption(
+      "learningRatio", 's',
+      "Constante Learning Ratio to use for training the Perceptrons in the 
leaves.", 0.025);
+
+  public ClassOption votingTypeOption = new ClassOption("votingType",
+      'V', "Voting Type.",
+      ErrorWeightedVote.class,
+      "InverseErrorWeightedVote");
+
+  // Processor
+  private AMRulesRegressorProcessor processor;
+
+  // Stream
+  private Stream resultStream;
+
+  @Override
+  public void init(TopologyBuilder topologyBuilder, Instances dataset, int 
parallelism) {
+    this.processor = new AMRulesRegressorProcessor.Builder(dataset)
+        .threshold(pageHinckleyThresholdOption.getValue())
+        .alpha(pageHinckleyAlphaOption.getValue())
+        .changeDetection(this.DriftDetectionOption.isSet())
+        .predictionFunction(predictionFunctionOption.getChosenIndex())
+        .constantLearningRatioDecay(constantLearningRatioDecayOption.isSet())
+        .learningRatio(learningRatioOption.getValue())
+        .splitConfidence(splitConfidenceOption.getValue())
+        .tieThreshold(tieThresholdOption.getValue())
+        .gracePeriod(gracePeriodOption.getValue())
+        .noAnomalyDetection(noAnomalyDetectionOption.isSet())
+        
.multivariateAnomalyProbabilityThreshold(multivariateAnomalyProbabilityThresholdOption.getValue())
+        
.univariateAnomalyProbabilityThreshold(univariateAnomalyProbabilityThresholdOption.getValue())
+        
.anomalyNumberOfInstancesThreshold(anomalyNumInstThresholdOption.getValue())
+        .unorderedRules(unorderedRulesOption.isSet())
+        .numericObserver((FIMTDDNumericAttributeClassLimitObserver) 
numericObserverOption.getValue())
+        .voteType((ErrorWeightedVote) votingTypeOption.getValue())
+        .build();
+
+    topologyBuilder.addProcessor(processor, parallelism);
+
+    this.resultStream = topologyBuilder.createStream(processor);
+    this.processor.setResultStream(resultStream);
+  }
+
+  @Override
+  public Processor getInputProcessor() {
+    return processor;
+  }
+
+  @Override
+  public Set<Stream> getResultStreams() {
+    return ImmutableSet.of(this.resultStream);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/HorizontalAMRulesRegressor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/HorizontalAMRulesRegressor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/HorizontalAMRulesRegressor.java
index 14f5f38..9d4c5e6 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/HorizontalAMRulesRegressor.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/HorizontalAMRulesRegressor.java
@@ -40,201 +40,201 @@ import com.yahoo.labs.samoa.topology.Stream;
 import com.yahoo.labs.samoa.topology.TopologyBuilder;
 
 /**
- * Horizontal  AMRules Regressor
- * is a distributed learner for regression rules learner.
- * It applies both horizontal parallelism (dividing incoming streams)
+ * Horizontal AMRules Regressor is a distributed learner for regression rules
+ * learner. It applies both horizontal parallelism (dividing incoming streams)
  * and vertical parallelism on AMRules algorithm.
  * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class HorizontalAMRulesRegressor implements RegressionLearner, 
Configurable {
 
-       /**
+  /**
         * 
         */
-       private static final long serialVersionUID = 2785944439173586051L;
-
-       // Options
-       public FloatOption splitConfidenceOption = new FloatOption(
-                       "splitConfidence",
-                       'c',
-                       "Hoeffding Bound Parameter. The allowable error in 
split decision, values closer to 0 will take longer to decide.",
-                       0.0000001, 0.0, 1.0);
-
-       public FloatOption tieThresholdOption = new FloatOption("tieThreshold",
-                       't', "Hoeffding Bound Parameter. Threshold below which 
a split will be forced to break ties.",
-                       0.05, 0.0, 1.0);
-
-       public IntOption gracePeriodOption = new IntOption("gracePeriod",
-                       'g', "Hoeffding Bound Parameter. The number of 
instances a leaf should observe between split attempts.",
-                       200, 1, Integer.MAX_VALUE);
-
-       public FlagOption DriftDetectionOption = new 
FlagOption("DoNotDetectChanges", 'H',
-                       "Drift Detection. Page-Hinkley.");
-
-       public FloatOption pageHinckleyAlphaOption = new FloatOption(
-                       "pageHinckleyAlpha",
-                       'a',
-                       "The alpha value to use in the Page Hinckley change 
detection tests.",
-                       0.005, 0.0, 1.0);
-
-       public IntOption pageHinckleyThresholdOption = new IntOption(
-                       "pageHinckleyThreshold",
-                       'l',
-                       "The threshold value (Lambda) to be used in the Page 
Hinckley change detection tests.",
-                       35, 0, Integer.MAX_VALUE);
-
-       public FlagOption noAnomalyDetectionOption = new 
FlagOption("noAnomalyDetection", 'A',
-                       "Disable anomaly Detection.");
-
-       public FloatOption multivariateAnomalyProbabilityThresholdOption = new 
FloatOption(
-                       "multivariateAnomalyProbabilityThresholdd",
-                       'm',
-                       "Multivariate anomaly threshold value.",
-                       0.99, 0.0, 1.0);
-
-       public FloatOption univariateAnomalyProbabilityThresholdOption = new 
FloatOption(
-                       "univariateAnomalyprobabilityThreshold",
-                       'u',
-                       "Univariate anomaly threshold value.",
-                       0.10, 0.0, 1.0);
-
-       public IntOption anomalyNumInstThresholdOption = new IntOption(
-                       "anomalyThreshold",
-                       'n',
-                       "The threshold value of anomalies to be used in the 
anomaly detection.",
-                       30, 0, Integer.MAX_VALUE); // num minimum of instances 
to detect anomalies. 15.
-
-       public FlagOption unorderedRulesOption = new 
FlagOption("setUnorderedRulesOn", 'U',
-                       "unorderedRules.");
-
-       public ClassOption numericObserverOption = new 
ClassOption("numericObserver",
-                       'z', "Numeric observer.", 
-                       FIMTDDNumericAttributeClassLimitObserver.class,
-                       "FIMTDDNumericAttributeClassLimitObserver");
-
-       public MultiChoiceOption predictionFunctionOption = new 
MultiChoiceOption(
-                       "predictionFunctionOption", 'P', "The prediction 
function to use.", new String[]{
-                                       "Adaptative","Perceptron", "Target 
Mean"}, new String[]{
-                                       "Adaptative","Perceptron", "Target 
Mean"}, 0);
-
-       public FlagOption constantLearningRatioDecayOption = new FlagOption(
-                       "learningRatio_Decay_set_constant", 'd',
-                       "Learning Ratio Decay in Perceptron set to be constant. 
(The next parameter).");
-
-       public FloatOption learningRatioOption = new FloatOption(
-                       "learningRatio", 's', 
-                       "Constante Learning Ratio to use for training the 
Perceptrons in the leaves.", 0.025);
-
-       public MultiChoiceOption votingTypeOption = new MultiChoiceOption(
-                       "votingType", 'V', "Voting Type.", new String[]{
-                                       
"InverseErrorWeightedVote","UniformWeightedVote"}, new String[]{
-                                       
"InverseErrorWeightedVote","UniformWeightedVote"}, 0);
-       
-       public IntOption learnerParallelismOption = new IntOption(
-            "leanerParallelism",
-            'p',
-            "The number of local statistics PI to do distributed computation",
-            1, 1, Integer.MAX_VALUE);
-       public IntOption ruleSetParallelismOption = new IntOption(
-            "modelParallelism",
-            'r',
-            "The number of replicated model (rule set) PIs",
-            1, 1, Integer.MAX_VALUE);
-
-       // Processor
-       private AMRRuleSetProcessor model;
-
-       private Stream modelResultStream;
-
-       private Stream rootResultStream;
-
-       // private Stream resultStream;
-
-       @Override
-       public void init(TopologyBuilder topologyBuilder, Instances dataset, 
int parallelism) {
-               
-               // Create MODEL PIs
-               this.model = new AMRRuleSetProcessor.Builder(dataset)
-               .noAnomalyDetection(noAnomalyDetectionOption.isSet())
-               
.multivariateAnomalyProbabilityThreshold(multivariateAnomalyProbabilityThresholdOption.getValue())
-               
.univariateAnomalyProbabilityThreshold(univariateAnomalyProbabilityThresholdOption.getValue())
-               
.anomalyNumberOfInstancesThreshold(anomalyNumInstThresholdOption.getValue())
-               .unorderedRules(unorderedRulesOption.isSet())
-               .voteType(votingTypeOption.getChosenIndex())
-               .build();
-
-               topologyBuilder.addProcessor(model, 
this.ruleSetParallelismOption.getValue());
-               
-               // MODEL PIs streams
-               Stream forwardToRootStream = 
topologyBuilder.createStream(this.model);
-               Stream forwardToLearnerStream = 
topologyBuilder.createStream(this.model);
-               this.modelResultStream = 
topologyBuilder.createStream(this.model);
-               
-               this.model.setDefaultRuleStream(forwardToRootStream);
-               this.model.setStatisticsStream(forwardToLearnerStream);
-               this.model.setResultStream(this.modelResultStream);
-               
-               // Create DefaultRule PI
-               AMRDefaultRuleProcessor root = new 
AMRDefaultRuleProcessor.Builder(dataset)
-                               
.threshold(pageHinckleyThresholdOption.getValue())
-                               .alpha(pageHinckleyAlphaOption.getValue())
-                               
.changeDetection(this.DriftDetectionOption.isSet())
-                               
.predictionFunction(predictionFunctionOption.getChosenIndex())
-                               
.constantLearningRatioDecay(constantLearningRatioDecayOption.isSet())
-                               .learningRatio(learningRatioOption.getValue())
-                               
.splitConfidence(splitConfidenceOption.getValue())
-                               .tieThreshold(tieThresholdOption.getValue())
-                               .gracePeriod(gracePeriodOption.getValue())
-                               
.numericObserver((FIMTDDNumericAttributeClassLimitObserver) 
numericObserverOption.getValue())
-                               .build();
-               
-               topologyBuilder.addProcessor(root);
-               
-               // Default Rule PI streams
-               Stream newRuleStream = topologyBuilder.createStream(root);
-               this.rootResultStream = topologyBuilder.createStream(root);
-               
-               root.setRuleStream(newRuleStream);
-               root.setResultStream(this.rootResultStream);
-               
-               // Create Learner PIs
-               AMRLearnerProcessor learner = new 
AMRLearnerProcessor.Builder(dataset)
-                               
.splitConfidence(splitConfidenceOption.getValue())
-                               .tieThreshold(tieThresholdOption.getValue())
-                               .gracePeriod(gracePeriodOption.getValue())
-                               
.noAnomalyDetection(noAnomalyDetectionOption.isSet())
-                               
.multivariateAnomalyProbabilityThreshold(multivariateAnomalyProbabilityThresholdOption.getValue())
-                               
.univariateAnomalyProbabilityThreshold(univariateAnomalyProbabilityThresholdOption.getValue())
-                               
.anomalyNumberOfInstancesThreshold(anomalyNumInstThresholdOption.getValue())
-                               .build();
-    
-               topologyBuilder.addProcessor(learner, 
this.learnerParallelismOption.getValue());
-
-               Stream predicateStream = topologyBuilder.createStream(learner);
-               learner.setOutputStream(predicateStream);
-               
-               // Connect streams
-               // to MODEL
-               topologyBuilder.connectInputAllStream(newRuleStream, 
this.model);
-               topologyBuilder.connectInputAllStream(predicateStream, 
this.model);
-               // to ROOT
-               topologyBuilder.connectInputShuffleStream(forwardToRootStream, 
root);
-               // to LEARNER
-               topologyBuilder.connectInputKeyStream(forwardToLearnerStream, 
learner);
-               topologyBuilder.connectInputAllStream(newRuleStream, learner);
-       }
-
-       @Override
-       public Processor getInputProcessor() {
-               return model;
-       }
-       
-       @Override
-       public Set<Stream> getResultStreams() {
-               Set<Stream> streams = 
ImmutableSet.of(this.modelResultStream,this.rootResultStream);
-               return streams;
-       }
+  private static final long serialVersionUID = 2785944439173586051L;
+
+  // Options
+  public FloatOption splitConfidenceOption = new FloatOption(
+      "splitConfidence",
+      'c',
+      "Hoeffding Bound Parameter. The allowable error in split decision, 
values closer to 0 will take longer to decide.",
+      0.0000001, 0.0, 1.0);
+
+  public FloatOption tieThresholdOption = new FloatOption("tieThreshold",
+      't', "Hoeffding Bound Parameter. Threshold below which a split will be 
forced to break ties.",
+      0.05, 0.0, 1.0);
+
+  public IntOption gracePeriodOption = new IntOption("gracePeriod",
+      'g', "Hoeffding Bound Parameter. The number of instances a leaf should 
observe between split attempts.",
+      200, 1, Integer.MAX_VALUE);
+
+  public FlagOption DriftDetectionOption = new 
FlagOption("DoNotDetectChanges", 'H',
+      "Drift Detection. Page-Hinkley.");
+
+  public FloatOption pageHinckleyAlphaOption = new FloatOption(
+      "pageHinckleyAlpha",
+      'a',
+      "The alpha value to use in the Page Hinckley change detection tests.",
+      0.005, 0.0, 1.0);
+
+  public IntOption pageHinckleyThresholdOption = new IntOption(
+      "pageHinckleyThreshold",
+      'l',
+      "The threshold value (Lambda) to be used in the Page Hinckley change 
detection tests.",
+      35, 0, Integer.MAX_VALUE);
+
+  public FlagOption noAnomalyDetectionOption = new 
FlagOption("noAnomalyDetection", 'A',
+      "Disable anomaly Detection.");
+
+  public FloatOption multivariateAnomalyProbabilityThresholdOption = new 
FloatOption(
+      "multivariateAnomalyProbabilityThresholdd",
+      'm',
+      "Multivariate anomaly threshold value.",
+      0.99, 0.0, 1.0);
+
+  public FloatOption univariateAnomalyProbabilityThresholdOption = new 
FloatOption(
+      "univariateAnomalyprobabilityThreshold",
+      'u',
+      "Univariate anomaly threshold value.",
+      0.10, 0.0, 1.0);
+
+  public IntOption anomalyNumInstThresholdOption = new IntOption(
+      "anomalyThreshold",
+      'n',
+      "The threshold value of anomalies to be used in the anomaly detection.",
+      30, 0, Integer.MAX_VALUE); // num minimum of instances to detect
+                                 // anomalies. 15.
+
+  public FlagOption unorderedRulesOption = new 
FlagOption("setUnorderedRulesOn", 'U',
+      "unorderedRules.");
+
+  public ClassOption numericObserverOption = new ClassOption("numericObserver",
+      'z', "Numeric observer.",
+      FIMTDDNumericAttributeClassLimitObserver.class,
+      "FIMTDDNumericAttributeClassLimitObserver");
+
+  public MultiChoiceOption predictionFunctionOption = new MultiChoiceOption(
+      "predictionFunctionOption", 'P', "The prediction function to use.", new 
String[] {
+          "Adaptative", "Perceptron", "Target Mean" }, new String[] {
+          "Adaptative", "Perceptron", "Target Mean" }, 0);
+
+  public FlagOption constantLearningRatioDecayOption = new FlagOption(
+      "learningRatio_Decay_set_constant", 'd',
+      "Learning Ratio Decay in Perceptron set to be constant. (The next 
parameter).");
+
+  public FloatOption learningRatioOption = new FloatOption(
+      "learningRatio", 's',
+      "Constante Learning Ratio to use for training the Perceptrons in the 
leaves.", 0.025);
+
+  public MultiChoiceOption votingTypeOption = new MultiChoiceOption(
+      "votingType", 'V', "Voting Type.", new String[] {
+          "InverseErrorWeightedVote", "UniformWeightedVote" }, new String[] {
+          "InverseErrorWeightedVote", "UniformWeightedVote" }, 0);
+
+  public IntOption learnerParallelismOption = new IntOption(
+      "leanerParallelism",
+      'p',
+      "The number of local statistics PI to do distributed computation",
+      1, 1, Integer.MAX_VALUE);
+  public IntOption ruleSetParallelismOption = new IntOption(
+      "modelParallelism",
+      'r',
+      "The number of replicated model (rule set) PIs",
+      1, 1, Integer.MAX_VALUE);
+
+  // Processor
+  private AMRRuleSetProcessor model;
+
+  private Stream modelResultStream;
+
+  private Stream rootResultStream;
+
+  // private Stream resultStream;
+
+  @Override
+  public void init(TopologyBuilder topologyBuilder, Instances dataset, int 
parallelism) {
+
+    // Create MODEL PIs
+    this.model = new AMRRuleSetProcessor.Builder(dataset)
+        .noAnomalyDetection(noAnomalyDetectionOption.isSet())
+        
.multivariateAnomalyProbabilityThreshold(multivariateAnomalyProbabilityThresholdOption.getValue())
+        
.univariateAnomalyProbabilityThreshold(univariateAnomalyProbabilityThresholdOption.getValue())
+        
.anomalyNumberOfInstancesThreshold(anomalyNumInstThresholdOption.getValue())
+        .unorderedRules(unorderedRulesOption.isSet())
+        .voteType(votingTypeOption.getChosenIndex())
+        .build();
+
+    topologyBuilder.addProcessor(model, 
this.ruleSetParallelismOption.getValue());
+
+    // MODEL PIs streams
+    Stream forwardToRootStream = topologyBuilder.createStream(this.model);
+    Stream forwardToLearnerStream = topologyBuilder.createStream(this.model);
+    this.modelResultStream = topologyBuilder.createStream(this.model);
+
+    this.model.setDefaultRuleStream(forwardToRootStream);
+    this.model.setStatisticsStream(forwardToLearnerStream);
+    this.model.setResultStream(this.modelResultStream);
+
+    // Create DefaultRule PI
+    AMRDefaultRuleProcessor root = new AMRDefaultRuleProcessor.Builder(dataset)
+        .threshold(pageHinckleyThresholdOption.getValue())
+        .alpha(pageHinckleyAlphaOption.getValue())
+        .changeDetection(this.DriftDetectionOption.isSet())
+        .predictionFunction(predictionFunctionOption.getChosenIndex())
+        .constantLearningRatioDecay(constantLearningRatioDecayOption.isSet())
+        .learningRatio(learningRatioOption.getValue())
+        .splitConfidence(splitConfidenceOption.getValue())
+        .tieThreshold(tieThresholdOption.getValue())
+        .gracePeriod(gracePeriodOption.getValue())
+        .numericObserver((FIMTDDNumericAttributeClassLimitObserver) 
numericObserverOption.getValue())
+        .build();
+
+    topologyBuilder.addProcessor(root);
+
+    // Default Rule PI streams
+    Stream newRuleStream = topologyBuilder.createStream(root);
+    this.rootResultStream = topologyBuilder.createStream(root);
+
+    root.setRuleStream(newRuleStream);
+    root.setResultStream(this.rootResultStream);
+
+    // Create Learner PIs
+    AMRLearnerProcessor learner = new AMRLearnerProcessor.Builder(dataset)
+        .splitConfidence(splitConfidenceOption.getValue())
+        .tieThreshold(tieThresholdOption.getValue())
+        .gracePeriod(gracePeriodOption.getValue())
+        .noAnomalyDetection(noAnomalyDetectionOption.isSet())
+        
.multivariateAnomalyProbabilityThreshold(multivariateAnomalyProbabilityThresholdOption.getValue())
+        
.univariateAnomalyProbabilityThreshold(univariateAnomalyProbabilityThresholdOption.getValue())
+        
.anomalyNumberOfInstancesThreshold(anomalyNumInstThresholdOption.getValue())
+        .build();
+
+    topologyBuilder.addProcessor(learner, 
this.learnerParallelismOption.getValue());
+
+    Stream predicateStream = topologyBuilder.createStream(learner);
+    learner.setOutputStream(predicateStream);
+
+    // Connect streams
+    // to MODEL
+    topologyBuilder.connectInputAllStream(newRuleStream, this.model);
+    topologyBuilder.connectInputAllStream(predicateStream, this.model);
+    // to ROOT
+    topologyBuilder.connectInputShuffleStream(forwardToRootStream, root);
+    // to LEARNER
+    topologyBuilder.connectInputKeyStream(forwardToLearnerStream, learner);
+    topologyBuilder.connectInputAllStream(newRuleStream, learner);
+  }
+
+  @Override
+  public Processor getInputProcessor() {
+    return model;
+  }
+
+  @Override
+  public Set<Stream> getResultStreams() {
+    Set<Stream> streams = ImmutableSet.of(this.modelResultStream, 
this.rootResultStream);
+    return streams;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/VerticalAMRulesRegressor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/VerticalAMRulesRegressor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/VerticalAMRulesRegressor.java
index 597becb..131f086 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/VerticalAMRulesRegressor.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/VerticalAMRulesRegressor.java
@@ -38,163 +38,163 @@ import com.yahoo.labs.samoa.topology.Stream;
 import com.yahoo.labs.samoa.topology.TopologyBuilder;
 
 /**
- * Vertical  AMRules Regressor
- * is a distributed learner for regression rules learner.
- * It applies vertical parallelism on AMRules regressor.
+ * Vertical AMRules Regressor is a distributed learner for regression rules
+ * learner. It applies vertical parallelism on AMRules regressor.
  * 
  * @author Anh Thu Vu
- *
+ * 
  */
 
 public class VerticalAMRulesRegressor implements RegressionLearner, 
Configurable {
 
-       /**
+  /**
         * 
         */
-       private static final long serialVersionUID = 2785944439173586051L;
-
-       // Options
-       public FloatOption splitConfidenceOption = new FloatOption(
-                       "splitConfidence",
-                       'c',
-                       "Hoeffding Bound Parameter. The allowable error in 
split decision, values closer to 0 will take longer to decide.",
-                       0.0000001, 0.0, 1.0);
-
-       public FloatOption tieThresholdOption = new FloatOption("tieThreshold",
-                       't', "Hoeffding Bound Parameter. Threshold below which 
a split will be forced to break ties.",
-                       0.05, 0.0, 1.0);
-
-       public IntOption gracePeriodOption = new IntOption("gracePeriod",
-                       'g', "Hoeffding Bound Parameter. The number of 
instances a leaf should observe between split attempts.",
-                       200, 1, Integer.MAX_VALUE);
-
-       public FlagOption DriftDetectionOption = new 
FlagOption("DoNotDetectChanges", 'H',
-                       "Drift Detection. Page-Hinkley.");
-
-       public FloatOption pageHinckleyAlphaOption = new FloatOption(
-                       "pageHinckleyAlpha",
-                       'a',
-                       "The alpha value to use in the Page Hinckley change 
detection tests.",
-                       00.005, 0.0, 1.0);
-
-       public IntOption pageHinckleyThresholdOption = new IntOption(
-                       "pageHinckleyThreshold",
-                       'l',
-                       "The threshold value (Lambda) to be used in the Page 
Hinckley change detection tests.",
-                       35, 0, Integer.MAX_VALUE);
-
-       public FlagOption noAnomalyDetectionOption = new 
FlagOption("noAnomalyDetection", 'A',
-                       "Disable anomaly Detection.");
-
-       public FloatOption multivariateAnomalyProbabilityThresholdOption = new 
FloatOption(
-                       "multivariateAnomalyProbabilityThresholdd",
-                       'm',
-                       "Multivariate anomaly threshold value.",
-                       0.99, 0.0, 1.0);
-
-       public FloatOption univariateAnomalyProbabilityThresholdOption = new 
FloatOption(
-                       "univariateAnomalyprobabilityThreshold",
-                       'u',
-                       "Univariate anomaly threshold value.",
-                       0.10, 0.0, 1.0);
-
-       public IntOption anomalyNumInstThresholdOption = new IntOption(
-                       "anomalyThreshold",
-                       'n',
-                       "The threshold value of anomalies to be used in the 
anomaly detection.",
-                       30, 0, Integer.MAX_VALUE); // num minimum of instances 
to detect anomalies. 15.
-
-       public FlagOption unorderedRulesOption = new 
FlagOption("setUnorderedRulesOn", 'U',
-                       "unorderedRules.");
-
-       public ClassOption numericObserverOption = new 
ClassOption("numericObserver",
-                       'z', "Numeric observer.", 
-                       FIMTDDNumericAttributeClassLimitObserver.class,
-                       "FIMTDDNumericAttributeClassLimitObserver");
-
-       public MultiChoiceOption predictionFunctionOption = new 
MultiChoiceOption(
-                       "predictionFunctionOption", 'P', "The prediction 
function to use.", new String[]{
-                                       "Adaptative","Perceptron", "Target 
Mean"}, new String[]{
-                                       "Adaptative","Perceptron", "Target 
Mean"}, 0);
-
-       public FlagOption constantLearningRatioDecayOption = new FlagOption(
-                       "learningRatio_Decay_set_constant", 'd',
-                       "Learning Ratio Decay in Perceptron set to be constant. 
(The next parameter).");
-
-       public FloatOption learningRatioOption = new FloatOption(
-                       "learningRatio", 's', 
-                       "Constante Learning Ratio to use for training the 
Perceptrons in the leaves.", 0.025);
-
-       public MultiChoiceOption votingTypeOption = new MultiChoiceOption(
-                       "votingType", 'V', "Voting Type.", new String[]{
-                                       
"InverseErrorWeightedVote","UniformWeightedVote"}, new String[]{
-                                       
"InverseErrorWeightedVote","UniformWeightedVote"}, 0);
-       
-       public IntOption parallelismHintOption = new IntOption(
-            "parallelismHint",
-            'p',
-            "The number of local statistics PI to do distributed computation",
-            1, 1, Integer.MAX_VALUE);
-       
-       // Processor
-       private AMRulesAggregatorProcessor aggregator;
-
-       // Stream
-       private Stream resultStream;
-
-       @Override
-       public void init(TopologyBuilder topologyBuilder, Instances dataset, 
int parallelism) {
-
-               this.aggregator = new 
AMRulesAggregatorProcessor.Builder(dataset)
-               .threshold(pageHinckleyThresholdOption.getValue())
-               .alpha(pageHinckleyAlphaOption.getValue())
-               .changeDetection(this.DriftDetectionOption.isSet())
-               .predictionFunction(predictionFunctionOption.getChosenIndex())
-               
.constantLearningRatioDecay(constantLearningRatioDecayOption.isSet())
-               .learningRatio(learningRatioOption.getValue())
-               .splitConfidence(splitConfidenceOption.getValue())
-               .tieThreshold(tieThresholdOption.getValue())
-               .gracePeriod(gracePeriodOption.getValue())
-               .noAnomalyDetection(noAnomalyDetectionOption.isSet())
-               
.multivariateAnomalyProbabilityThreshold(multivariateAnomalyProbabilityThresholdOption.getValue())
-               
.univariateAnomalyProbabilityThreshold(univariateAnomalyProbabilityThresholdOption.getValue())
-               
.anomalyNumberOfInstancesThreshold(anomalyNumInstThresholdOption.getValue())
-               .unorderedRules(unorderedRulesOption.isSet())
-               
.numericObserver((FIMTDDNumericAttributeClassLimitObserver)numericObserverOption.getValue())
-               .voteType(votingTypeOption.getChosenIndex())
-               .build();
-
-               topologyBuilder.addProcessor(aggregator);
-
-               Stream statisticsStream = 
topologyBuilder.createStream(aggregator);
-               this.resultStream = topologyBuilder.createStream(aggregator);
-               
-               this.aggregator.setResultStream(resultStream);
-               this.aggregator.setStatisticsStream(statisticsStream);
-
-               AMRulesStatisticsProcessor learner = new 
AMRulesStatisticsProcessor.Builder(dataset)
-                               
.splitConfidence(splitConfidenceOption.getValue())
-                               .tieThreshold(tieThresholdOption.getValue())
-                               .gracePeriod(gracePeriodOption.getValue())
-                               .build();
-    
-               topologyBuilder.addProcessor(learner, 
this.parallelismHintOption.getValue());
-    
-               topologyBuilder.connectInputKeyStream(statisticsStream, 
learner);
-
-               Stream predicateStream = topologyBuilder.createStream(learner);
-               learner.setOutputStream(predicateStream);
-    
-               topologyBuilder.connectInputShuffleStream(predicateStream, 
aggregator);
-       }
-
-       @Override
-       public Processor getInputProcessor() {
-               return aggregator;
-       }
-
-       @Override
-       public Set<Stream> getResultStreams() {
-               return ImmutableSet.of(this.resultStream);
-       }
+  private static final long serialVersionUID = 2785944439173586051L;
+
+  // Options
+  public FloatOption splitConfidenceOption = new FloatOption(
+      "splitConfidence",
+      'c',
+      "Hoeffding Bound Parameter. The allowable error in split decision, 
values closer to 0 will take longer to decide.",
+      0.0000001, 0.0, 1.0);
+
+  public FloatOption tieThresholdOption = new FloatOption("tieThreshold",
+      't', "Hoeffding Bound Parameter. Threshold below which a split will be 
forced to break ties.",
+      0.05, 0.0, 1.0);
+
+  public IntOption gracePeriodOption = new IntOption("gracePeriod",
+      'g', "Hoeffding Bound Parameter. The number of instances a leaf should 
observe between split attempts.",
+      200, 1, Integer.MAX_VALUE);
+
+  public FlagOption DriftDetectionOption = new 
FlagOption("DoNotDetectChanges", 'H',
+      "Drift Detection. Page-Hinkley.");
+
+  public FloatOption pageHinckleyAlphaOption = new FloatOption(
+      "pageHinckleyAlpha",
+      'a',
+      "The alpha value to use in the Page Hinckley change detection tests.",
+      00.005, 0.0, 1.0);
+
+  public IntOption pageHinckleyThresholdOption = new IntOption(
+      "pageHinckleyThreshold",
+      'l',
+      "The threshold value (Lambda) to be used in the Page Hinckley change 
detection tests.",
+      35, 0, Integer.MAX_VALUE);
+
+  public FlagOption noAnomalyDetectionOption = new 
FlagOption("noAnomalyDetection", 'A',
+      "Disable anomaly Detection.");
+
+  public FloatOption multivariateAnomalyProbabilityThresholdOption = new 
FloatOption(
+      "multivariateAnomalyProbabilityThresholdd",
+      'm',
+      "Multivariate anomaly threshold value.",
+      0.99, 0.0, 1.0);
+
+  public FloatOption univariateAnomalyProbabilityThresholdOption = new 
FloatOption(
+      "univariateAnomalyprobabilityThreshold",
+      'u',
+      "Univariate anomaly threshold value.",
+      0.10, 0.0, 1.0);
+
+  public IntOption anomalyNumInstThresholdOption = new IntOption(
+      "anomalyThreshold",
+      'n',
+      "The threshold value of anomalies to be used in the anomaly detection.",
+      30, 0, Integer.MAX_VALUE); // num minimum of instances to detect
+                                 // anomalies. 15.
+
+  public FlagOption unorderedRulesOption = new 
FlagOption("setUnorderedRulesOn", 'U',
+      "unorderedRules.");
+
+  public ClassOption numericObserverOption = new ClassOption("numericObserver",
+      'z', "Numeric observer.",
+      FIMTDDNumericAttributeClassLimitObserver.class,
+      "FIMTDDNumericAttributeClassLimitObserver");
+
+  public MultiChoiceOption predictionFunctionOption = new MultiChoiceOption(
+      "predictionFunctionOption", 'P', "The prediction function to use.", new 
String[] {
+          "Adaptative", "Perceptron", "Target Mean" }, new String[] {
+          "Adaptative", "Perceptron", "Target Mean" }, 0);
+
+  public FlagOption constantLearningRatioDecayOption = new FlagOption(
+      "learningRatio_Decay_set_constant", 'd',
+      "Learning Ratio Decay in Perceptron set to be constant. (The next 
parameter).");
+
+  public FloatOption learningRatioOption = new FloatOption(
+      "learningRatio", 's',
+      "Constante Learning Ratio to use for training the Perceptrons in the 
leaves.", 0.025);
+
+  public MultiChoiceOption votingTypeOption = new MultiChoiceOption(
+      "votingType", 'V', "Voting Type.", new String[] {
+          "InverseErrorWeightedVote", "UniformWeightedVote" }, new String[] {
+          "InverseErrorWeightedVote", "UniformWeightedVote" }, 0);
+
+  public IntOption parallelismHintOption = new IntOption(
+      "parallelismHint",
+      'p',
+      "The number of local statistics PI to do distributed computation",
+      1, 1, Integer.MAX_VALUE);
+
+  // Processor
+  private AMRulesAggregatorProcessor aggregator;
+
+  // Stream
+  private Stream resultStream;
+
+  @Override
+  public void init(TopologyBuilder topologyBuilder, Instances dataset, int 
parallelism) {
+
+    this.aggregator = new AMRulesAggregatorProcessor.Builder(dataset)
+        .threshold(pageHinckleyThresholdOption.getValue())
+        .alpha(pageHinckleyAlphaOption.getValue())
+        .changeDetection(this.DriftDetectionOption.isSet())
+        .predictionFunction(predictionFunctionOption.getChosenIndex())
+        .constantLearningRatioDecay(constantLearningRatioDecayOption.isSet())
+        .learningRatio(learningRatioOption.getValue())
+        .splitConfidence(splitConfidenceOption.getValue())
+        .tieThreshold(tieThresholdOption.getValue())
+        .gracePeriod(gracePeriodOption.getValue())
+        .noAnomalyDetection(noAnomalyDetectionOption.isSet())
+        
.multivariateAnomalyProbabilityThreshold(multivariateAnomalyProbabilityThresholdOption.getValue())
+        
.univariateAnomalyProbabilityThreshold(univariateAnomalyProbabilityThresholdOption.getValue())
+        
.anomalyNumberOfInstancesThreshold(anomalyNumInstThresholdOption.getValue())
+        .unorderedRules(unorderedRulesOption.isSet())
+        .numericObserver((FIMTDDNumericAttributeClassLimitObserver) 
numericObserverOption.getValue())
+        .voteType(votingTypeOption.getChosenIndex())
+        .build();
+
+    topologyBuilder.addProcessor(aggregator);
+
+    Stream statisticsStream = topologyBuilder.createStream(aggregator);
+    this.resultStream = topologyBuilder.createStream(aggregator);
+
+    this.aggregator.setResultStream(resultStream);
+    this.aggregator.setStatisticsStream(statisticsStream);
+
+    AMRulesStatisticsProcessor learner = new 
AMRulesStatisticsProcessor.Builder(dataset)
+        .splitConfidence(splitConfidenceOption.getValue())
+        .tieThreshold(tieThresholdOption.getValue())
+        .gracePeriod(gracePeriodOption.getValue())
+        .build();
+
+    topologyBuilder.addProcessor(learner, 
this.parallelismHintOption.getValue());
+
+    topologyBuilder.connectInputKeyStream(statisticsStream, learner);
+
+    Stream predicateStream = topologyBuilder.createStream(learner);
+    learner.setOutputStream(predicateStream);
+
+    topologyBuilder.connectInputShuffleStream(predicateStream, aggregator);
+  }
+
+  @Override
+  public Processor getInputProcessor() {
+    return aggregator;
+  }
+
+  @Override
+  public Set<Stream> getResultStreams() {
+    return ImmutableSet.of(this.resultStream);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/centralized/AMRulesRegressorProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/centralized/AMRulesRegressorProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/centralized/AMRulesRegressorProcessor.java
index f83d6fd..48e9dbb 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/centralized/AMRulesRegressorProcessor.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/centralized/AMRulesRegressorProcessor.java
@@ -38,472 +38,469 @@ import 
com.yahoo.labs.samoa.moa.classifiers.rules.core.voting.ErrorWeightedVote;
 import com.yahoo.labs.samoa.topology.Stream;
 
 /**
- * AMRules Regressor Processor
- * is the main (and only) processor for AMRulesRegressor task.
- * It is adapted from the AMRules implementation in MOA.
+ * AMRules Regressor Processor is the main (and only) processor for
+ * AMRulesRegressor task. It is adapted from the AMRules implementation in MOA.
  * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class AMRulesRegressorProcessor implements Processor {
-       /**
+  /**
         * 
         */
-       private static final long serialVersionUID = 1L;
-
-       private int processorId;
-
-       // Rules & default rule
-       protected List<ActiveRule> ruleSet;
-       protected ActiveRule defaultRule;
-       protected int ruleNumberID;
-       protected double[] statistics;
-
-       // SAMOA Stream
-       private Stream resultStream;
-
-       // Options
-       protected int pageHinckleyThreshold;
-       protected double pageHinckleyAlpha;
-       protected boolean driftDetection;
-       protected int predictionFunction; // Adaptive=0 Perceptron=1 
TargetMean=2
-       protected boolean constantLearningRatioDecay;
-       protected double learningRatio;
-
-       protected double splitConfidence;
-       protected double tieThreshold;
-       protected int gracePeriod;
-
-       protected boolean noAnomalyDetection;
-       protected double multivariateAnomalyProbabilityThreshold;
-       protected double univariateAnomalyprobabilityThreshold;
-       protected int anomalyNumInstThreshold;
-
-       protected boolean unorderedRules;
-
-       protected FIMTDDNumericAttributeClassLimitObserver numericObserver;
-       
-       protected ErrorWeightedVote voteType;
-       
-       /*
-        * Constructor
-        */
-       public AMRulesRegressorProcessor (Builder builder) {
-               this.pageHinckleyThreshold = builder.pageHinckleyThreshold;
-               this.pageHinckleyAlpha = builder.pageHinckleyAlpha;
-               this.driftDetection = builder.driftDetection;
-               this.predictionFunction = builder.predictionFunction;
-               this.constantLearningRatioDecay = 
builder.constantLearningRatioDecay;
-               this.learningRatio = builder.learningRatio;
-               this.splitConfidence = builder.splitConfidence;
-               this.tieThreshold = builder.tieThreshold;
-               this.gracePeriod = builder.gracePeriod;
-               
-               this.noAnomalyDetection = builder.noAnomalyDetection;
-               this.multivariateAnomalyProbabilityThreshold = 
builder.multivariateAnomalyProbabilityThreshold;
-               this.univariateAnomalyprobabilityThreshold = 
builder.univariateAnomalyprobabilityThreshold;
-               this.anomalyNumInstThreshold = builder.anomalyNumInstThreshold;
-               this.unorderedRules = builder.unorderedRules;
-               
-               this.numericObserver = builder.numericObserver;
-               this.voteType = builder.voteType;
-       }
-       
-       /*
-        * Process
-        */
-       @Override
-       public boolean process(ContentEvent event) {
-               InstanceContentEvent instanceEvent = (InstanceContentEvent) 
event;
-       
-               // predict
-               if (instanceEvent.isTesting()) {
-                       this.predictOnInstance(instanceEvent);
-               }
-               
-               // train
-               if (instanceEvent.isTraining()) {
-                       this.trainOnInstance(instanceEvent);
-               }
-               
-               return true;
-       }
-       
-       /*
-        * Prediction
-        */
-       private void predictOnInstance (InstanceContentEvent instanceEvent) {
-               double[] prediction = 
getVotesForInstance(instanceEvent.getInstance());
-               ResultContentEvent rce = newResultContentEvent(prediction, 
instanceEvent);
-               resultStream.put(rce);
-       }
-       
-       /**
-        * Helper method to generate new ResultContentEvent based on an 
instance and
-        * its prediction result.
-        * @param prediction The predicted class label from the decision tree 
model.
-        * @param inEvent The associated instance content event
-        * @return ResultContentEvent to be sent into Evaluator PI or other 
destination PI.
-        */
-       private ResultContentEvent newResultContentEvent(double[] prediction, 
InstanceContentEvent inEvent){
-               ResultContentEvent rce = new 
ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(), 
inEvent.getClassId(), prediction, inEvent.isLastEvent());
-               rce.setClassifierIndex(this.processorId);
-               rce.setEvaluationIndex(inEvent.getEvaluationIndex());
-               return rce;
-       }
-       
-       /**
-        * getVotesForInstance extension of the instance method 
getVotesForInstance
-        * in moa.classifier.java
-        * returns the prediction of the instance.
-        * Called in EvaluateModelRegression
-        */            
-       private double[] getVotesForInstance(Instance instance) {
-               ErrorWeightedVote errorWeightedVote=newErrorWeightedVote(); 
-               int numberOfRulesCovering = 0;
-               
-               for (ActiveRule rule: ruleSet) {
-                       if (rule.isCovering(instance) == true){
-                               numberOfRulesCovering++;
-                               double [] vote=rule.getPrediction(instance);
-                               double error= rule.getCurrentError();
-                               errorWeightedVote.addVote(vote,error);
-                               if (!this.unorderedRules) { // Ordered Rules 
Option.
-                                       break; // Only one rule cover the 
instance.
-                               }
-                       }
-               }
-
-               if (numberOfRulesCovering == 0) {
-                       double [] vote=defaultRule.getPrediction(instance);
-                       double error= defaultRule.getCurrentError();
-                       errorWeightedVote.addVote(vote,error);  
-               }       
-               double[] weightedVote=errorWeightedVote.computeWeightedVote();
-               
-               return weightedVote;
-       }
-
-       public ErrorWeightedVote newErrorWeightedVote() {
-               return voteType.getACopy();
-       }
-       
-       /*
-        * Training
-        */
-       private void trainOnInstance (InstanceContentEvent instanceEvent) {
-               this.trainOnInstanceImpl(instanceEvent.getInstance());
-       }
-       public void trainOnInstanceImpl(Instance instance) {
-               /**
-                * AMRules Algorithm
-                * 
-                //For each rule in the rule set
-                       //If rule covers the instance
-                               //if the instance is not an anomaly     
-                                       //Update Change Detection Tests
-                                       //Compute prediction error
-                                       //Call PHTest
-                                               //If change is detected then
-                                                       //Remove rule
-                                               //Else
-                                                       //Update sufficient 
statistics of rule
-                                                       //If number of examples 
in rule  > Nmin
-                                                               //Expand rule
-                                               //If ordered set then
-                                                       //break
-                       //If none of the rule covers the instance
-                               //Update sufficient statistics of default rule
-                               //If number of examples in default rule is 
multiple of Nmin
-                                       //Expand default rule and add it to the 
set of rules
-                                       //Reset the default rule
-                */
-               boolean rulesCoveringInstance = false;
-               Iterator<ActiveRule> ruleIterator= this.ruleSet.iterator();
-               while (ruleIterator.hasNext()) { 
-                       ActiveRule rule = ruleIterator.next();
-                       if (rule.isCovering(instance) == true) {
-                               rulesCoveringInstance = true;
-                               if (isAnomaly(instance, rule) == false) {
-                                       //Update Change Detection Tests
-                                       double error = 
rule.computeError(instance); //Use adaptive mode error
-                                       boolean changeDetected = 
((RuleActiveRegressionNode)rule.getLearningNode()).updateChangeDetection(error);
-                                       if (changeDetected == true) {
-                                               ruleIterator.remove();
-                                       } else {
-                                               rule.updateStatistics(instance);
-                                               if (rule.getInstancesSeen()  % 
this.gracePeriod == 0.0) {
-                                                       if 
(rule.tryToExpand(this.splitConfidence, this.tieThreshold) ) {
-                                                               rule.split();
-                                                       }       
-                                               }
-                                       }
-                                       if (!this.unorderedRules) 
-                                               break;
-                               }
-                       }
-               }       
-
-               if (rulesCoveringInstance == false){ 
-                       defaultRule.updateStatistics(instance);
-                       if (defaultRule.getInstancesSeen() % this.gracePeriod 
== 0.0) {
-                               if 
(defaultRule.tryToExpand(this.splitConfidence, this.tieThreshold) == true) {
-                                       ActiveRule 
newDefaultRule=newRule(defaultRule.getRuleNumberID(),(RuleActiveRegressionNode)defaultRule.getLearningNode(),
-                                                       
((RuleActiveRegressionNode)defaultRule.getLearningNode()).getStatisticsOtherBranchSplit());
 //other branch
-                                       defaultRule.split();
-                                       
defaultRule.setRuleNumberID(++ruleNumberID);
-                                       this.ruleSet.add(this.defaultRule);
-                                       
-                                       defaultRule=newDefaultRule;
-
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Method to verify if the instance is an anomaly.
-        * @param instance
-        * @param rule
-        * @return
-        */
-       private boolean isAnomaly(Instance instance, ActiveRule rule) {
-               //AMRUles is equipped with anomaly detection. If on, compute 
the anomaly value.                         
-               boolean isAnomaly = false;      
-               if (this.noAnomalyDetection == false){
-                       if (rule.getInstancesSeen() >= 
this.anomalyNumInstThreshold) {
-                               isAnomaly = rule.isAnomaly(instance, 
-                                               
this.univariateAnomalyprobabilityThreshold,
-                                               
this.multivariateAnomalyProbabilityThreshold,
-                                               this.anomalyNumInstThreshold);
-                       }
-               }
-               return isAnomaly;
-       }
-       
-       /*
-        * Create new rules
-        */
-       // TODO check this after finish rule, LN
-       private ActiveRule newRule(int ID, RuleActiveRegressionNode node, 
double[] statistics) {
-               ActiveRule r=newRule(ID);
-
-               if (node!=null)
-               {
-                       if(node.getPerceptron()!=null)
-                       {
-                               r.getLearningNode().setPerceptron(new 
Perceptron(node.getPerceptron()));
-                               
r.getLearningNode().getPerceptron().setLearningRatio(this.learningRatio);
-                       }
-                       if (statistics==null)
-                       {
-                               double mean;
-                               if(node.getNodeStatistics().getValue(0)>0){
-                                       
mean=node.getNodeStatistics().getValue(1)/node.getNodeStatistics().getValue(0);
-                                       
r.getLearningNode().getTargetMean().reset(mean, 1); 
-                               }
-                       }  
-               }
-               if (statistics!=null && 
((RuleActiveRegressionNode)r.getLearningNode()).getTargetMean()!=null)
-               {
-                       double mean;
-                       if(statistics[0]>0){
-                               mean=statistics[1]/statistics[0];
-                               
((RuleActiveRegressionNode)r.getLearningNode()).getTargetMean().reset(mean, 
(long)statistics[0]); 
-                       }
-               }
-               return r;
-       }
-
-       private ActiveRule newRule(int ID) {
-               ActiveRule r=new ActiveRule.Builder().
-                               threshold(this.pageHinckleyThreshold).
-                               alpha(this.pageHinckleyAlpha).
-                               changeDetection(this.driftDetection).
-                               predictionFunction(this.predictionFunction).
-                               statistics(new double[3]).
-                               learningRatio(this.learningRatio).
-                               numericObserver(numericObserver).
-                               id(ID).build();
-               return r;
-       }
-       
-       /* 
-        * Init processor
-        */
-       @Override
-       public void onCreate(int id) {
-               this.processorId = id;
-               this.statistics= new double[]{0.0,0,0}; 
-               this.ruleNumberID=0;
-               this.defaultRule = newRule(++this.ruleNumberID);
-               
-               this.ruleSet = new LinkedList<ActiveRule>();
-       }
-
-       /* 
-        * Clone processor
-        */
-       @Override
-       public Processor newProcessor(Processor p) {
-               AMRulesRegressorProcessor oldProcessor = 
(AMRulesRegressorProcessor) p;
-               Builder builder = new Builder(oldProcessor);
-               AMRulesRegressorProcessor newProcessor = builder.build();
-               newProcessor.resultStream = oldProcessor.resultStream;
-               return newProcessor;
-       }
-       
-       /*
-        * Output stream
-        */
-       public void setResultStream(Stream resultStream) {
-               this.resultStream = resultStream;
-       }
-       
-       public Stream getResultStream() {
-               return this.resultStream;
-       }
-       
-       /*
-        * Others
-        */
-    public boolean isRandomizable() {
-       return true;
+  private static final long serialVersionUID = 1L;
+
+  private int processorId;
+
+  // Rules & default rule
+  protected List<ActiveRule> ruleSet;
+  protected ActiveRule defaultRule;
+  protected int ruleNumberID;
+  protected double[] statistics;
+
+  // SAMOA Stream
+  private Stream resultStream;
+
+  // Options
+  protected int pageHinckleyThreshold;
+  protected double pageHinckleyAlpha;
+  protected boolean driftDetection;
+  protected int predictionFunction; // Adaptive=0 Perceptron=1 TargetMean=2
+  protected boolean constantLearningRatioDecay;
+  protected double learningRatio;
+
+  protected double splitConfidence;
+  protected double tieThreshold;
+  protected int gracePeriod;
+
+  protected boolean noAnomalyDetection;
+  protected double multivariateAnomalyProbabilityThreshold;
+  protected double univariateAnomalyprobabilityThreshold;
+  protected int anomalyNumInstThreshold;
+
+  protected boolean unorderedRules;
+
+  protected FIMTDDNumericAttributeClassLimitObserver numericObserver;
+
+  protected ErrorWeightedVote voteType;
+
+  /*
+   * Constructor
+   */
+  public AMRulesRegressorProcessor(Builder builder) {
+    this.pageHinckleyThreshold = builder.pageHinckleyThreshold;
+    this.pageHinckleyAlpha = builder.pageHinckleyAlpha;
+    this.driftDetection = builder.driftDetection;
+    this.predictionFunction = builder.predictionFunction;
+    this.constantLearningRatioDecay = builder.constantLearningRatioDecay;
+    this.learningRatio = builder.learningRatio;
+    this.splitConfidence = builder.splitConfidence;
+    this.tieThreshold = builder.tieThreshold;
+    this.gracePeriod = builder.gracePeriod;
+
+    this.noAnomalyDetection = builder.noAnomalyDetection;
+    this.multivariateAnomalyProbabilityThreshold = 
builder.multivariateAnomalyProbabilityThreshold;
+    this.univariateAnomalyprobabilityThreshold = 
builder.univariateAnomalyprobabilityThreshold;
+    this.anomalyNumInstThreshold = builder.anomalyNumInstThreshold;
+    this.unorderedRules = builder.unorderedRules;
+
+    this.numericObserver = builder.numericObserver;
+    this.voteType = builder.voteType;
+  }
+
+  /*
+   * Process
+   */
+  @Override
+  public boolean process(ContentEvent event) {
+    InstanceContentEvent instanceEvent = (InstanceContentEvent) event;
+
+    // predict
+    if (instanceEvent.isTesting()) {
+      this.predictOnInstance(instanceEvent);
     }
-       
-       /*
-        * Builder
-        */
-       public static class Builder {
-               private int pageHinckleyThreshold;
-               private double pageHinckleyAlpha;
-               private boolean driftDetection;
-               private int predictionFunction; // Adaptive=0 Perceptron=1 
TargetMean=2
-               private boolean constantLearningRatioDecay;
-               private double learningRatio;
-               private double splitConfidence;
-               private double tieThreshold;
-               private int gracePeriod;
-               
-               private boolean noAnomalyDetection;
-               private double multivariateAnomalyProbabilityThreshold;
-               private double univariateAnomalyprobabilityThreshold;
-               private int anomalyNumInstThreshold;
-               
-               private boolean unorderedRules;
-               
-               private FIMTDDNumericAttributeClassLimitObserver 
numericObserver;
-               private ErrorWeightedVote voteType;
-               
-               private Instances dataset;
-               
-               public Builder(Instances dataset){
-                       this.dataset = dataset;
-               }
-               
-               public Builder(AMRulesRegressorProcessor processor) {
-                       this.pageHinckleyThreshold = 
processor.pageHinckleyThreshold;
-                       this.pageHinckleyAlpha = processor.pageHinckleyAlpha;
-                       this.driftDetection = processor.driftDetection;
-                       this.predictionFunction = processor.predictionFunction;
-                       this.constantLearningRatioDecay = 
processor.constantLearningRatioDecay;
-                       this.learningRatio = processor.learningRatio;
-                       this.splitConfidence = processor.splitConfidence;
-                       this.tieThreshold = processor.tieThreshold;
-                       this.gracePeriod = processor.gracePeriod;
-                       
-                       this.noAnomalyDetection = processor.noAnomalyDetection;
-                       this.multivariateAnomalyProbabilityThreshold = 
processor.multivariateAnomalyProbabilityThreshold;
-                       this.univariateAnomalyprobabilityThreshold = 
processor.univariateAnomalyprobabilityThreshold;
-                       this.anomalyNumInstThreshold = 
processor.anomalyNumInstThreshold;
-                       this.unorderedRules = processor.unorderedRules;
-                       
-                       this.numericObserver = processor.numericObserver;
-                       this.voteType = processor.voteType;
-               }
-               
-               public Builder threshold(int threshold) {
-                       this.pageHinckleyThreshold = threshold;
-                       return this;
-               }
-               
-               public Builder alpha(double alpha) {
-                       this.pageHinckleyAlpha = alpha;
-                       return this;
-               }
-               
-               public Builder changeDetection(boolean changeDetection) {
-                       this.driftDetection = changeDetection;
-                       return this;
-               }
-               
-               public Builder predictionFunction(int predictionFunction) {
-                       this.predictionFunction = predictionFunction;
-                       return this;
-               }
-               
-               public Builder constantLearningRatioDecay(boolean 
constantDecay) {
-                       this.constantLearningRatioDecay = constantDecay;
-                       return this;
-               }
-               
-               public Builder learningRatio(double learningRatio) {
-                       this.learningRatio = learningRatio;
-                       return this;
-               }
-               
-               public Builder splitConfidence(double splitConfidence) {
-                       this.splitConfidence = splitConfidence;
-                       return this;
-               }
-               
-               public Builder tieThreshold(double tieThreshold) {
-                       this.tieThreshold = tieThreshold;
-                       return this;
-               }
-               
-               public Builder gracePeriod(int gracePeriod) {
-                       this.gracePeriod = gracePeriod;
-                       return this;
-               }
-               
-               public Builder noAnomalyDetection(boolean noAnomalyDetection) {
-                       this.noAnomalyDetection = noAnomalyDetection;
-                       return this;
-               }
-               
-               public Builder multivariateAnomalyProbabilityThreshold(double 
mAnomalyThreshold) {
-                       this.multivariateAnomalyProbabilityThreshold = 
mAnomalyThreshold;
-                       return this;
-               }
-               
-               public Builder univariateAnomalyProbabilityThreshold(double 
uAnomalyThreshold) {
-                       this.univariateAnomalyprobabilityThreshold = 
uAnomalyThreshold;
-                       return this;
-               }
-               
-               public Builder anomalyNumberOfInstancesThreshold(int 
anomalyNumInstThreshold) {
-                       this.anomalyNumInstThreshold = anomalyNumInstThreshold;
-                       return this;
-               }
-               
-               public Builder unorderedRules(boolean unorderedRules) {
-                       this.unorderedRules = unorderedRules;
-                       return this;
-               }
-               
-               public Builder 
numericObserver(FIMTDDNumericAttributeClassLimitObserver numericObserver) {
-                       this.numericObserver = numericObserver;
-                       return this;
-               }
-               
-               public Builder voteType(ErrorWeightedVote voteType) {
-                       this.voteType = voteType;
-                       return this;
-               }
-               
-               public AMRulesRegressorProcessor build() {
-                       return new AMRulesRegressorProcessor(this);
-               }
-       }
+
+    // train
+    if (instanceEvent.isTraining()) {
+      this.trainOnInstance(instanceEvent);
+    }
+
+    return true;
+  }
+
+  /*
+   * Prediction
+   */
+  private void predictOnInstance(InstanceContentEvent instanceEvent) {
+    double[] prediction = getVotesForInstance(instanceEvent.getInstance());
+    ResultContentEvent rce = newResultContentEvent(prediction, instanceEvent);
+    resultStream.put(rce);
+  }
+
+  /**
+   * Helper method to generate new ResultContentEvent based on an instance and
+   * its prediction result.
+   * 
+   * @param prediction
+   *          The predicted class label from the decision tree model.
+   * @param inEvent
+   *          The associated instance content event
+   * @return ResultContentEvent to be sent into Evaluator PI or other
+   *         destination PI.
+   */
+  private ResultContentEvent newResultContentEvent(double[] prediction, 
InstanceContentEvent inEvent) {
+    ResultContentEvent rce = new 
ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
+        inEvent.getClassId(), prediction, inEvent.isLastEvent());
+    rce.setClassifierIndex(this.processorId);
+    rce.setEvaluationIndex(inEvent.getEvaluationIndex());
+    return rce;
+  }
+
+  /**
+   * getVotesForInstance extension of the instance method getVotesForInstance 
in
+   * moa.classifier.java returns the prediction of the instance. Called in
+   * EvaluateModelRegression
+   */
+  private double[] getVotesForInstance(Instance instance) {
+    ErrorWeightedVote errorWeightedVote = newErrorWeightedVote();
+    int numberOfRulesCovering = 0;
+
+    for (ActiveRule rule : ruleSet) {
+      if (rule.isCovering(instance) == true) {
+        numberOfRulesCovering++;
+        double[] vote = rule.getPrediction(instance);
+        double error = rule.getCurrentError();
+        errorWeightedVote.addVote(vote, error);
+        if (!this.unorderedRules) { // Ordered Rules Option.
+          break; // Only one rule cover the instance.
+        }
+      }
+    }
+
+    if (numberOfRulesCovering == 0) {
+      double[] vote = defaultRule.getPrediction(instance);
+      double error = defaultRule.getCurrentError();
+      errorWeightedVote.addVote(vote, error);
+    }
+    double[] weightedVote = errorWeightedVote.computeWeightedVote();
+
+    return weightedVote;
+  }
+
+  public ErrorWeightedVote newErrorWeightedVote() {
+    return voteType.getACopy();
+  }
+
+  /*
+   * Training
+   */
+  private void trainOnInstance(InstanceContentEvent instanceEvent) {
+    this.trainOnInstanceImpl(instanceEvent.getInstance());
+  }
+
+  public void trainOnInstanceImpl(Instance instance) {
+    /**
+     * AMRules Algorithm
+     * 
+     * //For each rule in the rule set //If rule covers the instance //if the
+     * instance is not an anomaly //Update Change Detection Tests //Compute
+     * prediction error //Call PHTest //If change is detected then //Remove 
rule
+     * //Else //Update sufficient statistics of rule //If number of examples in
+     * rule > Nmin //Expand rule //If ordered set then //break //If none of the
+     * rule covers the instance //Update sufficient statistics of default rule
+     * //If number of examples in default rule is multiple of Nmin //Expand
+     * default rule and add it to the set of rules //Reset the default rule
+     */
+    boolean rulesCoveringInstance = false;
+    Iterator<ActiveRule> ruleIterator = this.ruleSet.iterator();
+    while (ruleIterator.hasNext()) {
+      ActiveRule rule = ruleIterator.next();
+      if (rule.isCovering(instance) == true) {
+        rulesCoveringInstance = true;
+        if (isAnomaly(instance, rule) == false) {
+          // Update Change Detection Tests
+          double error = rule.computeError(instance); // Use adaptive mode 
error
+          boolean changeDetected = ((RuleActiveRegressionNode) 
rule.getLearningNode()).updateChangeDetection(error);
+          if (changeDetected == true) {
+            ruleIterator.remove();
+          } else {
+            rule.updateStatistics(instance);
+            if (rule.getInstancesSeen() % this.gracePeriod == 0.0) {
+              if (rule.tryToExpand(this.splitConfidence, this.tieThreshold)) {
+                rule.split();
+              }
+            }
+          }
+          if (!this.unorderedRules)
+            break;
+        }
+      }
+    }
+
+    if (rulesCoveringInstance == false) {
+      defaultRule.updateStatistics(instance);
+      if (defaultRule.getInstancesSeen() % this.gracePeriod == 0.0) {
+        if (defaultRule.tryToExpand(this.splitConfidence, this.tieThreshold) 
== true) {
+          ActiveRule newDefaultRule = newRule(defaultRule.getRuleNumberID(),
+              (RuleActiveRegressionNode) defaultRule.getLearningNode(),
+              ((RuleActiveRegressionNode) 
defaultRule.getLearningNode()).getStatisticsOtherBranchSplit()); // other
+                                                                               
                            // branch
+          defaultRule.split();
+          defaultRule.setRuleNumberID(++ruleNumberID);
+          this.ruleSet.add(this.defaultRule);
+
+          defaultRule = newDefaultRule;
+
+        }
+      }
+    }
+  }
+
+  /**
+   * Method to verify if the instance is an anomaly.
+   * 
+   * @param instance
+   * @param rule
+   * @return
+   */
+  private boolean isAnomaly(Instance instance, ActiveRule rule) {
+    // AMRUles is equipped with anomaly detection. If on, compute the anomaly
+    // value.
+    boolean isAnomaly = false;
+    if (this.noAnomalyDetection == false) {
+      if (rule.getInstancesSeen() >= this.anomalyNumInstThreshold) {
+        isAnomaly = rule.isAnomaly(instance,
+            this.univariateAnomalyprobabilityThreshold,
+            this.multivariateAnomalyProbabilityThreshold,
+            this.anomalyNumInstThreshold);
+      }
+    }
+    return isAnomaly;
+  }
+
+  /*
+   * Create new rules
+   */
+  // TODO check this after finish rule, LN
+  private ActiveRule newRule(int ID, RuleActiveRegressionNode node, double[] 
statistics) {
+    ActiveRule r = newRule(ID);
+
+    if (node != null)
+    {
+      if (node.getPerceptron() != null)
+      {
+        r.getLearningNode().setPerceptron(new 
Perceptron(node.getPerceptron()));
+        
r.getLearningNode().getPerceptron().setLearningRatio(this.learningRatio);
+      }
+      if (statistics == null)
+      {
+        double mean;
+        if (node.getNodeStatistics().getValue(0) > 0) {
+          mean = node.getNodeStatistics().getValue(1) / 
node.getNodeStatistics().getValue(0);
+          r.getLearningNode().getTargetMean().reset(mean, 1);
+        }
+      }
+    }
+    if (statistics != null && ((RuleActiveRegressionNode) 
r.getLearningNode()).getTargetMean() != null)
+    {
+      double mean;
+      if (statistics[0] > 0) {
+        mean = statistics[1] / statistics[0];
+        ((RuleActiveRegressionNode) 
r.getLearningNode()).getTargetMean().reset(mean, (long) statistics[0]);
+      }
+    }
+    return r;
+  }
+
+  private ActiveRule newRule(int ID) {
+    ActiveRule r = new ActiveRule.Builder().
+        threshold(this.pageHinckleyThreshold).
+        alpha(this.pageHinckleyAlpha).
+        changeDetection(this.driftDetection).
+        predictionFunction(this.predictionFunction).
+        statistics(new double[3]).
+        learningRatio(this.learningRatio).
+        numericObserver(numericObserver).
+        id(ID).build();
+    return r;
+  }
+
+  /*
+   * Init processor
+   */
+  @Override
+  public void onCreate(int id) {
+    this.processorId = id;
+    this.statistics = new double[] { 0.0, 0, 0 };
+    this.ruleNumberID = 0;
+    this.defaultRule = newRule(++this.ruleNumberID);
+
+    this.ruleSet = new LinkedList<ActiveRule>();
+  }
+
+  /*
+   * Clone processor
+   */
+  @Override
+  public Processor newProcessor(Processor p) {
+    AMRulesRegressorProcessor oldProcessor = (AMRulesRegressorProcessor) p;
+    Builder builder = new Builder(oldProcessor);
+    AMRulesRegressorProcessor newProcessor = builder.build();
+    newProcessor.resultStream = oldProcessor.resultStream;
+    return newProcessor;
+  }
+
+  /*
+   * Output stream
+   */
+  public void setResultStream(Stream resultStream) {
+    this.resultStream = resultStream;
+  }
+
+  public Stream getResultStream() {
+    return this.resultStream;
+  }
+
+  /*
+   * Others
+   */
+  public boolean isRandomizable() {
+    return true;
+  }
+
+  /*
+   * Builder
+   */
+  public static class Builder {
+    private int pageHinckleyThreshold;
+    private double pageHinckleyAlpha;
+    private boolean driftDetection;
+    private int predictionFunction; // Adaptive=0 Perceptron=1 TargetMean=2
+    private boolean constantLearningRatioDecay;
+    private double learningRatio;
+    private double splitConfidence;
+    private double tieThreshold;
+    private int gracePeriod;
+
+    private boolean noAnomalyDetection;
+    private double multivariateAnomalyProbabilityThreshold;
+    private double univariateAnomalyprobabilityThreshold;
+    private int anomalyNumInstThreshold;
+
+    private boolean unorderedRules;
+
+    private FIMTDDNumericAttributeClassLimitObserver numericObserver;
+    private ErrorWeightedVote voteType;
+
+    private Instances dataset;
+
+    public Builder(Instances dataset) {
+      this.dataset = dataset;
+    }
+
+    public Builder(AMRulesRegressorProcessor processor) {
+      this.pageHinckleyThreshold = processor.pageHinckleyThreshold;
+      this.pageHinckleyAlpha = processor.pageHinckleyAlpha;
+      this.driftDetection = processor.driftDetection;
+      this.predictionFunction = processor.predictionFunction;
+      this.constantLearningRatioDecay = processor.constantLearningRatioDecay;
+      this.learningRatio = processor.learningRatio;
+      this.splitConfidence = processor.splitConfidence;
+      this.tieThreshold = processor.tieThreshold;
+      this.gracePeriod = processor.gracePeriod;
+
+      this.noAnomalyDetection = processor.noAnomalyDetection;
+      this.multivariateAnomalyProbabilityThreshold = 
processor.multivariateAnomalyProbabilityThreshold;
+      this.univariateAnomalyprobabilityThreshold = 
processor.univariateAnomalyprobabilityThreshold;
+      this.anomalyNumInstThreshold = processor.anomalyNumInstThreshold;
+      this.unorderedRules = processor.unorderedRules;
+
+      this.numericObserver = processor.numericObserver;
+      this.voteType = processor.voteType;
+    }
+
+    public Builder threshold(int threshold) {
+      this.pageHinckleyThreshold = threshold;
+      return this;
+    }
+
+    public Builder alpha(double alpha) {
+      this.pageHinckleyAlpha = alpha;
+      return this;
+    }
+
+    public Builder changeDetection(boolean changeDetection) {
+      this.driftDetection = changeDetection;
+      return this;
+    }
+
+    public Builder predictionFunction(int predictionFunction) {
+      this.predictionFunction = predictionFunction;
+      return this;
+    }
+
+    public Builder constantLearningRatioDecay(boolean constantDecay) {
+      this.constantLearningRatioDecay = constantDecay;
+      return this;
+    }
+
+    public Builder learningRatio(double learningRatio) {
+      this.learningRatio = learningRatio;
+      return this;
+    }
+
+    public Builder splitConfidence(double splitConfidence) {
+      this.splitConfidence = splitConfidence;
+      return this;
+    }
+
+    public Builder tieThreshold(double tieThreshold) {
+      this.tieThreshold = tieThreshold;
+      return this;
+    }
+
+    public Builder gracePeriod(int gracePeriod) {
+      this.gracePeriod = gracePeriod;
+      return this;
+    }
+
+    public Builder noAnomalyDetection(boolean noAnomalyDetection) {
+      this.noAnomalyDetection = noAnomalyDetection;
+      return this;
+    }
+
+    public Builder multivariateAnomalyProbabilityThreshold(double 
mAnomalyThreshold) {
+      this.multivariateAnomalyProbabilityThreshold = mAnomalyThreshold;
+      return this;
+    }
+
+    public Builder univariateAnomalyProbabilityThreshold(double 
uAnomalyThreshold) {
+      this.univariateAnomalyprobabilityThreshold = uAnomalyThreshold;
+      return this;
+    }
+
+    public Builder anomalyNumberOfInstancesThreshold(int 
anomalyNumInstThreshold) {
+      this.anomalyNumInstThreshold = anomalyNumInstThreshold;
+      return this;
+    }
+
+    public Builder unorderedRules(boolean unorderedRules) {
+      this.unorderedRules = unorderedRules;
+      return this;
+    }
+
+    public Builder numericObserver(FIMTDDNumericAttributeClassLimitObserver 
numericObserver) {
+      this.numericObserver = numericObserver;
+      return this;
+    }
+
+    public Builder voteType(ErrorWeightedVote voteType) {
+      this.voteType = voteType;
+      return this;
+    }
+
+    public AMRulesRegressorProcessor build() {
+      return new AMRulesRegressorProcessor(this);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/ActiveRule.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/ActiveRule.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/ActiveRule.java
index b6fba99..5d02079 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/ActiveRule.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/ActiveRule.java
@@ -28,199 +28,202 @@ import 
com.yahoo.labs.samoa.moa.classifiers.rules.core.attributeclassobservers.F
 import 
com.yahoo.labs.samoa.moa.classifiers.rules.core.conditionaltests.NumericAttributeBinaryRulePredicate;
 
 /**
- * ActiveRule is a LearningRule that actively update its LearningNode
- * with incoming instances.
+ * ActiveRule is a LearningRule that actively update its LearningNode with
+ * incoming instances.
  * 
  * @author Anh Thu Vu
- *
+ * 
  */
 
 public class ActiveRule extends LearningRule {
-       
-       private static final long serialVersionUID = 1L;
-
-       private double[] statisticsOtherBranchSplit;
-
-       private Builder builder;
-       
-       private RuleActiveRegressionNode learningNode;
-       
-       private RuleSplitNode lastUpdatedRuleSplitNode;
-       
-       /*
-        * Constructor with Builder
-        */
-       public ActiveRule() {
-               super();
-               this.builder = null;
-               this.learningNode = null;
-               this.ruleNumberID = 0;
-       }
-       public ActiveRule(Builder builder) {
-               super();
-               this.setBuilder(builder);
-               this.learningNode = newRuleActiveLearningNode(builder);
-               //JD - use builder ID to set ruleNumberID
-               this.ruleNumberID=builder.id;        
-       }
-
-       private RuleActiveRegressionNode newRuleActiveLearningNode(Builder 
builder) {
-               return new RuleActiveRegressionNode(builder);
-       }
-
-       /*
-        * Setters & getters
-        */
-       public Builder getBuilder() {
-               return builder;
-       }
-
-       public void setBuilder(Builder builder) {
-               this.builder = builder;
-       }
-       
-       @Override
-       public RuleRegressionNode getLearningNode() {
-               return this.learningNode;
-       }
-
-       @Override
-       public void setLearningNode(RuleRegressionNode learningNode) {
-               this.learningNode = (RuleActiveRegressionNode) learningNode;
-       }
-       
-       public double[] statisticsOtherBranchSplit() {
-               return this.statisticsOtherBranchSplit;
-       }
-       
-       public RuleSplitNode getLastUpdatedRuleSplitNode() {
-               return this.lastUpdatedRuleSplitNode;
-       }
-
-       /*
-        * Builder
-        */
-       public static class Builder implements Serializable {
-
-               private static final long serialVersionUID = 
1712887264918475622L;
-               protected boolean changeDetection;
-               protected boolean usePerceptron;
-               protected double threshold;
-               protected double alpha;
-               protected int predictionFunction;
-               protected boolean constantLearningRatioDecay;
-               protected double learningRatio;
-
-               protected double[] statistics;
-
-               protected FIMTDDNumericAttributeClassLimitObserver 
numericObserver;
-               
-               protected double lastTargetMean;
-
-               public int id;
-
-               public Builder() {
-               }
-
-               public Builder changeDetection(boolean changeDetection) {
-                       this.changeDetection = changeDetection;
-                       return this;
-               }
-
-               public Builder threshold(double threshold) {
-                       this.threshold = threshold;
-                       return this;
-               }
-
-               public Builder alpha(double alpha) {
-                       this.alpha = alpha;
-                       return this;
-               }
-
-               public Builder predictionFunction(int predictionFunction) {
-                       this.predictionFunction = predictionFunction;
-                       return this;
-               }
-
-               public Builder statistics(double[] statistics) {
-                       this.statistics = statistics;
-                       return this;
-               }
-               
-               public Builder constantLearningRatioDecay(boolean 
constantLearningRatioDecay) {
-                       this.constantLearningRatioDecay = 
constantLearningRatioDecay;
-                       return this;
-               }
-               
-               public Builder learningRatio(double learningRatio) {
-                       this.learningRatio = learningRatio;
-                       return this;
-               }
-               
-               public Builder 
numericObserver(FIMTDDNumericAttributeClassLimitObserver numericObserver) {
-                       this.numericObserver = numericObserver;
-                       return this;
-               }
-
-               public Builder id(int id) {
-                       this.id = id;
-                       return this;
-               }
-               public ActiveRule build() {
-                       return new ActiveRule(this);
-               }
-
-       }
-
-       /**
-        *  Try to Expand method.
-        * @param splitConfidence
-        * @param tieThreshold
-        * @return
-        */
-       public boolean tryToExpand(double splitConfidence, double tieThreshold) 
{
-
-               boolean shouldSplit= 
this.learningNode.tryToExpand(splitConfidence, tieThreshold);
-               return shouldSplit;
-
-       }
-       
-       //JD: Only call after tryToExpand returning true
-       public void split()
-       {
-               //this.statisticsOtherBranchSplit  = 
this.learningNode.getStatisticsOtherBranchSplit(); 
-               //create a split node,
-               int splitIndex = this.learningNode.getSplitIndex();
-               InstanceConditionalTest 
st=this.learningNode.getBestSuggestion().splitTest;
-               if(st instanceof NumericAttributeBinaryTest) {
-                       NumericAttributeBinaryTest splitTest = 
(NumericAttributeBinaryTest) st;
-                       NumericAttributeBinaryRulePredicate predicate = new 
NumericAttributeBinaryRulePredicate(
-                                       splitTest.getAttsTestDependsOn()[0], 
splitTest.getSplitValue(),
-                                       splitIndex + 1);
-                       lastUpdatedRuleSplitNode = new RuleSplitNode(predicate, 
this.learningNode.getStatisticsBranchSplit() );
-                       if (this.nodeListAdd(lastUpdatedRuleSplitNode)) {
-                               // create a new learning node
-                               RuleActiveRegressionNode newLearningNode = 
newRuleActiveLearningNode(this.getBuilder().statistics(this.learningNode.getStatisticsNewRuleActiveLearningNode()));
 
-                               newLearningNode.initialize(this.learningNode);
-                               this.learningNode = newLearningNode;
-                       }
-               }
-               else
-                       throw new UnsupportedOperationException("AMRules 
(currently) only supports numerical attributes.");
-       }
-
-       
-       
-//     protected void debug(String string, int level) {
-//     if (this.amRules.VerbosityOption.getValue()>=level) {
-//             System.out.println(string);
-//     }
-//}
-       
-       /**
-        * MOA GUI output
-        */
-       @Override
-       public void getDescription(StringBuilder sb, int indent) {
-       }
+
+  private static final long serialVersionUID = 1L;
+
+  private double[] statisticsOtherBranchSplit;
+
+  private Builder builder;
+
+  private RuleActiveRegressionNode learningNode;
+
+  private RuleSplitNode lastUpdatedRuleSplitNode;
+
+  /*
+   * Constructor with Builder
+   */
+  public ActiveRule() {
+    super();
+    this.builder = null;
+    this.learningNode = null;
+    this.ruleNumberID = 0;
+  }
+
+  public ActiveRule(Builder builder) {
+    super();
+    this.setBuilder(builder);
+    this.learningNode = newRuleActiveLearningNode(builder);
+    // JD - use builder ID to set ruleNumberID
+    this.ruleNumberID = builder.id;
+  }
+
+  private RuleActiveRegressionNode newRuleActiveLearningNode(Builder builder) {
+    return new RuleActiveRegressionNode(builder);
+  }
+
+  /*
+   * Setters & getters
+   */
+  public Builder getBuilder() {
+    return builder;
+  }
+
+  public void setBuilder(Builder builder) {
+    this.builder = builder;
+  }
+
+  @Override
+  public RuleRegressionNode getLearningNode() {
+    return this.learningNode;
+  }
+
+  @Override
+  public void setLearningNode(RuleRegressionNode learningNode) {
+    this.learningNode = (RuleActiveRegressionNode) learningNode;
+  }
+
+  public double[] statisticsOtherBranchSplit() {
+    return this.statisticsOtherBranchSplit;
+  }
+
+  public RuleSplitNode getLastUpdatedRuleSplitNode() {
+    return this.lastUpdatedRuleSplitNode;
+  }
+
+  /*
+   * Builder
+   */
+  public static class Builder implements Serializable {
+
+    private static final long serialVersionUID = 1712887264918475622L;
+    protected boolean changeDetection;
+    protected boolean usePerceptron;
+    protected double threshold;
+    protected double alpha;
+    protected int predictionFunction;
+    protected boolean constantLearningRatioDecay;
+    protected double learningRatio;
+
+    protected double[] statistics;
+
+    protected FIMTDDNumericAttributeClassLimitObserver numericObserver;
+
+    protected double lastTargetMean;
+
+    public int id;
+
+    public Builder() {
+    }
+
+    public Builder changeDetection(boolean changeDetection) {
+      this.changeDetection = changeDetection;
+      return this;
+    }
+
+    public Builder threshold(double threshold) {
+      this.threshold = threshold;
+      return this;
+    }
+
+    public Builder alpha(double alpha) {
+      this.alpha = alpha;
+      return this;
+    }
+
+    public Builder predictionFunction(int predictionFunction) {
+      this.predictionFunction = predictionFunction;
+      return this;
+    }
+
+    public Builder statistics(double[] statistics) {
+      this.statistics = statistics;
+      return this;
+    }
+
+    public Builder constantLearningRatioDecay(boolean 
constantLearningRatioDecay) {
+      this.constantLearningRatioDecay = constantLearningRatioDecay;
+      return this;
+    }
+
+    public Builder learningRatio(double learningRatio) {
+      this.learningRatio = learningRatio;
+      return this;
+    }
+
+    public Builder numericObserver(FIMTDDNumericAttributeClassLimitObserver 
numericObserver) {
+      this.numericObserver = numericObserver;
+      return this;
+    }
+
+    public Builder id(int id) {
+      this.id = id;
+      return this;
+    }
+
+    public ActiveRule build() {
+      return new ActiveRule(this);
+    }
+
+  }
+
+  /**
+   * Try to Expand method.
+   * 
+   * @param splitConfidence
+   * @param tieThreshold
+   * @return
+   */
+  public boolean tryToExpand(double splitConfidence, double tieThreshold) {
+
+    boolean shouldSplit = this.learningNode.tryToExpand(splitConfidence, 
tieThreshold);
+    return shouldSplit;
+
+  }
+
+  // JD: Only call after tryToExpand returning true
+  public void split()
+  {
+    // this.statisticsOtherBranchSplit =
+    // this.learningNode.getStatisticsOtherBranchSplit();
+    // create a split node,
+    int splitIndex = this.learningNode.getSplitIndex();
+    InstanceConditionalTest st = 
this.learningNode.getBestSuggestion().splitTest;
+    if (st instanceof NumericAttributeBinaryTest) {
+      NumericAttributeBinaryTest splitTest = (NumericAttributeBinaryTest) st;
+      NumericAttributeBinaryRulePredicate predicate = new 
NumericAttributeBinaryRulePredicate(
+          splitTest.getAttsTestDependsOn()[0], splitTest.getSplitValue(),
+          splitIndex + 1);
+      lastUpdatedRuleSplitNode = new RuleSplitNode(predicate, 
this.learningNode.getStatisticsBranchSplit());
+      if (this.nodeListAdd(lastUpdatedRuleSplitNode)) {
+        // create a new learning node
+        RuleActiveRegressionNode newLearningNode = 
newRuleActiveLearningNode(this.getBuilder().statistics(
+            this.learningNode.getStatisticsNewRuleActiveLearningNode()));
+        newLearningNode.initialize(this.learningNode);
+        this.learningNode = newLearningNode;
+      }
+    }
+    else
+      throw new UnsupportedOperationException("AMRules (currently) only 
supports numerical attributes.");
+  }
+
+  // protected void debug(String string, int level) {
+  // if (this.amRules.VerbosityOption.getValue()>=level) {
+  // System.out.println(string);
+  // }
+  // }
+
+  /**
+   * MOA GUI output
+   */
+  @Override
+  public void getDescription(StringBuilder sb, int indent) {
+  }
 }

Reply via email to