http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleRegressionNode.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleRegressionNode.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleRegressionNode.java
index 45f5719..068957d 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleRegressionNode.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleRegressionNode.java
@@ -29,264 +29,268 @@ import com.yahoo.labs.samoa.moa.core.DoubleVector;
  * The base class for LearningNode for regression rule.
  * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public abstract class RuleRegressionNode implements Serializable {
-       
-       private static final long serialVersionUID = 9129659494380381126L;
-       
-       protected int predictionFunction;
-       protected int ruleNumberID;
-       // The statistics for this node:
-       // Number of instances that have reached it
-       // Sum of y values
-       // Sum of squared y values
-       protected DoubleVector nodeStatistics;
-       
-       protected Perceptron perceptron;
-       protected TargetMean targetMean;
-       protected double learningRatio;
-       
-       /*
-        * Simple setters & getters
-        */
-       public Perceptron getPerceptron() {
-               return perceptron;
-       }
-
-       public void setPerceptron(Perceptron perceptron) {
-               this.perceptron = perceptron;
-       }
-
-       public TargetMean getTargetMean() {
-               return targetMean;
-       }
-
-       public void setTargetMean(TargetMean targetMean) {
-               this.targetMean = targetMean;
-       }
-
-       /*
-        * Create a new RuleRegressionNode
-        */
-       public RuleRegressionNode(double[] initialClassObservations) {
-               this.nodeStatistics = new 
DoubleVector(initialClassObservations);
-       }
-
-       public RuleRegressionNode() {
-               this(new double[0]);
-       }
-
-       /*
-        * Update statistics with input instance
-        */
-       public abstract void updateStatistics(Instance instance);
-
-       /*
-        * Predictions 
-        */
-       public double[] getPrediction(Instance instance) {
-               int predictionMode = 
this.getLearnerToUse(this.predictionFunction);
-               return getPrediction(instance, predictionMode);
-       }
-       
-       public double[] getSimplePrediction() {
-               if( this.targetMean!=null)
-                       return this.targetMean.getVotesForInstance();
-               else
-                       return new double[]{0};
-       }
-
-       public double[] getPrediction(Instance instance, int predictionMode) {
-               double[] ret;
-               if (predictionMode == 1)
-                       ret=this.perceptron.getVotesForInstance(instance);
-               else
-                       ret=this.targetMean.getVotesForInstance(instance);
-               return ret;
-       }
-       
-       public double getNormalizedPrediction(Instance instance) {
-               double res;
-               double [] aux;
-               switch (this.predictionFunction) {
-               //perceptron - 1
-               case 1:
-                       res=this.perceptron.normalizedPrediction(instance);
-                       break;
-                       //target mean - 2
-               case 2:
-                       aux=this.targetMean.getVotesForInstance();
-                       res=normalize(aux[0]);
-                       break;
-                       //adaptive      - 0
-               case 0:  
-                       int predictionMode = this.getLearnerToUse(0);
-                       if(predictionMode == 1)
-                       {
-                               
res=this.perceptron.normalizedPrediction(instance);
-                       }
-                       else{
-                               
aux=this.targetMean.getVotesForInstance(instance);
-                               res = normalize(aux[0]); 
-                       }
-                       break;
-               default: 
-                       throw new UnsupportedOperationException("Prediction 
mode not in range.");
-               }
-               return res;
-       }
-
-       /*
-        * Get learner mode
-        */
-       public int getLearnerToUse(int predMode) {
-               int predictionMode = predMode;
-               if (predictionMode == 0) {
-                       double perceptronError= 
this.perceptron.getCurrentError();
-                       double meanTargetError 
=this.targetMean.getCurrentError();
-                       if (perceptronError < meanTargetError)
-                               predictionMode = 1; //PERCEPTRON
-                       else
-                               predictionMode = 2; //TARGET MEAN
-               }
-               return predictionMode;
-       }
-
-       /*
-        * Error and change detection
-        */
-       public double computeError(Instance instance) {
-               double normalizedPrediction = 
getNormalizedPrediction(instance); 
-               double normalizedClassValue = normalize(instance.classValue());
-               return Math.abs(normalizedClassValue - normalizedPrediction);
-       }
-       
-       public double getCurrentError() {
-               double error;
-               if (this.perceptron!=null){
-                       if (targetMean==null)
-                               error=perceptron.getCurrentError();
-                       else{
-                               double errorP=perceptron.getCurrentError();
-                               double errorTM=targetMean.getCurrentError();
-                               error = (errorP<errorTM) ? errorP : errorTM;    
-                       }               
-               }
-               else
-                       error=Double.MAX_VALUE;
-               return error;
-       }
-       
-       /*
-        * no. of instances seen
-        */
-       public long getInstancesSeen() {
-               if (nodeStatistics != null) {
-                       return (long)this.nodeStatistics.getValue(0);
-               } else {
-                       return 0;
-               }
-       }
-
-       public DoubleVector getNodeStatistics(){
-               return this.nodeStatistics;
-       }
-       
-       /*
-        * Anomaly detection
-        */
-       public boolean isAnomaly(Instance instance,
-                       double uniVariateAnomalyProbabilityThreshold,
-                       double multiVariateAnomalyProbabilityThreshold,
-                       int numberOfInstanceesForAnomaly) {
-               //AMRUles is equipped with anomaly detection. If on, compute the anomaly value.
-               long perceptronIntancesSeen=this.perceptron.getInstancesSeen();
-               if ( perceptronIntancesSeen>= numberOfInstanceesForAnomaly) {
-                       double attribSum;
-                       double attribSquaredSum;
-                       double D = 0.0;
-                       double N = 0.0;
-                       double anomaly;
-
-                       for (int x = 0; x < instance.numAttributes() - 1; x++) {
-                               // Perceptron is initialized each rule.
-                               // this is a local anomaly.
-                               int instAttIndex = 
modelAttIndexToInstanceAttIndex(x, instance);
-                               attribSum = 
this.perceptron.perceptronattributeStatistics.getValue(x);
-                               attribSquaredSum = 
this.perceptron.squaredperceptronattributeStatistics.getValue(x);
-                               double mean = attribSum / 
perceptronIntancesSeen;
-                               double sd = computeSD(attribSquaredSum, 
attribSum, perceptronIntancesSeen);
-                               double probability = computeProbability(mean, 
sd, instance.value(instAttIndex));
-
-                               if (probability > 0.0) {
-                                       D = D + Math.abs(Math.log(probability));
-                                       if (probability < 
uniVariateAnomalyProbabilityThreshold) {//0.10
-                                               N = N + 
Math.abs(Math.log(probability));
-                                       }
-                               } 
-                       }
-
-                       anomaly = 0.0;
-                       if (D != 0.0) {
-                               anomaly = N / D;
-                       }
-                       if (anomaly >= multiVariateAnomalyProbabilityThreshold) 
{
-                               //debuganomaly(instance,
-                               //              uniVariateAnomalyProbabilityThreshold,
-                               //              multiVariateAnomalyProbabilityThreshold,
-                               //              anomaly);
-                               return true;
-                       }
-               }
-               return false;
-       } 
-
-       /*
-        * Helpers
-        */
-       public static double computeProbability(double mean, double sd, double 
value) {
-               double probability = 0.0;
-
-               if (sd > 0.0) {
-                       double k = (Math.abs(value - mean) / sd); // One tailed 
variant of Chebyshev's inequality
-                       probability= 1.0 / (1+k*k);
-               }
-
-               return probability;
-       }
-
-       public static double computeHoeffdingBound(double range, double 
confidence, double n) {
-               return Math.sqrt(((range * range) * Math.log(1.0 / confidence)) 
/ (2.0 * n));
-       }
-
-       private double normalize(double value) {
-               double meanY = 
this.nodeStatistics.getValue(1)/this.nodeStatistics.getValue(0);
-               double sdY = computeSD(this.nodeStatistics.getValue(2), 
this.nodeStatistics.getValue(1), (long)this.nodeStatistics.getValue(0));
-               double normalizedY = 0.0;
-               if (sdY > 0.0000001) {
-                       normalizedY = (value - meanY) / (sdY);
-               }
-               return normalizedY;
-       }
-
-
-       public double computeSD(double squaredVal, double val, long size) {
-               if (size > 1) {
-                       return Math.sqrt((squaredVal - ((val * val) / size)) / 
(size - 1.0));
-               }
-               return 0.0;
-       }
-       
-       /**
-     * Gets the index of the attribute in the instance,
-     * given the index of the attribute in the learner.
-     *
-     * @param index the index of the attribute in the learner
-     * @param inst the instance
-     * @return the index in the instance
-     */
-    protected static int modelAttIndexToInstanceAttIndex(int index, Instance 
inst) {
-            return  index<= inst.classIndex() ? index : index + 1;
+
+  private static final long serialVersionUID = 9129659494380381126L;
+
+  protected int predictionFunction;
+  protected int ruleNumberID;
+  // The statistics for this node:
+  // Number of instances that have reached it
+  // Sum of y values
+  // Sum of squared y values
+  protected DoubleVector nodeStatistics;
+
+  protected Perceptron perceptron;
+  protected TargetMean targetMean;
+  protected double learningRatio;
+
+  /*
+   * Simple setters & getters
+   */
+  public Perceptron getPerceptron() {
+    return perceptron;
+  }
+
+  public void setPerceptron(Perceptron perceptron) {
+    this.perceptron = perceptron;
+  }
+
+  public TargetMean getTargetMean() {
+    return targetMean;
+  }
+
+  public void setTargetMean(TargetMean targetMean) {
+    this.targetMean = targetMean;
+  }
+
+  /*
+   * Create a new RuleRegressionNode
+   */
+  public RuleRegressionNode(double[] initialClassObservations) {
+    this.nodeStatistics = new DoubleVector(initialClassObservations);
+  }
+
+  public RuleRegressionNode() {
+    this(new double[0]);
+  }
+
+  /*
+   * Update statistics with input instance
+   */
+  public abstract void updateStatistics(Instance instance);
+
+  /*
+   * Predictions
+   */
+  public double[] getPrediction(Instance instance) {
+    int predictionMode = this.getLearnerToUse(this.predictionFunction);
+    return getPrediction(instance, predictionMode);
+  }
+
+  public double[] getSimplePrediction() {
+    if (this.targetMean != null)
+      return this.targetMean.getVotesForInstance();
+    else
+      return new double[] { 0 };
+  }
+
+  public double[] getPrediction(Instance instance, int predictionMode) {
+    double[] ret;
+    if (predictionMode == 1)
+      ret = this.perceptron.getVotesForInstance(instance);
+    else
+      ret = this.targetMean.getVotesForInstance(instance);
+    return ret;
+  }
+
+  public double getNormalizedPrediction(Instance instance) {
+    double res;
+    double[] aux;
+    switch (this.predictionFunction) {
+    // perceptron - 1
+    case 1:
+      res = this.perceptron.normalizedPrediction(instance);
+      break;
+    // target mean - 2
+    case 2:
+      aux = this.targetMean.getVotesForInstance();
+      res = normalize(aux[0]);
+      break;
+    // adaptive - 0
+    case 0:
+      int predictionMode = this.getLearnerToUse(0);
+      if (predictionMode == 1)
+      {
+        res = this.perceptron.normalizedPrediction(instance);
+      }
+      else {
+        aux = this.targetMean.getVotesForInstance(instance);
+        res = normalize(aux[0]);
+      }
+      break;
+    default:
+      throw new UnsupportedOperationException("Prediction mode not in range.");
+    }
+    return res;
+  }
+
+  /*
+   * Get learner mode
+   */
+  public int getLearnerToUse(int predMode) {
+    int predictionMode = predMode;
+    if (predictionMode == 0) {
+      double perceptronError = this.perceptron.getCurrentError();
+      double meanTargetError = this.targetMean.getCurrentError();
+      if (perceptronError < meanTargetError)
+        predictionMode = 1; // PERCEPTRON
+      else
+        predictionMode = 2; // TARGET MEAN
+    }
+    return predictionMode;
+  }
+
+  /*
+   * Error and change detection
+   */
+  public double computeError(Instance instance) {
+    double normalizedPrediction = getNormalizedPrediction(instance);
+    double normalizedClassValue = normalize(instance.classValue());
+    return Math.abs(normalizedClassValue - normalizedPrediction);
+  }
+
+  public double getCurrentError() {
+    double error;
+    if (this.perceptron != null) {
+      if (targetMean == null)
+        error = perceptron.getCurrentError();
+      else {
+        double errorP = perceptron.getCurrentError();
+        double errorTM = targetMean.getCurrentError();
+        error = (errorP < errorTM) ? errorP : errorTM;
+      }
+    }
+    else
+      error = Double.MAX_VALUE;
+    return error;
+  }
+
+  /*
+   * no. of instances seen
+   */
+  public long getInstancesSeen() {
+    if (nodeStatistics != null) {
+      return (long) this.nodeStatistics.getValue(0);
+    } else {
+      return 0;
+    }
+  }
+
+  public DoubleVector getNodeStatistics() {
+    return this.nodeStatistics;
+  }
+
+  /*
+   * Anomaly detection
+   */
+  public boolean isAnomaly(Instance instance,
+      double uniVariateAnomalyProbabilityThreshold,
+      double multiVariateAnomalyProbabilityThreshold,
+      int numberOfInstanceesForAnomaly) {
+    // AMRules is equipped with anomaly detection. If on, compute the anomaly
+    // value.
+    long perceptronIntancesSeen = this.perceptron.getInstancesSeen();
+    if (perceptronIntancesSeen >= numberOfInstanceesForAnomaly) {
+      double attribSum;
+      double attribSquaredSum;
+      double D = 0.0;
+      double N = 0.0;
+      double anomaly;
+
+      for (int x = 0; x < instance.numAttributes() - 1; x++) {
+        // Perceptron is initialized each rule.
+        // this is a local anomaly.
+        int instAttIndex = modelAttIndexToInstanceAttIndex(x, instance);
+        attribSum = this.perceptron.perceptronattributeStatistics.getValue(x);
+        attribSquaredSum = 
this.perceptron.squaredperceptronattributeStatistics.getValue(x);
+        double mean = attribSum / perceptronIntancesSeen;
+        double sd = computeSD(attribSquaredSum, attribSum, 
perceptronIntancesSeen);
+        double probability = computeProbability(mean, sd, 
instance.value(instAttIndex));
+
+        if (probability > 0.0) {
+          D = D + Math.abs(Math.log(probability));
+          if (probability < uniVariateAnomalyProbabilityThreshold) {// 0.10
+            N = N + Math.abs(Math.log(probability));
+          }
+        }
+      }
+
+      anomaly = 0.0;
+      if (D != 0.0) {
+        anomaly = N / D;
+      }
+      if (anomaly >= multiVariateAnomalyProbabilityThreshold) {
+        // debuganomaly(instance,
+        // uniVariateAnomalyProbabilityThreshold,
+        // multiVariateAnomalyProbabilityThreshold,
+        // anomaly);
+        return true;
+      }
     }
+    return false;
+  }
+
+  /*
+   * Helpers
+   */
+  public static double computeProbability(double mean, double sd, double 
value) {
+    double probability = 0.0;
+
+    if (sd > 0.0) {
+      double k = (Math.abs(value - mean) / sd); // One tailed variant of
+                                                // Chebyshev's inequality
+      probability = 1.0 / (1 + k * k);
+    }
+
+    return probability;
+  }
+
+  public static double computeHoeffdingBound(double range, double confidence, 
double n) {
+    return Math.sqrt(((range * range) * Math.log(1.0 / confidence)) / (2.0 * 
n));
+  }
+
+  private double normalize(double value) {
+    double meanY = this.nodeStatistics.getValue(1) / 
this.nodeStatistics.getValue(0);
+    double sdY = computeSD(this.nodeStatistics.getValue(2), 
this.nodeStatistics.getValue(1),
+        (long) this.nodeStatistics.getValue(0));
+    double normalizedY = 0.0;
+    if (sdY > 0.0000001) {
+      normalizedY = (value - meanY) / (sdY);
+    }
+    return normalizedY;
+  }
+
+  public double computeSD(double squaredVal, double val, long size) {
+    if (size > 1) {
+      return Math.sqrt((squaredVal - ((val * val) / size)) / (size - 1.0));
+    }
+    return 0.0;
+  }
+
+  /**
+   * Gets the index of the attribute in the instance, given the index of the
+   * attribute in the learner.
+   * 
+   * @param index
+   *          the index of the attribute in the learner
+   * @param inst
+   *          the instance
+   * @return the index in the instance
+   */
+  protected static int modelAttIndexToInstanceAttIndex(int index, Instance 
inst) {
+    return index <= inst.classIndex() ? index : index + 1;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleSplitNode.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleSplitNode.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleSplitNode.java
index 28f4890..a89345e 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleSplitNode.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleSplitNode.java
@@ -30,37 +30,39 @@ import com.yahoo.labs.samoa.instances.Instance;
  * Represent a feature of rules (an element of ruleś nodeList).
  * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class RuleSplitNode extends SplitNode {
 
-    protected double lastTargetMean;
-    protected int operatorObserver;
+  protected double lastTargetMean;
+  protected int operatorObserver;
+
+  private static final long serialVersionUID = 1L;
+
+  public InstanceConditionalTest getSplitTest() {
+    return this.splitTest;
+  }
 
-    private static final long serialVersionUID = 1L;
+  /**
+   * Create a new RuleSplitNode
+   */
+  public RuleSplitNode() {
+    this(null, new double[0]);
+  }
 
-    public InstanceConditionalTest getSplitTest() {
-        return this.splitTest;
-    }
+  public RuleSplitNode(InstanceConditionalTest splitTest, double[] 
classObservations) {
+    super(splitTest, classObservations);
+  }
 
-    /**
-     * Create a new RuleSplitNode
-     */
-    public RuleSplitNode() {
-       this(null, new double[0]);
-    }
-    public RuleSplitNode(InstanceConditionalTest splitTest, double[] 
classObservations) {
-        super(splitTest, classObservations);
-    }
-    
-    public RuleSplitNode getACopy() {
-       InstanceConditionalTest splitTest = new 
NumericAttributeBinaryRulePredicate((NumericAttributeBinaryRulePredicate) 
this.getSplitTest());
-               return new RuleSplitNode(splitTest, 
this.getObservedClassDistribution());
-    }
+  public RuleSplitNode getACopy() {
+    InstanceConditionalTest splitTest = new 
NumericAttributeBinaryRulePredicate(
+        (NumericAttributeBinaryRulePredicate) this.getSplitTest());
+    return new RuleSplitNode(splitTest, this.getObservedClassDistribution());
+  }
 
-    public boolean evaluate(Instance instance) {
-        Predicate predicate = (Predicate) this.splitTest;
-        return predicate.evaluate(instance);
-    }
+  public boolean evaluate(Instance instance) {
+    Predicate predicate = (Predicate) this.splitTest;
+    return predicate.evaluate(instance);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/TargetMean.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/TargetMean.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/TargetMean.java
index 902acf0..da00c0d 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/TargetMean.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/TargetMean.java
@@ -59,162 +59,164 @@ import com.yahoo.labs.samoa.moa.core.StringUtils;
 
 public class TargetMean extends AbstractClassifier implements Regressor {
 
-       /**
+  /**
         * 
         */
-       protected long n;
-       protected double sum;
-       protected double errorSum;
-       protected double nError;
-       private double fadingErrorFactor;
-
-       private static final long serialVersionUID = 7152547322803559115L;
-
-       public FloatOption fadingErrorFactorOption = new FloatOption(
-                       "fadingErrorFactor", 'e', 
-                       "Fading error factor for the TargetMean accumulated 
error", 0.99, 0, 1);
-
-       @Override
-       public boolean isRandomizable() {
-               return false;
-       }
-
-       @Override
-       public double[] getVotesForInstance(Instance inst) {
-               return getVotesForInstance();
-       }
-       
-       public double[] getVotesForInstance() {
-               double[] currentMean=new double[1];
-               if (n>0)
-                       currentMean[0]=sum/n;
-               else
-                       currentMean[0]=0;
-               return currentMean;
-       }
-
-       @Override
-       public void resetLearningImpl() {
-               sum=0;
-               n=0;
-               errorSum=Double.MAX_VALUE;
-               nError=0;
-       }
-
-       @Override
-       public void trainOnInstanceImpl(Instance inst) {
-               updateAccumulatedError(inst);
-               ++this.n;
-               this.sum+=inst.classValue();
-       }
-       protected void updateAccumulatedError(Instance inst){
-               double mean=0;
-               nError=1+fadingErrorFactor*nError;
-               if(n>0)
-                       mean=sum/n;
-               
errorSum=Math.abs(inst.classValue()-mean)+fadingErrorFactor*errorSum;
-       }
-
-       @Override
-       protected Measurement[] getModelMeasurementsImpl() {
-               return null;
-       }
-
-       @Override
-       public void getModelDescription(StringBuilder out, int indent) {
-               StringUtils.appendIndented(out, indent, "Current Mean: " + 
this.sum/this.n);
-               StringUtils.appendNewline(out); 
-
-       }
-       /* JD
-        * Resets the learner but initializes with a starting point 
-        * */
-        public void reset(double currentMean, long numberOfInstances) {
-               this.sum=currentMean*numberOfInstances;
-               this.n=numberOfInstances;
-               this.resetError();
-       }
-
-       /* JD
-        * Resets the learner but initializes with a starting point 
-        * */
-        public double getCurrentError(){
-               if(this.nError>0)
-                       return this.errorSum/this.nError;
-               else
-                       return Double.MAX_VALUE;
-        }
-
-        public TargetMean(TargetMean t) {
-                super();
-                this.n = t.n;
-                this.sum = t.sum;
-                this.errorSum = t.errorSum;
-                this.nError = t.nError;
-                this.fadingErrorFactor = t.fadingErrorFactor;
-                this.fadingErrorFactorOption = t.fadingErrorFactorOption;
-        }
-        
-        public TargetMean(TargetMeanData td) {
-                this();
-                this.n = td.n;
-                this.sum = td.sum;
-                this.errorSum = td.errorSum;
-                this.nError = td.nError;
-                this.fadingErrorFactor = td.fadingErrorFactor;
-                
this.fadingErrorFactorOption.setValue(td.fadingErrorFactorOptionValue);
-        }
-
-        public TargetMean() {
-                super();
-                fadingErrorFactor=fadingErrorFactorOption.getValue();
-        }
-
-        public void resetError() {
-                this.errorSum=0;
-                this.nError=0;
-        }
-        
-        public static class TargetMeanData {
-                private long n;
-                private double sum;
-                private double errorSum;
-                private double nError;
-                private double fadingErrorFactor;
-                private double fadingErrorFactorOptionValue;
-                
-                public TargetMeanData() {
-                        
-                }
-                
-                public TargetMeanData(TargetMean tm) {
-                        this.n = tm.n;
-                        this.sum = tm.sum;
-                        this.errorSum = tm.errorSum;
-                        this.nError = tm.nError;
-                        this.fadingErrorFactor = tm.fadingErrorFactor;
-                        if (tm.fadingErrorFactorOption != null)
-                                this.fadingErrorFactorOptionValue = 
tm.fadingErrorFactorOption.getValue();
-                        else
-                                this.fadingErrorFactorOptionValue = 0.99;
-                }
-                
-                public TargetMean build() {
-                        return new TargetMean(this);
-                }
-        }
-        
-        public static final class TargetMeanSerializer extends 
Serializer<TargetMean>{
-
-                       @Override
-                       public void write(Kryo kryo, Output output, TargetMean 
t) {
-                               kryo.writeObjectOrNull(output, new 
TargetMeanData(t), TargetMeanData.class);
-                       }
-
-                       @Override
-                       public TargetMean read(Kryo kryo, Input input, 
Class<TargetMean> type) {
-                               TargetMeanData data = 
kryo.readObjectOrNull(input, TargetMeanData.class);
-                               return data.build();
-                       }
-               }
+  protected long n;
+  protected double sum;
+  protected double errorSum;
+  protected double nError;
+  private double fadingErrorFactor;
+
+  private static final long serialVersionUID = 7152547322803559115L;
+
+  public FloatOption fadingErrorFactorOption = new FloatOption(
+      "fadingErrorFactor", 'e',
+      "Fading error factor for the TargetMean accumulated error", 0.99, 0, 1);
+
+  @Override
+  public boolean isRandomizable() {
+    return false;
+  }
+
+  @Override
+  public double[] getVotesForInstance(Instance inst) {
+    return getVotesForInstance();
+  }
+
+  public double[] getVotesForInstance() {
+    double[] currentMean = new double[1];
+    if (n > 0)
+      currentMean[0] = sum / n;
+    else
+      currentMean[0] = 0;
+    return currentMean;
+  }
+
+  @Override
+  public void resetLearningImpl() {
+    sum = 0;
+    n = 0;
+    errorSum = Double.MAX_VALUE;
+    nError = 0;
+  }
+
+  @Override
+  public void trainOnInstanceImpl(Instance inst) {
+    updateAccumulatedError(inst);
+    ++this.n;
+    this.sum += inst.classValue();
+  }
+
+  protected void updateAccumulatedError(Instance inst) {
+    double mean = 0;
+    nError = 1 + fadingErrorFactor * nError;
+    if (n > 0)
+      mean = sum / n;
+    errorSum = Math.abs(inst.classValue() - mean) + fadingErrorFactor * 
errorSum;
+  }
+
+  @Override
+  protected Measurement[] getModelMeasurementsImpl() {
+    return null;
+  }
+
+  @Override
+  public void getModelDescription(StringBuilder out, int indent) {
+    StringUtils.appendIndented(out, indent, "Current Mean: " + this.sum / 
this.n);
+    StringUtils.appendNewline(out);
+
+  }
+
+  /*
+   * JD Resets the learner but initializes with a starting point
+   */
+  public void reset(double currentMean, long numberOfInstances) {
+    this.sum = currentMean * numberOfInstances;
+    this.n = numberOfInstances;
+    this.resetError();
+  }
+
+  /*
+   * Returns errorSum / nError, or Double.MAX_VALUE when nError is not positive
+   */
+  public double getCurrentError() {
+    if (this.nError > 0)
+      return this.errorSum / this.nError;
+    else
+      return Double.MAX_VALUE;
+  }
+
+  public TargetMean(TargetMean t) {
+    super();
+    this.n = t.n;
+    this.sum = t.sum;
+    this.errorSum = t.errorSum;
+    this.nError = t.nError;
+    this.fadingErrorFactor = t.fadingErrorFactor;
+    this.fadingErrorFactorOption = t.fadingErrorFactorOption;
+  }
+
+  public TargetMean(TargetMeanData td) {
+    this();
+    this.n = td.n;
+    this.sum = td.sum;
+    this.errorSum = td.errorSum;
+    this.nError = td.nError;
+    this.fadingErrorFactor = td.fadingErrorFactor;
+    this.fadingErrorFactorOption.setValue(td.fadingErrorFactorOptionValue);
+  }
+
+  public TargetMean() {
+    super();
+    fadingErrorFactor = fadingErrorFactorOption.getValue();
+  }
+
+  public void resetError() {
+    this.errorSum = 0;
+    this.nError = 0;
+  }
+
+  public static class TargetMeanData {
+    private long n;
+    private double sum;
+    private double errorSum;
+    private double nError;
+    private double fadingErrorFactor;
+    private double fadingErrorFactorOptionValue;
+
+    public TargetMeanData() {
+
+    }
+
+    public TargetMeanData(TargetMean tm) {
+      this.n = tm.n;
+      this.sum = tm.sum;
+      this.errorSum = tm.errorSum;
+      this.nError = tm.nError;
+      this.fadingErrorFactor = tm.fadingErrorFactor;
+      if (tm.fadingErrorFactorOption != null)
+        this.fadingErrorFactorOptionValue = 
tm.fadingErrorFactorOption.getValue();
+      else
+        this.fadingErrorFactorOptionValue = 0.99;
+    }
+
+    public TargetMean build() {
+      return new TargetMean(this);
+    }
+  }
+
+  public static final class TargetMeanSerializer extends 
Serializer<TargetMean> {
+
+    @Override
+    public void write(Kryo kryo, Output output, TargetMean t) {
+      kryo.writeObjectOrNull(output, new TargetMeanData(t), 
TargetMeanData.class);
+    }
+
+    @Override
+    public TargetMean read(Kryo kryo, Input input, Class<TargetMean> type) {
+      TargetMeanData data = kryo.readObjectOrNull(input, TargetMeanData.class);
+      return data.build();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRDefaultRuleProcessor.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRDefaultRuleProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRDefaultRuleProcessor.java
index 54a4006..007c535 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRDefaultRuleProcessor.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRDefaultRuleProcessor.java
@@ -39,296 +39,300 @@ import com.yahoo.labs.samoa.topology.Stream;
  * Default Rule Learner Processor (HAMR).
  * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class AMRDefaultRuleProcessor implements Processor {
 
-       /**
+  /**
         * 
         */
-       private static final long serialVersionUID = 23702084591044447L;
-       
-       private static final Logger logger = 
-                       LoggerFactory.getLogger(AMRDefaultRuleProcessor.class);
-
-       private int processorId;
-
-       // Default rule
-       protected transient ActiveRule defaultRule;
-       protected transient int ruleNumberID;
-       protected transient double[] statistics;
-
-       // SAMOA Stream
-       private Stream ruleStream;
-       private Stream resultStream;
-
-       // Options
-       protected int pageHinckleyThreshold;
-       protected double pageHinckleyAlpha;
-       protected boolean driftDetection;
-       protected int predictionFunction; // Adaptive=0 Perceptron=1 
TargetMean=2
-       protected boolean constantLearningRatioDecay;
-       protected double learningRatio;
-
-       protected double splitConfidence;
-       protected double tieThreshold;
-       protected int gracePeriod;
-
-       protected FIMTDDNumericAttributeClassLimitObserver numericObserver;
-       
-       /*
-        * Constructor
-        */
-       public AMRDefaultRuleProcessor (Builder builder) {
-               this.pageHinckleyThreshold = builder.pageHinckleyThreshold;
-               this.pageHinckleyAlpha = builder.pageHinckleyAlpha;
-               this.driftDetection = builder.driftDetection;
-               this.predictionFunction = builder.predictionFunction;
-               this.constantLearningRatioDecay = 
builder.constantLearningRatioDecay;
-               this.learningRatio = builder.learningRatio;
-               this.splitConfidence = builder.splitConfidence;
-               this.tieThreshold = builder.tieThreshold;
-               this.gracePeriod = builder.gracePeriod;
-               
-               this.numericObserver = builder.numericObserver;
-       }
-       
-       @Override
-       public boolean process(ContentEvent event) {
-               InstanceContentEvent instanceEvent = (InstanceContentEvent) 
event;
-               // predict
-               if (instanceEvent.isTesting()) {
-                       this.predictOnInstance(instanceEvent);
-               }
-
-               // train
-               if (instanceEvent.isTraining()) {
-                       this.trainOnInstance(instanceEvent);
-               }
-               
-               return false;
-       }
-       
-       /*
-        * Prediction
-        */
-       private void predictOnInstance (InstanceContentEvent instanceEvent) {
-               double [] 
vote=defaultRule.getPrediction(instanceEvent.getInstance());
-               ResultContentEvent rce = newResultContentEvent(vote, 
instanceEvent);
-               resultStream.put(rce);
-       }
-
-       private ResultContentEvent newResultContentEvent(double[] prediction, 
InstanceContentEvent inEvent){
-               ResultContentEvent rce = new 
ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(), 
inEvent.getClassId(), prediction, inEvent.isLastEvent());
-               rce.setClassifierIndex(this.processorId);
-               rce.setEvaluationIndex(inEvent.getEvaluationIndex());
-               return rce;
-       }
-       
-       /*
-        * Training
-        */
-       private void trainOnInstance (InstanceContentEvent instanceEvent) {
-               this.trainOnInstanceImpl(instanceEvent.getInstance());
-       }
-       public void trainOnInstanceImpl(Instance instance) {
-               defaultRule.updateStatistics(instance);
-               if (defaultRule.getInstancesSeen() % this.gracePeriod == 0.0) {
-                       if (defaultRule.tryToExpand(this.splitConfidence, 
this.tieThreshold) == true) {
-                               ActiveRule 
newDefaultRule=newRule(defaultRule.getRuleNumberID(),(RuleActiveRegressionNode)defaultRule.getLearningNode(),
-                                               
((RuleActiveRegressionNode)defaultRule.getLearningNode()).getStatisticsOtherBranchSplit());
 //other branch
-                               defaultRule.split();
-                               defaultRule.setRuleNumberID(++ruleNumberID);
-                               // send out the new rule
-                               sendAddRuleEvent(defaultRule.getRuleNumberID(), 
this.defaultRule);
-                               defaultRule=newDefaultRule;
-                       }
-               }
-       }
-       
-       /*
-        * Create new rules
-        */
-       private ActiveRule newRule(int ID, RuleActiveRegressionNode node, 
double[] statistics) {
-               ActiveRule r=newRule(ID);
-
-               if (node!=null)
-               {
-                       if(node.getPerceptron()!=null)
-                       {
-                               r.getLearningNode().setPerceptron(new 
Perceptron(node.getPerceptron()));
-                               
r.getLearningNode().getPerceptron().setLearningRatio(this.learningRatio);
-                       }
-                       if (statistics==null)
-                       {
-                               double mean;
-                               if(node.getNodeStatistics().getValue(0)>0){
-                                       
mean=node.getNodeStatistics().getValue(1)/node.getNodeStatistics().getValue(0);
-                                       
r.getLearningNode().getTargetMean().reset(mean, 1); 
-                               }
-                       }  
-               }
-               if (statistics!=null && 
((RuleActiveRegressionNode)r.getLearningNode()).getTargetMean()!=null)
-               {
-                       double mean;
-                       if(statistics[0]>0){
-                               mean=statistics[1]/statistics[0];
-                               
((RuleActiveRegressionNode)r.getLearningNode()).getTargetMean().reset(mean, 
(long)statistics[0]); 
-                       }
-               }
-               return r;
-       }
-
-       private ActiveRule newRule(int ID) {
-               ActiveRule r=new ActiveRule.Builder().
-                               threshold(this.pageHinckleyThreshold).
-                               alpha(this.pageHinckleyAlpha).
-                               changeDetection(this.driftDetection).
-                               predictionFunction(this.predictionFunction).
-                               statistics(new double[3]).
-                               learningRatio(this.learningRatio).
-                               numericObserver(numericObserver).
-                               id(ID).build();
-               return r;
-       }
-       
-       @Override
-       public void onCreate(int id) {
-               this.processorId = id;
-               this.statistics= new double[]{0.0,0,0}; 
-               this.ruleNumberID=0;
-               this.defaultRule = newRule(++this.ruleNumberID);
-       }
-
-       /* 
-        * Clone processor
-        */
-       @Override
-       public Processor newProcessor(Processor p) {
-               AMRDefaultRuleProcessor oldProcessor = 
(AMRDefaultRuleProcessor) p;
-               Builder builder = new Builder(oldProcessor);
-               AMRDefaultRuleProcessor newProcessor = builder.build();
-               newProcessor.resultStream = oldProcessor.resultStream;
-               newProcessor.ruleStream = oldProcessor.ruleStream;
-               return newProcessor;
-       }
-       
-       /*
-        * Send events
-        */
-       private void sendAddRuleEvent(int ruleID, ActiveRule rule) {
-               RuleContentEvent rce = new RuleContentEvent(ruleID, rule, 
false);
-               this.ruleStream.put(rce);
-       }
-       
-       /*
-        * Output streams
-        */
-       public void setRuleStream(Stream ruleStream) {
-               this.ruleStream = ruleStream;
-       }
-       
-       public Stream getRuleStream() {
-               return this.ruleStream;
-       }
-       
-       public void setResultStream(Stream resultStream) {
-               this.resultStream = resultStream;
-       }
-       
-       public Stream getResultStream() {
-               return this.resultStream;
-       }
-       
-       /*
-        * Builder
-        */
-       public static class Builder {
-               private int pageHinckleyThreshold;
-               private double pageHinckleyAlpha;
-               private boolean driftDetection;
-               private int predictionFunction; // Adaptive=0 Perceptron=1 
TargetMean=2
-               private boolean constantLearningRatioDecay;
-               private double learningRatio;
-               private double splitConfidence;
-               private double tieThreshold;
-               private int gracePeriod;
-               
-               private FIMTDDNumericAttributeClassLimitObserver 
numericObserver;
-               
-               private Instances dataset;
-               
-               public Builder(Instances dataset){
-                       this.dataset = dataset;
-               }
-               
-               public Builder(AMRDefaultRuleProcessor processor) {
-                       this.pageHinckleyThreshold = 
processor.pageHinckleyThreshold;
-                       this.pageHinckleyAlpha = processor.pageHinckleyAlpha;
-                       this.driftDetection = processor.driftDetection;
-                       this.predictionFunction = processor.predictionFunction;
-                       this.constantLearningRatioDecay = 
processor.constantLearningRatioDecay;
-                       this.learningRatio = processor.learningRatio;
-                       this.splitConfidence = processor.splitConfidence;
-                       this.tieThreshold = processor.tieThreshold;
-                       this.gracePeriod = processor.gracePeriod;
-                       
-                       this.numericObserver = processor.numericObserver;
-               }
-               
-               public Builder threshold(int threshold) {
-                       this.pageHinckleyThreshold = threshold;
-                       return this;
-               }
-               
-               public Builder alpha(double alpha) {
-                       this.pageHinckleyAlpha = alpha;
-                       return this;
-               }
-               
-               public Builder changeDetection(boolean changeDetection) {
-                       this.driftDetection = changeDetection;
-                       return this;
-               }
-               
-               public Builder predictionFunction(int predictionFunction) {
-                       this.predictionFunction = predictionFunction;
-                       return this;
-               }
-               
-               public Builder constantLearningRatioDecay(boolean 
constantDecay) {
-                       this.constantLearningRatioDecay = constantDecay;
-                       return this;
-               }
-               
-               public Builder learningRatio(double learningRatio) {
-                       this.learningRatio = learningRatio;
-                       return this;
-               }
-               
-               public Builder splitConfidence(double splitConfidence) {
-                       this.splitConfidence = splitConfidence;
-                       return this;
-               }
-               
-               public Builder tieThreshold(double tieThreshold) {
-                       this.tieThreshold = tieThreshold;
-                       return this;
-               }
-               
-               public Builder gracePeriod(int gracePeriod) {
-                       this.gracePeriod = gracePeriod;
-                       return this;
-               }
-               
-               public Builder 
numericObserver(FIMTDDNumericAttributeClassLimitObserver numericObserver) {
-                       this.numericObserver = numericObserver;
-                       return this;
-               }
-               
-               public AMRDefaultRuleProcessor build() {
-                       return new AMRDefaultRuleProcessor(this);
-               }
-       }
-       
+  private static final long serialVersionUID = 23702084591044447L;
+
+  // NOTE(review): not referenced by any method of this class in this patch.
+  private static final Logger logger =
+      LoggerFactory.getLogger(AMRDefaultRuleProcessor.class);
+
+  // Id received in onCreate(); stamped on result events as the classifier index.
+  private int processorId;
+
+  // Default rule
+  // All three fields are transient and are (re)initialised in onCreate().
+  protected transient ActiveRule defaultRule;
+  // Last rule id handed out; pre-incremented whenever a new id is needed.
+  protected transient int ruleNumberID;
+  // NOTE(review): reset in onCreate() but not read by this class - confirm
+  // whether subclasses rely on it.
+  protected transient double[] statistics;
+
+  // SAMOA Stream
+  // ruleStream carries RuleContentEvents for rules split off the default rule;
+  // resultStream carries the ResultContentEvents produced for testing instances.
+  private Stream ruleStream;
+  private Stream resultStream;
+
+  // Options
+  // Copied from the Builder; the first six are forwarded to ActiveRule.Builder
+  // in newRule(int), except constantLearningRatioDecay which is only stored here.
+  protected int pageHinckleyThreshold;
+  protected double pageHinckleyAlpha;
+  protected boolean driftDetection;
+  protected int predictionFunction; // Adaptive=0 Perceptron=1 TargetMean=2
+  protected boolean constantLearningRatioDecay;
+  protected double learningRatio;
+
+  // splitConfidence and tieThreshold are passed to tryToExpand(); expansion is
+  // attempted whenever the instance count is a multiple of gracePeriod.
+  protected double splitConfidence;
+  protected double tieThreshold;
+  protected int gracePeriod;
+
+  protected FIMTDDNumericAttributeClassLimitObserver numericObserver;
+
+  /**
+   * Creates a processor whose options are copied from the given builder.
+   * 
+   * @param builder
+   *          source of every option value
+   */
+  public AMRDefaultRuleProcessor(Builder builder) {
+    // Rule expansion settings
+    splitConfidence = builder.splitConfidence;
+    tieThreshold = builder.tieThreshold;
+    gracePeriod = builder.gracePeriod;
+
+    // Change detection settings
+    driftDetection = builder.driftDetection;
+    pageHinckleyThreshold = builder.pageHinckleyThreshold;
+    pageHinckleyAlpha = builder.pageHinckleyAlpha;
+
+    // Prediction settings
+    predictionFunction = builder.predictionFunction;
+    learningRatio = builder.learningRatio;
+    constantLearningRatioDecay = builder.constantLearningRatioDecay;
+
+    numericObserver = builder.numericObserver;
+  }
+
+  /**
+   * Handles one incoming instance event: predicts first when the event is
+   * flagged for testing, then trains when it is flagged for training.
+   * 
+   * @param event
+   *          the event to handle; it is cast to {@link InstanceContentEvent}
+   * @return always {@code false}
+   */
+  @Override
+  public boolean process(ContentEvent event) {
+    InstanceContentEvent incoming = (InstanceContentEvent) event;
+
+    if (incoming.isTesting()) {
+      predictOnInstance(incoming);
+    }
+    if (incoming.isTraining()) {
+      trainOnInstance(incoming);
+    }
+
+    return false;
+  }
+
+  /**
+   * Asks the default rule for a prediction on the event's instance and puts
+   * the resulting event on the result stream.
+   */
+  private void predictOnInstance(InstanceContentEvent instanceEvent) {
+    double[] prediction = defaultRule.getPrediction(instanceEvent.getInstance());
+    resultStream.put(newResultContentEvent(prediction, instanceEvent));
+  }
+
+  /**
+   * Wraps a prediction in a {@link ResultContentEvent} that mirrors the
+   * identifying data of the originating instance event.
+   * 
+   * @param prediction
+   *          the prediction to report
+   * @param inEvent
+   *          the event the prediction was made for
+   * @return the result event, tagged with this processor's id as classifier
+   *         index and with the evaluation index of {@code inEvent}
+   */
+  private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent) {
+    ResultContentEvent result = new ResultContentEvent(
+        inEvent.getInstanceIndex(),
+        inEvent.getInstance(),
+        inEvent.getClassId(),
+        prediction,
+        inEvent.isLastEvent());
+    result.setClassifierIndex(processorId);
+    result.setEvaluationIndex(inEvent.getEvaluationIndex());
+    return result;
+  }
+
+  /**
+   * Trains the default rule on the instance carried by the given event.
+   */
+  private void trainOnInstance(InstanceContentEvent instanceEvent) {
+    Instance trainingInstance = instanceEvent.getInstance();
+    trainOnInstanceImpl(trainingInstance);
+  }
+
+  /**
+   * Updates the default rule with one instance and, whenever the number of
+   * instances seen is a multiple of {@code gracePeriod}, tries to expand it.
+   * <p>
+   * On a successful expansion a replacement default rule is created (before
+   * the split is applied), the expanded rule is split, given a fresh id and
+   * sent on the rule stream, and the replacement becomes the default rule.
+   * 
+   * @param instance
+   *          the training instance
+   */
+  public void trainOnInstanceImpl(Instance instance) {
+    defaultRule.updateStatistics(instance);
+
+    if (defaultRule.getInstancesSeen() % gracePeriod != 0.0) {
+      return;
+    }
+    if (!defaultRule.tryToExpand(splitConfidence, tieThreshold)) {
+      return;
+    }
+
+    // The replacement is seeded with the statistics of the other branch of
+    // the split.
+    RuleActiveRegressionNode learningNode = (RuleActiveRegressionNode) defaultRule.getLearningNode();
+    ActiveRule replacement = newRule(defaultRule.getRuleNumberID(), learningNode,
+        learningNode.getStatisticsOtherBranchSplit());
+
+    defaultRule.split();
+    defaultRule.setRuleNumberID(++ruleNumberID);
+    // send out the new rule
+    sendAddRuleEvent(defaultRule.getRuleNumberID(), defaultRule);
+    defaultRule = replacement;
+  }
+
+  /**
+   * Creates a new rule with the given id and seeds it from an existing
+   * learning node and/or a statistics array.
+   * <ul>
+   * <li>If {@code node} has a perceptron, the new rule gets a new
+   * {@link Perceptron} constructed from it, with this processor's learning
+   * ratio.</li>
+   * <li>If the new rule has a target mean, it is reset to the mean taken from
+   * {@code statistics} (value 1 divided by value 0, with value 0 as count)
+   * or, when {@code statistics} is {@code null}, to the mean taken from the
+   * node statistics with a count of 1. Nothing is reset when the relevant
+   * count is not positive.</li>
+   * </ul>
+   * 
+   * @param ID
+   *          id of the new rule
+   * @param node
+   *          learning node to seed from; may be {@code null}
+   * @param statistics
+   *          statistics to seed the target mean from; may be {@code null}
+   * @return the new rule
+   */
+  private ActiveRule newRule(int ID, RuleActiveRegressionNode node, double[] statistics) {
+    ActiveRule r = newRule(ID);
+
+    if (node != null && node.getPerceptron() != null) {
+      r.getLearningNode().setPerceptron(new Perceptron(node.getPerceptron()));
+      r.getLearningNode().getPerceptron().setLearningRatio(this.learningRatio);
+    }
+
+    // Both seeding paths below need a target mean. Previously only the
+    // statistics path checked for null, so the node path could fail with a
+    // NullPointerException.
+    // NOTE(review): presumably the target mean is absent for some prediction
+    // functions - confirm against the learning node's initialisation.
+    if (r.getLearningNode().getTargetMean() == null) {
+      return r;
+    }
+
+    if (statistics != null) {
+      if (statistics[0] > 0) {
+        double mean = statistics[1] / statistics[0];
+        r.getLearningNode().getTargetMean().reset(mean, (long) statistics[0]);
+      }
+    } else if (node != null && node.getNodeStatistics().getValue(0) > 0) {
+      double mean = node.getNodeStatistics().getValue(1) / node.getNodeStatistics().getValue(0);
+      r.getLearningNode().getTargetMean().reset(mean, 1);
+    }
+    return r;
+  }
+
+  /**
+   * Builds an empty rule with the given id, configured from this processor's
+   * options and starting with a zeroed three-element statistics array.
+   * 
+   * @param ID
+   *          id of the new rule
+   * @return the new rule
+   */
+  private ActiveRule newRule(int ID) {
+    return new ActiveRule.Builder()
+        .threshold(pageHinckleyThreshold)
+        .alpha(pageHinckleyAlpha)
+        .changeDetection(driftDetection)
+        .predictionFunction(predictionFunction)
+        .statistics(new double[3])
+        .learningRatio(learningRatio)
+        .numericObserver(numericObserver)
+        .id(ID)
+        .build();
+  }
+
+  /**
+   * Initialises the transient state: stores the processor id, zeroes the
+   * statistics and creates the first default rule with rule id 1.
+   * 
+   * @param id
+   *          id assigned to this processor
+   */
+  @Override
+  public void onCreate(int id) {
+    processorId = id;
+    statistics = new double[3];
+    ruleNumberID = 1;
+    defaultRule = newRule(ruleNumberID);
+  }
+
+  /**
+   * Clones the given processor: a new processor is built with the same
+   * options and shares the same rule and result streams.
+   * 
+   * @param p
+   *          the processor to copy; it is cast to
+   *          {@link AMRDefaultRuleProcessor}
+   * @return the new processor
+   */
+  @Override
+  public Processor newProcessor(Processor p) {
+    AMRDefaultRuleProcessor source = (AMRDefaultRuleProcessor) p;
+    AMRDefaultRuleProcessor copy = new Builder(source).build();
+    copy.resultStream = source.resultStream;
+    copy.ruleStream = source.ruleStream;
+    return copy;
+  }
+
+  /**
+   * Puts a {@link RuleContentEvent} for the given rule on the rule stream,
+   * with its third constructor argument set to {@code false}.
+   */
+  private void sendAddRuleEvent(int ruleID, ActiveRule rule) {
+    ruleStream.put(new RuleContentEvent(ruleID, rule, false));
+  }
+
+  /**
+   * Sets the stream on which rule events are put.
+   */
+  public void setRuleStream(Stream ruleStream) {
+    this.ruleStream = ruleStream;
+  }
+
+  /**
+   * @return the stream on which rule events are put
+   */
+  public Stream getRuleStream() {
+    return ruleStream;
+  }
+
+  /**
+   * Sets the stream on which prediction results are put.
+   */
+  public void setResultStream(Stream resultStream) {
+    this.resultStream = resultStream;
+  }
+
+  /**
+   * @return the stream on which prediction results are put
+   */
+  public Stream getResultStream() {
+    return resultStream;
+  }
+
+  /**
+   * Fluent builder for {@link AMRDefaultRuleProcessor}. Every setter stores
+   * its argument and returns this builder; options that are never set keep
+   * their Java default values.
+   */
+  public static class Builder {
+    private int pageHinckleyThreshold;
+    private double pageHinckleyAlpha;
+    private boolean driftDetection;
+    private int predictionFunction; // Adaptive=0 Perceptron=1 TargetMean=2
+    private boolean constantLearningRatioDecay;
+    private double learningRatio;
+    private double splitConfidence;
+    private double tieThreshold;
+    private int gracePeriod;
+
+    private FIMTDDNumericAttributeClassLimitObserver numericObserver;
+
+    // Stored by the dataset constructor only; not read inside this builder.
+    private Instances dataset;
+
+    /**
+     * Starts a builder for the given dataset.
+     */
+    public Builder(Instances dataset) {
+      this.dataset = dataset;
+    }
+
+    /**
+     * Starts a builder pre-filled with the options of an existing processor.
+     * The dataset is not copied.
+     */
+    public Builder(AMRDefaultRuleProcessor processor) {
+      pageHinckleyThreshold = processor.pageHinckleyThreshold;
+      pageHinckleyAlpha = processor.pageHinckleyAlpha;
+      driftDetection = processor.driftDetection;
+      predictionFunction = processor.predictionFunction;
+      constantLearningRatioDecay = processor.constantLearningRatioDecay;
+      learningRatio = processor.learningRatio;
+      splitConfidence = processor.splitConfidence;
+      tieThreshold = processor.tieThreshold;
+      gracePeriod = processor.gracePeriod;
+
+      numericObserver = processor.numericObserver;
+    }
+
+    /** Sets the Page-Hinckley threshold. */
+    public Builder threshold(int threshold) {
+      pageHinckleyThreshold = threshold;
+      return this;
+    }
+
+    /** Sets the Page-Hinckley alpha. */
+    public Builder alpha(double alpha) {
+      pageHinckleyAlpha = alpha;
+      return this;
+    }
+
+    /** Sets the drift detection flag. */
+    public Builder changeDetection(boolean changeDetection) {
+      driftDetection = changeDetection;
+      return this;
+    }
+
+    /** Sets the prediction function code (Adaptive=0 Perceptron=1 TargetMean=2). */
+    public Builder predictionFunction(int predictionFunction) {
+      this.predictionFunction = predictionFunction;
+      return this;
+    }
+
+    /** Sets the constant learning ratio decay flag. */
+    public Builder constantLearningRatioDecay(boolean constantDecay) {
+      constantLearningRatioDecay = constantDecay;
+      return this;
+    }
+
+    /** Sets the learning ratio. */
+    public Builder learningRatio(double learningRatio) {
+      this.learningRatio = learningRatio;
+      return this;
+    }
+
+    /** Sets the split confidence. */
+    public Builder splitConfidence(double splitConfidence) {
+      this.splitConfidence = splitConfidence;
+      return this;
+    }
+
+    /** Sets the tie threshold. */
+    public Builder tieThreshold(double tieThreshold) {
+      this.tieThreshold = tieThreshold;
+      return this;
+    }
+
+    /** Sets the grace period. */
+    public Builder gracePeriod(int gracePeriod) {
+      this.gracePeriod = gracePeriod;
+      return this;
+    }
+
+    /** Sets the numeric attribute observer. */
+    public Builder numericObserver(FIMTDDNumericAttributeClassLimitObserver numericObserver) {
+      this.numericObserver = numericObserver;
+      return this;
+    }
+
+    /**
+     * @return a new processor configured from this builder
+     */
+    public AMRDefaultRuleProcessor build() {
+      return new AMRDefaultRuleProcessor(this);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRLearnerProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRLearnerProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRLearnerProcessor.java
index 8ec118d..9d51075 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRLearnerProcessor.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRLearnerProcessor.java
@@ -42,218 +42,219 @@ import com.yahoo.labs.samoa.topology.Stream;
  * Learner Processor (HAMR).
  * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class AMRLearnerProcessor implements Processor {
-       
-       /**
+
+  /**
         * 
         */
-       private static final long serialVersionUID = -2302897295090248013L;
-       
-       private static final Logger logger = 
-                       LoggerFactory.getLogger(AMRLearnerProcessor.class);
-
-       private int processorId;
-       
-       private transient List<ActiveRule> ruleSet;
-       
-       private Stream outputStream;
-       
-       private double splitConfidence;
-       private double tieThreshold;
-       private int gracePeriod;
-       
-       private boolean noAnomalyDetection;
-       private double multivariateAnomalyProbabilityThreshold;
-       private double univariateAnomalyprobabilityThreshold;
-       private int anomalyNumInstThreshold;
-       
-       public AMRLearnerProcessor(Builder builder) {
-               this.splitConfidence = builder.splitConfidence;
-               this.tieThreshold = builder.tieThreshold;
-               this.gracePeriod = builder.gracePeriod;
-               
-               this.noAnomalyDetection = builder.noAnomalyDetection;
-               this.multivariateAnomalyProbabilityThreshold = 
builder.multivariateAnomalyProbabilityThreshold;
-               this.univariateAnomalyprobabilityThreshold = 
builder.univariateAnomalyprobabilityThreshold;
-               this.anomalyNumInstThreshold = builder.anomalyNumInstThreshold;
-       }
-
-       @Override
-       public boolean process(ContentEvent event) {
-               if (event instanceof AssignmentContentEvent) {
-                       AssignmentContentEvent attrContentEvent = 
(AssignmentContentEvent) event;
-                       
trainRuleOnInstance(attrContentEvent.getRuleNumberID(),attrContentEvent.getInstance());
-               }
-               else if (event instanceof RuleContentEvent) {
-                       RuleContentEvent ruleContentEvent = (RuleContentEvent) 
event;
-                       if (!ruleContentEvent.isRemoving()) {
-                               addRule(ruleContentEvent.getRule());
-                       }
-               }
-               
-               return false;
-       }
-       
-       /*
-        * Process input instances
-        */
-       private void trainRuleOnInstance(int ruleID, Instance instance) {
-        //System.out.println("Processor:"+this.processorId+": Rule:"+ruleID+" 
-> Counter="+counter);
-               Iterator<ActiveRule> ruleIterator= this.ruleSet.iterator();
-               while (ruleIterator.hasNext()) {
-                       ActiveRule rule = ruleIterator.next();
-                       if (rule.getRuleNumberID() == ruleID) {
-                               // Check (again) for coverage
-                               if (rule.isCovering(instance) == true) {
-                                       double error = 
rule.computeError(instance); //Use adaptive mode error
-                    boolean changeDetected = 
((RuleActiveRegressionNode)rule.getLearningNode()).updateChangeDetection(error);
-                    if (changeDetected == true) {
-                            ruleIterator.remove();
-                            
-                            this.sendRemoveRuleEvent(ruleID);
-                    } else {
-                       rule.updateStatistics(instance);
-                       if (rule.getInstancesSeen()  % this.gracePeriod == 0.0) 
{
-                               if (rule.tryToExpand(this.splitConfidence, 
this.tieThreshold) ) {
-                                       rule.split();
-                                       
-                                       // expanded: update Aggregator with 
new/updated predicate
-                                       
this.sendPredicate(rule.getRuleNumberID(), rule.getLastUpdatedRuleSplitNode(), 
-                                                                       
(RuleActiveRegressionNode)rule.getLearningNode());
-                               }       
-                               
-                       }
-                       
-                    }
-                               }
-                               
-                               return;
-                       }
-               }
-       }
-       
-       private boolean isAnomaly(Instance instance, LearningRule rule) {
-               //AMRUles is equipped with anomaly detection. If on, compute 
the anomaly value.                         
-               boolean isAnomaly = false;      
-               if (this.noAnomalyDetection == false){
-                       if (rule.getInstancesSeen() >= 
this.anomalyNumInstThreshold) {
-                               isAnomaly = rule.isAnomaly(instance, 
-                                               
this.univariateAnomalyprobabilityThreshold,
-                                               
this.multivariateAnomalyProbabilityThreshold,
-                                               this.anomalyNumInstThreshold);
-                       }
-               }
-               return isAnomaly;
-       }
-       
-       private void sendRemoveRuleEvent(int ruleID) {
-               RuleContentEvent rce = new RuleContentEvent(ruleID, null, true);
-               this.outputStream.put(rce);
-       }
-       
-       private void sendPredicate(int ruleID, RuleSplitNode splitNode, 
RuleActiveRegressionNode learningNode) {
-               this.outputStream.put(new PredicateContentEvent(ruleID, 
splitNode, new RulePassiveRegressionNode(learningNode)));
-       }
-       
-       /*
-        * Process control message (regarding adding or removing rules)
-        */
-       private boolean addRule(ActiveRule rule) {
-               this.ruleSet.add(rule);
-               return true;
-       }
-
-       @Override
-       public void onCreate(int id) {
-               this.processorId = id;
-               this.ruleSet = new LinkedList<ActiveRule>();
-       }
-
-       @Override
-       public Processor newProcessor(Processor p) {
-               AMRLearnerProcessor oldProcessor = (AMRLearnerProcessor)p;
-               AMRLearnerProcessor newProcessor = 
-                                       new 
AMRLearnerProcessor.Builder(oldProcessor).build();
-                       
-               newProcessor.setOutputStream(oldProcessor.outputStream);
-               return newProcessor;
-       }
-       
-       /*
-        * Builder
-        */
-       public static class Builder {
-               private double splitConfidence;
-               private double tieThreshold;
-               private int gracePeriod;
-               
-               private boolean noAnomalyDetection;
-               private double multivariateAnomalyProbabilityThreshold;
-               private double univariateAnomalyprobabilityThreshold;
-               private int anomalyNumInstThreshold;
-               
-               private Instances dataset;
-               
-               public Builder(Instances dataset){
-                       this.dataset = dataset;
-               }
-               
-               public Builder(AMRLearnerProcessor processor) {
-                       this.splitConfidence = processor.splitConfidence;
-                       this.tieThreshold = processor.tieThreshold;
-                       this.gracePeriod = processor.gracePeriod;
-               }
-               
-               public Builder splitConfidence(double splitConfidence) {
-                       this.splitConfidence = splitConfidence;
-                       return this;
-               }
-               
-               public Builder tieThreshold(double tieThreshold) {
-                       this.tieThreshold = tieThreshold;
-                       return this;
-               }
-               
-               public Builder gracePeriod(int gracePeriod) {
-                       this.gracePeriod = gracePeriod;
-                       return this;
-               }
-               
-               public Builder noAnomalyDetection(boolean noAnomalyDetection) {
-                       this.noAnomalyDetection = noAnomalyDetection;
-                       return this;
-               }
-               
-               public Builder multivariateAnomalyProbabilityThreshold(double 
mAnomalyThreshold) {
-                       this.multivariateAnomalyProbabilityThreshold = 
mAnomalyThreshold;
-                       return this;
-               }
-               
-               public Builder univariateAnomalyProbabilityThreshold(double 
uAnomalyThreshold) {
-                       this.univariateAnomalyprobabilityThreshold = 
uAnomalyThreshold;
-                       return this;
-               }
-               
-               public Builder anomalyNumberOfInstancesThreshold(int 
anomalyNumInstThreshold) {
-                       this.anomalyNumInstThreshold = anomalyNumInstThreshold;
-                       return this;
-               }
-               
-               public AMRLearnerProcessor build() {
-                       return new AMRLearnerProcessor(this);
-               }
-       }
-       
-       /*
-        * Output stream
-        */
-       public void setOutputStream(Stream stream) {
-               this.outputStream = stream;
-       }
-       
-       public Stream getOutputStream() {
-               return this.outputStream;
-       }
+  private static final long serialVersionUID = -2302897295090248013L;
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(AMRLearnerProcessor.class);
+
+  private int processorId;
+
+  // Rules handled by this processor; searched by rule id when training.
+  // Transient, so it is not part of the serialized processor state.
+  private transient List<ActiveRule> ruleSet;
+
+  private Stream outputStream;
+
+  // Rule expansion options, copied from the Builder.
+  private double splitConfidence;
+  private double tieThreshold;
+  private int gracePeriod;
+
+  // Anomaly detection options, copied from the Builder.
+  private boolean noAnomalyDetection;
+  private double multivariateAnomalyProbabilityThreshold;
+  private double univariateAnomalyprobabilityThreshold;
+  private int anomalyNumInstThreshold;
+
+  /**
+   * Creates a learner processor whose settings are copied from a builder.
+   * 
+   * @param builder
+   *          source of the rule-expansion and anomaly-detection settings
+   */
+  public AMRLearnerProcessor(Builder builder) {
+    // Anomaly-detection settings
+    anomalyNumInstThreshold = builder.anomalyNumInstThreshold;
+    univariateAnomalyprobabilityThreshold = builder.univariateAnomalyprobabilityThreshold;
+    multivariateAnomalyProbabilityThreshold = builder.multivariateAnomalyProbabilityThreshold;
+    noAnomalyDetection = builder.noAnomalyDetection;
+
+    // Rule-expansion settings
+    gracePeriod = builder.gracePeriod;
+    tieThreshold = builder.tieThreshold;
+    splitConfidence = builder.splitConfidence;
+  }
+
+  /**
+   * Dispatches an incoming event. An {@code AssignmentContentEvent} trains the
+   * rule it names on the instance it carries; a {@code RuleContentEvent} that
+   * is not a removal adds the carried rule to the local rule set. Every other
+   * event is ignored.
+   * 
+   * @param event
+   *          the event to handle
+   * @return always {@code false}
+   */
+  @Override
+  public boolean process(ContentEvent event) {
+    if (event instanceof AssignmentContentEvent) {
+      AssignmentContentEvent assignment = (AssignmentContentEvent) event;
+      trainRuleOnInstance(assignment.getRuleNumberID(), assignment.getInstance());
+      return false;
+    }
+
+    if (event instanceof RuleContentEvent) {
+      RuleContentEvent ruleEvent = (RuleContentEvent) event;
+      boolean removal = ruleEvent.isRemoving();
+      if (!removal) {
+        addRule(ruleEvent.getRule());
+      }
+    }
+    return false;
+  }
+
+  /*
+   * Process input instances
+   */
+  /**
+   * Trains the rule with the given ID on one instance.
+   * <p>
+   * Only the first rule in the rule set whose ID matches is considered. If
+   * that rule does not cover the instance, nothing happens. Otherwise the
+   * rule's error on the instance is fed to the change detector of its learning
+   * node. When a change is detected, the rule is dropped from the rule set and
+   * a removal event is sent. When no change is detected, the rule's statistics
+   * are updated and, whenever the number of instances seen is a multiple of
+   * the grace period, the rule may expand; a successful expansion is published
+   * through {@code sendPredicate}.
+   * 
+   * @param ruleID
+   *          ID of the rule to train
+   * @param instance
+   *          the training instance
+   */
+  private void trainRuleOnInstance(int ruleID, Instance instance) {
+    for (Iterator<ActiveRule> cursor = this.ruleSet.iterator(); cursor.hasNext();) {
+      ActiveRule candidate = cursor.next();
+      if (candidate.getRuleNumberID() != ruleID) {
+        continue;
+      }
+
+      // Check (again) for coverage
+      if (!candidate.isCovering(instance)) {
+        return;
+      }
+
+      // Use adaptive mode error
+      double error = candidate.computeError(instance);
+      RuleActiveRegressionNode learningNode =
+          (RuleActiveRegressionNode) candidate.getLearningNode();
+      if (learningNode.updateChangeDetection(error)) {
+        cursor.remove();
+        this.sendRemoveRuleEvent(ruleID);
+        return;
+      }
+
+      candidate.updateStatistics(instance);
+      boolean gracePeriodReached = candidate.getInstancesSeen() % this.gracePeriod == 0.0;
+      if (gracePeriodReached && candidate.tryToExpand(this.splitConfidence, this.tieThreshold)) {
+        candidate.split();
+
+        // expanded: update Aggregator with new/updated predicate
+        this.sendPredicate(candidate.getRuleNumberID(), candidate.getLastUpdatedRuleSplitNode(),
+            (RuleActiveRegressionNode) candidate.getLearningNode());
+      }
+      return;
+    }
+  }
+
+  /**
+   * Tells whether an instance is an anomaly with respect to a rule.
+   * <p>
+   * The rule is only consulted when anomaly detection is enabled and the rule
+   * has seen at least {@code anomalyNumInstThreshold} instances; in every
+   * other case the instance is reported as not anomalous.
+   * 
+   * @param instance
+   *          the instance to check
+   * @param rule
+   *          the rule the instance is checked against
+   * @return {@code true} if the rule reports the instance as an anomaly
+   */
+  private boolean isAnomaly(Instance instance, LearningRule rule) {
+    boolean detectionEnabled = !this.noAnomalyDetection;
+    return detectionEnabled
+        && rule.getInstancesSeen() >= this.anomalyNumInstThreshold
+        && rule.isAnomaly(instance,
+            this.univariateAnomalyprobabilityThreshold,
+            this.multivariateAnomalyProbabilityThreshold,
+            this.anomalyNumInstThreshold);
+  }
+
+  /**
+   * Puts a {@code RuleContentEvent} for the given rule ID on the output
+   * stream. The event is built with a {@code null} rule and {@code true} as
+   * its last argument (presumably the "removing" flag; confirm against
+   * {@code RuleContentEvent}).
+   * 
+   * @param ruleID
+   *          ID of the rule being removed
+   */
+  private void sendRemoveRuleEvent(int ruleID) {
+    this.outputStream.put(new RuleContentEvent(ruleID, null, true));
+  }
+
+  /**
+   * Puts a {@code PredicateContentEvent} on the output stream. The event
+   * carries the rule ID, the given split node, and a
+   * {@code RulePassiveRegressionNode} built from the given learning node.
+   * 
+   * @param ruleID
+   *          ID of the rule that was updated
+   * @param splitNode
+   *          the split node to publish
+   * @param learningNode
+   *          the active learning node to wrap in a passive node
+   */
+  private void sendPredicate(int ruleID, RuleSplitNode splitNode,
+      RuleActiveRegressionNode learningNode) {
+    RulePassiveRegressionNode passiveNode = new RulePassiveRegressionNode(learningNode);
+    PredicateContentEvent predicateEvent = new PredicateContentEvent(ruleID, splitNode, passiveNode);
+    this.outputStream.put(predicateEvent);
+  }
+
+  /*
+   * Process control message (regarding adding or removing rules)
+   */
+  /**
+   * Appends a rule to the local rule set.
+   * 
+   * @param rule
+   *          the rule to add
+   * @return always {@code true}
+   */
+  private boolean addRule(ActiveRule rule) {
+    ruleSet.add(rule);
+    return true;
+  }
+
+  /**
+   * Stores the processor ID and creates an empty rule set. The rule set field
+   * is transient and is not set by the constructor, so this is where it is
+   * first created.
+   * 
+   * @param id
+   *          the ID assigned to this processor
+   */
+  @Override
+  public void onCreate(int id) {
+    ruleSet = new LinkedList<ActiveRule>();
+    processorId = id;
+  }
+
+  /**
+   * Creates a new processor from an existing one. The new processor is built
+   * through {@code Builder(AMRLearnerProcessor)} and shares the output stream
+   * of the given processor.
+   * 
+   * @param p
+   *          the processor to copy; must be an {@code AMRLearnerProcessor}
+   * @return the newly built processor
+   */
+  @Override
+  public Processor newProcessor(Processor p) {
+    AMRLearnerProcessor template = (AMRLearnerProcessor) p;
+    AMRLearnerProcessor.Builder cloneBuilder = new AMRLearnerProcessor.Builder(template);
+    AMRLearnerProcessor copy = cloneBuilder.build();
+    copy.setOutputStream(template.outputStream);
+    return copy;
+  }
+
+  /*
+   * Builder
+   */
+  /**
+   * Fluent builder for {@link AMRLearnerProcessor}. Each setter returns the
+   * builder itself so that calls can be chained before {@link #build()}.
+   */
+  public static class Builder {
+    private double splitConfidence;
+    private double tieThreshold;
+    private int gracePeriod;
+
+    private boolean noAnomalyDetection;
+    private double multivariateAnomalyProbabilityThreshold;
+    private double univariateAnomalyprobabilityThreshold;
+    private int anomalyNumInstThreshold;
+
+    // Stored by Builder(Instances); not read by the code of this class.
+    private Instances dataset;
+
+    /**
+     * Creates a builder with all settings at their Java default values.
+     * 
+     * @param dataset
+     *          the dataset kept by this builder
+     */
+    public Builder(Instances dataset) {
+      this.dataset = dataset;
+    }
+
+    /**
+     * Creates a builder pre-populated with every setting of an existing
+     * processor, so that a processor built from it has the same configuration.
+     * 
+     * @param processor
+     *          the processor whose settings are copied
+     */
+    public Builder(AMRLearnerProcessor processor) {
+      this.splitConfidence = processor.splitConfidence;
+      this.tieThreshold = processor.tieThreshold;
+      this.gracePeriod = processor.gracePeriod;
+
+      // Copy the anomaly-detection settings too; leaving them out would reset
+      // them to their defaults in every processor created by newProcessor.
+      this.noAnomalyDetection = processor.noAnomalyDetection;
+      this.multivariateAnomalyProbabilityThreshold = processor.multivariateAnomalyProbabilityThreshold;
+      this.univariateAnomalyprobabilityThreshold = processor.univariateAnomalyprobabilityThreshold;
+      this.anomalyNumInstThreshold = processor.anomalyNumInstThreshold;
+    }
+
+    /**
+     * @param splitConfidence
+     *          confidence value passed to the rule's expansion test
+     * @return this builder
+     */
+    public Builder splitConfidence(double splitConfidence) {
+      this.splitConfidence = splitConfidence;
+      return this;
+    }
+
+    /**
+     * @param tieThreshold
+     *          tie threshold passed to the rule's expansion test
+     * @return this builder
+     */
+    public Builder tieThreshold(double tieThreshold) {
+      this.tieThreshold = tieThreshold;
+      return this;
+    }
+
+    /**
+     * @param gracePeriod
+     *          expansion is attempted when the number of instances a rule has
+     *          seen is a multiple of this value
+     * @return this builder
+     */
+    public Builder gracePeriod(int gracePeriod) {
+      this.gracePeriod = gracePeriod;
+      return this;
+    }
+
+    /**
+     * @param noAnomalyDetection
+     *          {@code true} to switch anomaly detection off
+     * @return this builder
+     */
+    public Builder noAnomalyDetection(boolean noAnomalyDetection) {
+      this.noAnomalyDetection = noAnomalyDetection;
+      return this;
+    }
+
+    /**
+     * @param mAnomalyThreshold
+     *          multivariate anomaly probability threshold
+     * @return this builder
+     */
+    public Builder multivariateAnomalyProbabilityThreshold(double mAnomalyThreshold) {
+      this.multivariateAnomalyProbabilityThreshold = mAnomalyThreshold;
+      return this;
+    }
+
+    /**
+     * @param uAnomalyThreshold
+     *          univariate anomaly probability threshold
+     * @return this builder
+     */
+    public Builder univariateAnomalyProbabilityThreshold(double uAnomalyThreshold) {
+      this.univariateAnomalyprobabilityThreshold = uAnomalyThreshold;
+      return this;
+    }
+
+    /**
+     * @param anomalyNumInstThreshold
+     *          number of instances a rule must have seen before it is asked
+     *          whether an instance is an anomaly
+     * @return this builder
+     */
+    public Builder anomalyNumberOfInstancesThreshold(int anomalyNumInstThreshold) {
+      this.anomalyNumInstThreshold = anomalyNumInstThreshold;
+      return this;
+    }
+
+    /**
+     * @return a new processor configured with the current builder settings
+     */
+    public AMRLearnerProcessor build() {
+      return new AMRLearnerProcessor(this);
+    }
+  }
+
+  /*
+   * Output stream
+   */
+  /**
+   * Sets the stream on which this processor puts the events it sends.
+   * 
+   * @param stream
+   *          the output stream
+   */
+  public void setOutputStream(Stream stream) {
+    outputStream = stream;
+  }
+
+  /**
+   * @return the stream on which this processor puts the events it sends
+   */
+  public Stream getOutputStream() {
+    return outputStream;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java
index 38a0be1..88bf375 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java
@@ -40,323 +40,333 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Model Aggregator Processor (HAMR).
+ * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class AMRRuleSetProcessor implements Processor {
 
-       /**
+  /**
         * 
         */
-       private static final long serialVersionUID = -6544096255649379334L;
-       private static final Logger logger = 
LoggerFactory.getLogger(AMRRuleSetProcessor.class);
-
-       private int processorId;
-
-       // Rules & default rule
-       protected transient List<PassiveRule> ruleSet;
-
-       // SAMOA Stream
-       private Stream statisticsStream;
-       private Stream resultStream;
-       private Stream defaultRuleStream;
-
-       // Options
-       protected boolean noAnomalyDetection;
-       protected double multivariateAnomalyProbabilityThreshold;
-       protected double univariateAnomalyprobabilityThreshold;
-       protected int anomalyNumInstThreshold;
-
-       protected boolean unorderedRules;
-       
-       protected int voteType;
-       
-       /*
-        * Constructor
-        */
-       public AMRRuleSetProcessor (Builder builder) {
-               
-               this.noAnomalyDetection = builder.noAnomalyDetection;
-               this.multivariateAnomalyProbabilityThreshold = 
builder.multivariateAnomalyProbabilityThreshold;
-               this.univariateAnomalyprobabilityThreshold = 
builder.univariateAnomalyprobabilityThreshold;
-               this.anomalyNumInstThreshold = builder.anomalyNumInstThreshold;
-               this.unorderedRules = builder.unorderedRules;
-               
-               this.voteType = builder.voteType;
-       }
-       /* (non-Javadoc)
-        * @see 
com.yahoo.labs.samoa.core.Processor#process(com.yahoo.labs.samoa.core.ContentEvent)
-        */
-       @Override
-       public boolean process(ContentEvent event) {
-               if (event instanceof InstanceContentEvent) {
-                       this.processInstanceEvent((InstanceContentEvent) event);
-               }
-               else if (event instanceof PredicateContentEvent) {
-                       PredicateContentEvent pce = (PredicateContentEvent) 
event;
-                       if (pce.getRuleSplitNode() == null) {
-                               this.updateLearningNode(pce);
-                       }
-                       else {
-                               this.updateRuleSplitNode(pce);
-                       }
-               }
-               else if (event instanceof RuleContentEvent) {
-                       RuleContentEvent rce = (RuleContentEvent) event;
-                       if (rce.isRemoving()) {
-                               this.removeRule(rce.getRuleNumberID());
-                       }
-                       else {
-                               addRule(rce.getRule());
-                       }
-               }
-               return true;
-       }
-       
-       private void processInstanceEvent(InstanceContentEvent instanceEvent) {
-               Instance instance = instanceEvent.getInstance();
-               boolean predictionCovered = false;
-               boolean trainingCovered = false;
-               boolean continuePrediction = instanceEvent.isTesting();
-               boolean continueTraining = instanceEvent.isTraining();
-               
-               ErrorWeightedVote errorWeightedVote = newErrorWeightedVote();
-               for (PassiveRule aRuleSet : this.ruleSet) {
-                       if (!continuePrediction && !continueTraining)
-                               break;
-
-                       if (aRuleSet.isCovering(instance)) {
-                               predictionCovered = true;
-
-                               if (continuePrediction) {
-                                       double[] vote = 
aRuleSet.getPrediction(instance);
-                                       double error = 
aRuleSet.getCurrentError();
-                                       errorWeightedVote.addVote(vote, error);
-                                       if (!this.unorderedRules) 
continuePrediction = false;
-                               }
-
-                               if (continueTraining) {
-                                       if (!isAnomaly(instance, aRuleSet)) {
-                                               trainingCovered = true;
-                                               
aRuleSet.updateStatistics(instance);
-
-                                               // Send instance to statistics 
PIs
-                                               sendInstanceToRule(instance, 
aRuleSet.getRuleNumberID());
-
-                                               if (!this.unorderedRules) 
continueTraining = false;
-                                       }
-                               }
-                       }
-               }
-               
-               if (predictionCovered) {
-                       // Combined prediction
-                       ResultContentEvent rce = 
newResultContentEvent(errorWeightedVote.computeWeightedVote(), instanceEvent);
-                       resultStream.put(rce);
-               }
-               
-               boolean defaultPrediction = instanceEvent.isTesting() && 
!predictionCovered;
-               boolean defaultTraining = instanceEvent.isTraining() && 
!trainingCovered;
-               if (defaultPrediction || defaultTraining) {
-                       instanceEvent.setTesting(defaultPrediction);
-                       instanceEvent.setTraining(defaultTraining);
-                       this.defaultRuleStream.put(instanceEvent);
-               }
-       }
-       
-       private ResultContentEvent newResultContentEvent(double[] prediction, 
InstanceContentEvent inEvent){
-               ResultContentEvent rce = new 
ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(), 
inEvent.getClassId(), prediction, inEvent.isLastEvent());
-               rce.setClassifierIndex(this.processorId);
-               rce.setEvaluationIndex(inEvent.getEvaluationIndex());
-               return rce;
-       }
-
-       public ErrorWeightedVote newErrorWeightedVote() {
-               // TODO: do a reset instead of init a new object
-               if (voteType == 1) 
-                       return new UniformWeightedVote();
-               return new InverseErrorWeightedVote();
-       }
-
-       /**
-        * Method to verify if the instance is an anomaly.
-        * @param instance
-        * @param rule
-        * @return
-        */
-       private boolean isAnomaly(Instance instance, LearningRule rule) {
-               //AMRUles is equipped with anomaly detection. If on, compute 
the anomaly value.                         
-               boolean isAnomaly = false;      
-               if (!this.noAnomalyDetection){
-                       if (rule.getInstancesSeen() >= 
this.anomalyNumInstThreshold) {
-                               isAnomaly = rule.isAnomaly(instance, 
-                                               
this.univariateAnomalyprobabilityThreshold,
-                                               
this.multivariateAnomalyProbabilityThreshold,
-                                               this.anomalyNumInstThreshold);
-                       }
-               }
-               return isAnomaly;
-       }
-       
-       /*
-        * Add predicate/RuleSplitNode for a rule
-        */
-       private void updateRuleSplitNode(PredicateContentEvent pce) {
-               int ruleID = pce.getRuleNumberID();
-               for (PassiveRule rule:ruleSet) {
-                       if (rule.getRuleNumberID() == ruleID) {
-                               rule.nodeListAdd(pce.getRuleSplitNode());
-                               rule.setLearningNode(pce.getLearningNode());
-                       }
-               }
-       }
-       
-       private void updateLearningNode(PredicateContentEvent pce) {
-               int ruleID = pce.getRuleNumberID();
-               for (PassiveRule rule:ruleSet) {
-                       if (rule.getRuleNumberID() == ruleID) {
-                               rule.setLearningNode(pce.getLearningNode());
-                       }
-               }
-       }
-       
-       /*
-        * Add new rule/Remove rule
-        */
-       private boolean addRule(ActiveRule rule) {
-               this.ruleSet.add(new PassiveRule(rule));
-               return true;
-       }
-       
-       private void removeRule(int ruleID) {
-               for (PassiveRule rule:ruleSet) {
-                       if (rule.getRuleNumberID() == ruleID) {
-                               ruleSet.remove(rule);
-                               break;
-                       }
-               }
-       }
-
-       @Override
-       public void onCreate(int id) {
-               this.processorId = id;
-               this.ruleSet = new LinkedList<PassiveRule>();
-               
-       }
-       
-       /* 
-        * Clone processor
-        */
-       @Override
-       public Processor newProcessor(Processor p) {
-               AMRRuleSetProcessor oldProcessor = (AMRRuleSetProcessor) p;
-               Builder builder = new Builder(oldProcessor);
-               AMRRuleSetProcessor newProcessor = builder.build();
-               newProcessor.resultStream = oldProcessor.resultStream;
-               newProcessor.statisticsStream = oldProcessor.statisticsStream;
-               newProcessor.defaultRuleStream = oldProcessor.defaultRuleStream;
-               return newProcessor;
-       }
-       
-       /*
-        * Send events
-        */
-       private void sendInstanceToRule(Instance instance, int ruleID) {
-               AssignmentContentEvent ace = new AssignmentContentEvent(ruleID, 
instance);
-               this.statisticsStream.put(ace);
-       }
-       
-       /*
-        * Output streams
-        */
-       public void setStatisticsStream(Stream statisticsStream) {
-               this.statisticsStream = statisticsStream;
-       }
-       
-       public Stream getStatisticsStream() {
-               return this.statisticsStream;
-       }
-       
-       public void setResultStream(Stream resultStream) {
-               this.resultStream = resultStream;
-       }
-       
-       public Stream getResultStream() {
-               return this.resultStream;
-       }
-       
-       public Stream getDefaultRuleStream() {
-               return this.defaultRuleStream;
-       }
-       
-       public void setDefaultRuleStream(Stream defaultRuleStream) {
-               this.defaultRuleStream = defaultRuleStream;
-       }
-       
-       /*
-        * Builder
-        */
-       public static class Builder {
-               private boolean noAnomalyDetection;
-               private double multivariateAnomalyProbabilityThreshold;
-               private double univariateAnomalyprobabilityThreshold;
-               private int anomalyNumInstThreshold;
-               
-               private boolean unorderedRules;
-               
-//             private FIMTDDNumericAttributeClassLimitObserver 
numericObserver;
-               private int voteType;
-               
-               private Instances dataset;
-               
-               public Builder(Instances dataset){
-                       this.dataset = dataset;
-               }
-               
-               public Builder(AMRRuleSetProcessor processor) {
-                       
-                       this.noAnomalyDetection = processor.noAnomalyDetection;
-                       this.multivariateAnomalyProbabilityThreshold = 
processor.multivariateAnomalyProbabilityThreshold;
-                       this.univariateAnomalyprobabilityThreshold = 
processor.univariateAnomalyprobabilityThreshold;
-                       this.anomalyNumInstThreshold = 
processor.anomalyNumInstThreshold;
-                       this.unorderedRules = processor.unorderedRules;
-                       
-                       this.voteType = processor.voteType;
-               }
-               
-               public Builder noAnomalyDetection(boolean noAnomalyDetection) {
-                       this.noAnomalyDetection = noAnomalyDetection;
-                       return this;
-               }
-               
-               public Builder multivariateAnomalyProbabilityThreshold(double 
mAnomalyThreshold) {
-                       this.multivariateAnomalyProbabilityThreshold = 
mAnomalyThreshold;
-                       return this;
-               }
-               
-               public Builder univariateAnomalyProbabilityThreshold(double 
uAnomalyThreshold) {
-                       this.univariateAnomalyprobabilityThreshold = 
uAnomalyThreshold;
-                       return this;
-               }
-               
-               public Builder anomalyNumberOfInstancesThreshold(int 
anomalyNumInstThreshold) {
-                       this.anomalyNumInstThreshold = anomalyNumInstThreshold;
-                       return this;
-               }
-               
-               public Builder unorderedRules(boolean unorderedRules) {
-                       this.unorderedRules = unorderedRules;
-                       return this;
-               }
-               
-               public Builder voteType(int voteType) {
-                       this.voteType = voteType;
-                       return this;
-               }
-               
-               public AMRRuleSetProcessor build() {
-                       return new AMRRuleSetProcessor(this);
-               }
-       }
+  private static final long serialVersionUID = -6544096255649379334L;
+  private static final Logger logger = 
LoggerFactory.getLogger(AMRRuleSetProcessor.class);
+
+  private int processorId;
+
+  // Rules & default rule
+  protected transient List<PassiveRule> ruleSet;
+
+  // SAMOA Stream
+  private Stream statisticsStream;
+  private Stream resultStream;
+  private Stream defaultRuleStream;
+
+  // Options
+  protected boolean noAnomalyDetection;
+  protected double multivariateAnomalyProbabilityThreshold;
+  protected double univariateAnomalyprobabilityThreshold;
+  protected int anomalyNumInstThreshold;
+
+  protected boolean unorderedRules;
+
+  protected int voteType;
+
+  /*
+   * Constructor
+   */
+  /**
+   * Creates a rule-set processor whose options are copied from a builder.
+   * 
+   * @param builder
+   *          source of the anomaly-detection, rule-ordering and vote-type
+   *          options
+   */
+  public AMRRuleSetProcessor(Builder builder) {
+    // Voting and rule ordering
+    voteType = builder.voteType;
+    unorderedRules = builder.unorderedRules;
+
+    // Anomaly detection
+    anomalyNumInstThreshold = builder.anomalyNumInstThreshold;
+    univariateAnomalyprobabilityThreshold = builder.univariateAnomalyprobabilityThreshold;
+    multivariateAnomalyProbabilityThreshold = builder.multivariateAnomalyProbabilityThreshold;
+    noAnomalyDetection = builder.noAnomalyDetection;
+  }
+
+  /**
+   * Dispatches an incoming event by type:
+   * <ul>
+   * <li>{@code InstanceContentEvent}: routed through the rule set;</li>
+   * <li>{@code PredicateContentEvent}: updates only the learning node of the
+   * named rule when the event has no split node, otherwise adds the split node
+   * and updates the learning node;</li>
+   * <li>{@code RuleContentEvent}: removes or adds a rule depending on its
+   * removing flag.</li>
+   * </ul>
+   * Any other event is ignored.
+   * 
+   * @param event
+   *          the event to handle
+   * @return always {@code true}
+   * @see com.yahoo.labs.samoa.core.Processor#process(com.yahoo.labs.samoa.core.ContentEvent)
+   */
+  @Override
+  public boolean process(ContentEvent event) {
+    if (event instanceof InstanceContentEvent) {
+      this.processInstanceEvent((InstanceContentEvent) event);
+      return true;
+    }
+
+    if (event instanceof PredicateContentEvent) {
+      PredicateContentEvent predicateEvent = (PredicateContentEvent) event;
+      boolean hasSplitNode = predicateEvent.getRuleSplitNode() != null;
+      if (hasSplitNode) {
+        this.updateRuleSplitNode(predicateEvent);
+      } else {
+        this.updateLearningNode(predicateEvent);
+      }
+      return true;
+    }
+
+    if (event instanceof RuleContentEvent) {
+      RuleContentEvent ruleEvent = (RuleContentEvent) event;
+      if (ruleEvent.isRemoving()) {
+        this.removeRule(ruleEvent.getRuleNumberID());
+      } else {
+        addRule(ruleEvent.getRule());
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Routes one instance through the rule set for prediction and/or training.
+   * <p>
+   * Rules are visited in list order while the event still needs a prediction
+   * or training. For each rule that covers the instance:
+   * <ul>
+   * <li>if a prediction is still needed, the rule's prediction is added to the
+   * combined vote together with the rule's current error;</li>
+   * <li>if training is still needed and the instance is not an anomaly for the
+   * rule, the rule's statistics are updated and the instance is sent to the
+   * statistics stream.</li>
+   * </ul>
+   * With ordered rules ({@code unorderedRules == false}) only the first
+   * covering rule votes and only the first covering, non-anomalous rule
+   * trains. If at least one rule covered the instance, the combined vote is
+   * put on the result stream. The event is forwarded to the default-rule
+   * stream when it is a testing event that no rule covered, or a training
+   * event that no rule trained on; its testing/training flags are narrowed to
+   * what is still outstanding.
+   * <p>
+   * NOTE(review): coverage is recorded even when the event is not a testing
+   * event, so a result is also emitted for covered training-only events;
+   * confirm that this is intended.
+   * 
+   * @param instanceEvent
+   *          the event carrying the instance and its testing/training flags
+   */
+  private void processInstanceEvent(InstanceContentEvent instanceEvent) {
+    Instance instance = instanceEvent.getInstance();
+    boolean stillPredicting = instanceEvent.isTesting();
+    boolean stillTraining = instanceEvent.isTraining();
+    boolean predictionCovered = false;
+    boolean trainingCovered = false;
+
+    ErrorWeightedVote combinedVote = newErrorWeightedVote();
+    for (PassiveRule rule : this.ruleSet) {
+      if (!(stillPredicting || stillTraining)) {
+        break;
+      }
+      if (!rule.isCovering(instance)) {
+        continue;
+      }
+
+      predictionCovered = true;
+
+      if (stillPredicting) {
+        double[] vote = rule.getPrediction(instance);
+        double error = rule.getCurrentError();
+        combinedVote.addVote(vote, error);
+        if (!this.unorderedRules) {
+          stillPredicting = false;
+        }
+      }
+
+      if (stillTraining && !isAnomaly(instance, rule)) {
+        trainingCovered = true;
+        rule.updateStatistics(instance);
+
+        // Send instance to statistics PIs
+        sendInstanceToRule(instance, rule.getRuleNumberID());
+
+        if (!this.unorderedRules) {
+          stillTraining = false;
+        }
+      }
+    }
+
+    if (predictionCovered) {
+      // Combined prediction
+      double[] weightedVote = combinedVote.computeWeightedVote();
+      resultStream.put(newResultContentEvent(weightedVote, instanceEvent));
+    }
+
+    boolean defaultPrediction = instanceEvent.isTesting() && !predictionCovered;
+    boolean defaultTraining = instanceEvent.isTraining() && !trainingCovered;
+    if (!defaultPrediction && !defaultTraining) {
+      return;
+    }
+    instanceEvent.setTesting(defaultPrediction);
+    instanceEvent.setTraining(defaultTraining);
+    this.defaultRuleStream.put(instanceEvent);
+  }
+
+  /**
+   * Builds the result event for a prediction. The instance index, instance,
+   * class ID, last-event flag and evaluation index are taken from the incoming
+   * event; the classifier index is this processor's ID.
+   * 
+   * @param prediction
+   *          the prediction to report
+   * @param inEvent
+   *          the instance event the prediction was made for
+   * @return the populated result event
+   */
+  private ResultContentEvent newResultContentEvent(double[] prediction,
+      InstanceContentEvent inEvent) {
+    ResultContentEvent result = new ResultContentEvent(
+        inEvent.getInstanceIndex(),
+        inEvent.getInstance(),
+        inEvent.getClassId(),
+        prediction,
+        inEvent.isLastEvent());
+    result.setClassifierIndex(this.processorId);
+    result.setEvaluationIndex(inEvent.getEvaluationIndex());
+    return result;
+  }
+
+  /**
+   * Creates a fresh vote aggregator for one instance.
+   * 
+   * @return a {@code UniformWeightedVote} when {@code voteType} is 1, an
+   *         {@code InverseErrorWeightedVote} for any other value
+   */
+  public ErrorWeightedVote newErrorWeightedVote() {
+    // TODO: do a reset instead of init a new object
+    if (voteType != 1) {
+      return new InverseErrorWeightedVote();
+    }
+    return new UniformWeightedVote();
+  }
+
+  /**
+   * Tells whether an instance is an anomaly with respect to a rule.
+   * <p>
+   * The rule is only consulted when anomaly detection is enabled and the rule
+   * has seen at least {@code anomalyNumInstThreshold} instances; in every
+   * other case the instance is reported as not anomalous.
+   * 
+   * @param instance
+   *          the instance to check
+   * @param rule
+   *          the rule the instance is checked against
+   * @return {@code true} if the rule reports the instance as an anomaly
+   */
+  private boolean isAnomaly(Instance instance, LearningRule rule) {
+    if (this.noAnomalyDetection) {
+      return false;
+    }
+    return rule.getInstancesSeen() >= this.anomalyNumInstThreshold
+        && rule.isAnomaly(instance,
+            this.univariateAnomalyprobabilityThreshold,
+            this.multivariateAnomalyProbabilityThreshold,
+            this.anomalyNumInstThreshold);
+  }
+
+  /*
+   * Add predicate/RuleSplitNode for a rule
+   */
+  /**
+   * Applies a predicate event that carries a split node: every rule in the
+   * rule set whose ID matches the event gets the split node added to its node
+   * list and its learning node replaced by the one in the event.
+   * 
+   * @param pce
+   *          the predicate event to apply
+   */
+  private void updateRuleSplitNode(PredicateContentEvent pce) {
+    int targetRuleId = pce.getRuleNumberID();
+    for (PassiveRule candidate : ruleSet) {
+      if (candidate.getRuleNumberID() != targetRuleId) {
+        continue;
+      }
+      candidate.nodeListAdd(pce.getRuleSplitNode());
+      candidate.setLearningNode(pce.getLearningNode());
+    }
+  }
+
+  /**
+   * Applies a predicate event that carries no split node: every rule in the
+   * rule set whose ID matches the event has its learning node replaced by the
+   * one in the event.
+   * 
+   * @param pce
+   *          the predicate event to apply
+   */
+  private void updateLearningNode(PredicateContentEvent pce) {
+    int targetRuleId = pce.getRuleNumberID();
+    for (PassiveRule candidate : ruleSet) {
+      if (candidate.getRuleNumberID() != targetRuleId) {
+        continue;
+      }
+      candidate.setLearningNode(pce.getLearningNode());
+    }
+  }
+
+  /*
+   * Add new rule/Remove rule
+   */
+  /**
+   * Wraps an active rule in a new {@code PassiveRule} and appends it to the
+   * rule set.
+   * 
+   * @param rule
+   *          the active rule to add
+   * @return always {@code true}
+   */
+  private boolean addRule(ActiveRule rule) {
+    PassiveRule passiveCopy = new PassiveRule(rule);
+    ruleSet.add(passiveCopy);
+    return true;
+  }
+
+  /**
+   * Removes the first rule in the rule set whose ID matches. Nothing happens
+   * when no rule matches.
+   * 
+   * @param ruleID
+   *          ID of the rule to remove
+   */
+  private void removeRule(int ruleID) {
+    for (PassiveRule candidate : ruleSet) {
+      if (candidate.getRuleNumberID() != ruleID) {
+        continue;
+      }
+      // Stop iterating right after the removal: the list has been modified
+      // and the running iterator must not be advanced any further.
+      ruleSet.remove(candidate);
+      return;
+    }
+  }
+
+  /**
+   * Stores the processor ID and creates an empty rule set. The rule set field
+   * is transient and is not set by the constructor, so this is where it is
+   * first created.
+   * 
+   * @param id
+   *          the ID assigned to this processor
+   */
+  @Override
+  public void onCreate(int id) {
+    ruleSet = new LinkedList<PassiveRule>();
+    processorId = id;
+  }
+
+  /*
+   * Clone processor
+   */
+  /**
+   * Creates a new processor with the same options as an existing one. The new
+   * processor shares the result, statistics and default-rule streams of the
+   * given processor.
+   * 
+   * @param p
+   *          the processor to copy; must be an {@code AMRRuleSetProcessor}
+   * @return the newly built processor
+   */
+  @Override
+  public Processor newProcessor(Processor p) {
+    AMRRuleSetProcessor template = (AMRRuleSetProcessor) p;
+    AMRRuleSetProcessor copy = new Builder(template).build();
+    copy.defaultRuleStream = template.defaultRuleStream;
+    copy.statisticsStream = template.statisticsStream;
+    copy.resultStream = template.resultStream;
+    return copy;
+  }
+
+  /*
+   * Send events
+   */
+  /**
+   * Puts an {@code AssignmentContentEvent} for the given rule ID and instance
+   * on the statistics stream.
+   * 
+   * @param instance
+   *          the instance to forward
+   * @param ruleID
+   *          ID of the rule the instance is assigned to
+   */
+  private void sendInstanceToRule(Instance instance, int ruleID) {
+    this.statisticsStream.put(new AssignmentContentEvent(ruleID, instance));
+  }
+
+  /*
+   * Output streams
+   */
+  /**
+   * Sets the stream on which instance assignments are put.
+   * 
+   * @param statisticsStream
+   *          the statistics stream
+   */
+  public void setStatisticsStream(Stream statisticsStream) {
+    this.statisticsStream = statisticsStream;
+  }
+
+  /**
+   * @return the stream on which instance assignments are put
+   */
+  public Stream getStatisticsStream() {
+    return statisticsStream;
+  }
+
+  /**
+   * Sets the stream on which combined predictions are put.
+   * 
+   * @param resultStream
+   *          the result stream
+   */
+  public void setResultStream(Stream resultStream) {
+    this.resultStream = resultStream;
+  }
+
+  /**
+   * @return the stream on which combined predictions are put
+   */
+  public Stream getResultStream() {
+    return resultStream;
+  }
+
+  /**
+   * @return the stream to which instance events are forwarded for the default
+   *         rule
+   */
+  public Stream getDefaultRuleStream() {
+    return defaultRuleStream;
+  }
+
+  /**
+   * Sets the stream to which instance events are forwarded for the default
+   * rule.
+   * 
+   * @param defaultRuleStream
+   *          the default-rule stream
+   */
+  public void setDefaultRuleStream(Stream defaultRuleStream) {
+    this.defaultRuleStream = defaultRuleStream;
+  }
+
+  /*
+   * Builder
+   */
+  /**
+   * Fluent builder for {@link AMRRuleSetProcessor}. Each setter returns the
+   * builder itself so that calls can be chained before {@link #build()}.
+   */
+  public static class Builder {
+    // Anomaly-detection options
+    private boolean noAnomalyDetection;
+    private double multivariateAnomalyProbabilityThreshold;
+    private double univariateAnomalyprobabilityThreshold;
+    private int anomalyNumInstThreshold;
+
+    // Rule ordering and voting options
+    private boolean unorderedRules;
+    private int voteType;
+
+    // Stored by Builder(Instances); not read by the code of this class.
+    private Instances dataset;
+
+    /**
+     * Creates a builder with all options at their Java default values.
+     * 
+     * @param dataset
+     *          the dataset kept by this builder
+     */
+    public Builder(Instances dataset) {
+      this.dataset = dataset;
+    }
+
+    /**
+     * Creates a builder pre-populated with the options of an existing
+     * processor.
+     * 
+     * @param processor
+     *          the processor whose options are copied
+     */
+    public Builder(AMRRuleSetProcessor processor) {
+      voteType = processor.voteType;
+      unorderedRules = processor.unorderedRules;
+
+      anomalyNumInstThreshold = processor.anomalyNumInstThreshold;
+      univariateAnomalyprobabilityThreshold = processor.univariateAnomalyprobabilityThreshold;
+      multivariateAnomalyProbabilityThreshold = processor.multivariateAnomalyProbabilityThreshold;
+      noAnomalyDetection = processor.noAnomalyDetection;
+    }
+
+    /**
+     * @param noAnomalyDetection
+     *          {@code true} to switch anomaly detection off
+     * @return this builder
+     */
+    public Builder noAnomalyDetection(boolean noAnomalyDetection) {
+      this.noAnomalyDetection = noAnomalyDetection;
+      return this;
+    }
+
+    /**
+     * @param mAnomalyThreshold
+     *          multivariate anomaly probability threshold
+     * @return this builder
+     */
+    public Builder multivariateAnomalyProbabilityThreshold(double mAnomalyThreshold) {
+      multivariateAnomalyProbabilityThreshold = mAnomalyThreshold;
+      return this;
+    }
+
+    /**
+     * @param uAnomalyThreshold
+     *          univariate anomaly probability threshold
+     * @return this builder
+     */
+    public Builder univariateAnomalyProbabilityThreshold(double uAnomalyThreshold) {
+      univariateAnomalyprobabilityThreshold = uAnomalyThreshold;
+      return this;
+    }
+
+    /**
+     * @param anomalyNumInstThreshold
+     *          number of instances a rule must have seen before it is asked
+     *          whether an instance is an anomaly
+     * @return this builder
+     */
+    public Builder anomalyNumberOfInstancesThreshold(int anomalyNumInstThreshold) {
+      this.anomalyNumInstThreshold = anomalyNumInstThreshold;
+      return this;
+    }
+
+    /**
+     * @param unorderedRules
+     *          {@code false} to let only the first covering rule predict and
+     *          train, {@code true} to let every covering rule take part
+     * @return this builder
+     */
+    public Builder unorderedRules(boolean unorderedRules) {
+      this.unorderedRules = unorderedRules;
+      return this;
+    }
+
+    /**
+     * @param voteType
+     *          1 selects uniform vote weighting; any other value selects
+     *          inverse-error weighting
+     * @return this builder
+     */
+    public Builder voteType(int voteType) {
+      this.voteType = voteType;
+      return this;
+    }
+
+    /**
+     * @return a new processor configured with the current builder options
+     */
+    public AMRRuleSetProcessor build() {
+      return new AMRRuleSetProcessor(this);
+    }
+  }
 
 }

Reply via email to