http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleRegressionNode.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleRegressionNode.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleRegressionNode.java index 45f5719..068957d 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleRegressionNode.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleRegressionNode.java @@ -29,264 +29,268 @@ import com.yahoo.labs.samoa.moa.core.DoubleVector; * The base class for LearningNode for regression rule. * * @author Anh Thu Vu - * + * */ public abstract class RuleRegressionNode implements Serializable { - - private static final long serialVersionUID = 9129659494380381126L; - - protected int predictionFunction; - protected int ruleNumberID; - // The statistics for this node: - // Number of instances that have reached it - // Sum of y values - // Sum of squared y values - protected DoubleVector nodeStatistics; - - protected Perceptron perceptron; - protected TargetMean targetMean; - protected double learningRatio; - - /* - * Simple setters & getters - */ - public Perceptron getPerceptron() { - return perceptron; - } - - public void setPerceptron(Perceptron perceptron) { - this.perceptron = perceptron; - } - - public TargetMean getTargetMean() { - return targetMean; - } - - public void setTargetMean(TargetMean targetMean) { - this.targetMean = targetMean; - } - - /* - * Create a new RuleRegressionNode - */ - public RuleRegressionNode(double[] initialClassObservations) { - this.nodeStatistics = new DoubleVector(initialClassObservations); - } - - public RuleRegressionNode() { - this(new double[0]); - } - - /* - * Update statistics with input instance - */ - public abstract void updateStatistics(Instance instance); - - /* - * Predictions - */ - public double[] getPrediction(Instance instance) { - int predictionMode = this.getLearnerToUse(this.predictionFunction); - return getPrediction(instance, predictionMode); - } - - public double[] getSimplePrediction() { - if( this.targetMean!=null) - return this.targetMean.getVotesForInstance(); - else - return new double[]{0}; - } - - public double[] getPrediction(Instance instance, int predictionMode) { - double[] ret; - if (predictionMode == 1) - ret=this.perceptron.getVotesForInstance(instance); - else - ret=this.targetMean.getVotesForInstance(instance); - return ret; - } - - public double getNormalizedPrediction(Instance instance) { - double res; - double [] aux; - switch (this.predictionFunction) { - //perceptron - 1 - case 1: - res=this.perceptron.normalizedPrediction(instance); - break; - //target mean - 2 - case 2: - aux=this.targetMean.getVotesForInstance(); - res=normalize(aux[0]); - break; - //adaptive - 0 - case 0: - int predictionMode = this.getLearnerToUse(0); - if(predictionMode == 1) - { - res=this.perceptron.normalizedPrediction(instance); - } - else{ - aux=this.targetMean.getVotesForInstance(instance); - res = normalize(aux[0]); - } - break; - default: - throw new UnsupportedOperationException("Prediction mode not in range."); - } - return res; - } - - /* - * Get learner mode - */ - public int getLearnerToUse(int predMode) { - int predictionMode = predMode; - if (predictionMode == 0) { - double perceptronError= this.perceptron.getCurrentError(); - double meanTargetError =this.targetMean.getCurrentError(); - if (perceptronError < meanTargetError) - predictionMode = 1; //PERCEPTRON - else - predictionMode = 2; //TARGET MEAN - } - return predictionMode; - } - - /* - * Error and change detection - */ - public double computeError(Instance instance) { - double normalizedPrediction = getNormalizedPrediction(instance); - double normalizedClassValue = normalize(instance.classValue()); - return Math.abs(normalizedClassValue - normalizedPrediction); - } - - public double getCurrentError() { - double error; - if (this.perceptron!=null){ - if (targetMean==null) - error=perceptron.getCurrentError(); - else{ - double errorP=perceptron.getCurrentError(); - double errorTM=targetMean.getCurrentError(); - error = (errorP<errorTM) ? errorP : errorTM; - } - } - else - error=Double.MAX_VALUE; - return error; - } - - /* - * no. of instances seen - */ - public long getInstancesSeen() { - if (nodeStatistics != null) { - return (long)this.nodeStatistics.getValue(0); - } else { - return 0; - } - } - - public DoubleVector getNodeStatistics(){ - return this.nodeStatistics; - } - - /* - * Anomaly detection - */ - public boolean isAnomaly(Instance instance, - double uniVariateAnomalyProbabilityThreshold, - double multiVariateAnomalyProbabilityThreshold, - int numberOfInstanceesForAnomaly) { - //AMRUles is equipped with anomaly detection. If on, compute the anomaly value. - long perceptronIntancesSeen=this.perceptron.getInstancesSeen(); - if ( perceptronIntancesSeen>= numberOfInstanceesForAnomaly) { - double attribSum; - double attribSquaredSum; - double D = 0.0; - double N = 0.0; - double anomaly; - - for (int x = 0; x < instance.numAttributes() - 1; x++) { - // Perceptron is initialized each rule. - // this is a local anomaly. - int instAttIndex = modelAttIndexToInstanceAttIndex(x, instance); - attribSum = this.perceptron.perceptronattributeStatistics.getValue(x); - attribSquaredSum = this.perceptron.squaredperceptronattributeStatistics.getValue(x); - double mean = attribSum / perceptronIntancesSeen; - double sd = computeSD(attribSquaredSum, attribSum, perceptronIntancesSeen); - double probability = computeProbability(mean, sd, instance.value(instAttIndex)); - - if (probability > 0.0) { - D = D + Math.abs(Math.log(probability)); - if (probability < uniVariateAnomalyProbabilityThreshold) {//0.10 - N = N + Math.abs(Math.log(probability)); - } - } - } - - anomaly = 0.0; - if (D != 0.0) { - anomaly = N / D; - } - if (anomaly >= multiVariateAnomalyProbabilityThreshold) { - //debuganomaly(instance, - // uniVariateAnomalyProbabilityThreshold, - // multiVariateAnomalyProbabilityThreshold, - // anomaly); - return true; - } - } - return false; - } - - /* - * Helpers - */ - public static double computeProbability(double mean, double sd, double value) { - double probability = 0.0; - - if (sd > 0.0) { - double k = (Math.abs(value - mean) / sd); // One tailed variant of Chebyshev's inequality - probability= 1.0 / (1+k*k); - } - - return probability; - } - - public static double computeHoeffdingBound(double range, double confidence, double n) { - return Math.sqrt(((range * range) * Math.log(1.0 / confidence)) / (2.0 * n)); - } - - private double normalize(double value) { - double meanY = this.nodeStatistics.getValue(1)/this.nodeStatistics.getValue(0); - double sdY = computeSD(this.nodeStatistics.getValue(2), this.nodeStatistics.getValue(1), (long)this.nodeStatistics.getValue(0)); - double normalizedY = 0.0; - if (sdY > 0.0000001) { - normalizedY = (value - meanY) / (sdY); - } - return normalizedY; - } - - - public double computeSD(double squaredVal, double val, long size) { - if (size > 1) { - return Math.sqrt((squaredVal - ((val * val) / size)) / (size - 1.0)); - } - return 0.0; - } - - /** - * Gets the index of the attribute in the instance, - * given the index of the attribute in the learner. - * - * @param index the index of the attribute in the learner - * @param inst the instance - * @return the index in the instance - */ - protected static int modelAttIndexToInstanceAttIndex(int index, Instance inst) { - return index<= inst.classIndex() ? index : index + 1; + + private static final long serialVersionUID = 9129659494380381126L; + + protected int predictionFunction; + protected int ruleNumberID; + // The statistics for this node: + // Number of instances that have reached it + // Sum of y values + // Sum of squared y values + protected DoubleVector nodeStatistics; + + protected Perceptron perceptron; + protected TargetMean targetMean; + protected double learningRatio; + + /* + * Simple setters & getters + */ + public Perceptron getPerceptron() { + return perceptron; + } + + public void setPerceptron(Perceptron perceptron) { + this.perceptron = perceptron; + } + + public TargetMean getTargetMean() { + return targetMean; + } + + public void setTargetMean(TargetMean targetMean) { + this.targetMean = targetMean; + } + + /* + * Create a new RuleRegressionNode + */ + public RuleRegressionNode(double[] initialClassObservations) { + this.nodeStatistics = new DoubleVector(initialClassObservations); + } + + public RuleRegressionNode() { + this(new double[0]); + } + + /* + * Update statistics with input instance + */ + public abstract void updateStatistics(Instance instance); + + /* + * Predictions + */ + public double[] getPrediction(Instance instance) { + int predictionMode = this.getLearnerToUse(this.predictionFunction); + return getPrediction(instance, predictionMode); + } + + public double[] getSimplePrediction() { + if (this.targetMean != null) + return this.targetMean.getVotesForInstance(); + else + return new double[] { 0 }; + } + + public double[] getPrediction(Instance instance, int predictionMode) { + double[] ret; + if (predictionMode == 1) + ret = this.perceptron.getVotesForInstance(instance); + else + ret = this.targetMean.getVotesForInstance(instance); + return ret; + } + + public double getNormalizedPrediction(Instance instance) { + double res; + double[] aux; + switch (this.predictionFunction) { + // perceptron - 1 + case 1: + res = this.perceptron.normalizedPrediction(instance); + break; + // target mean - 2 + case 2: + aux = this.targetMean.getVotesForInstance(); + res = normalize(aux[0]); + break; + // adaptive - 0 + case 0: + int predictionMode = this.getLearnerToUse(0); + if (predictionMode == 1) + { + res = this.perceptron.normalizedPrediction(instance); + } + else { + aux = this.targetMean.getVotesForInstance(instance); + res = normalize(aux[0]); + } + break; + default: + throw new UnsupportedOperationException("Prediction mode not in range."); + } + return res; + } + + /* + * Get learner mode + */ + public int getLearnerToUse(int predMode) { + int predictionMode = predMode; + if (predictionMode == 0) { + double perceptronError = this.perceptron.getCurrentError(); + double meanTargetError = this.targetMean.getCurrentError(); + if (perceptronError < meanTargetError) + predictionMode = 1; // PERCEPTRON + else + predictionMode = 2; // TARGET MEAN + } + return predictionMode; + } + + /* + * Error and change detection + */ + public double computeError(Instance instance) { + double normalizedPrediction = getNormalizedPrediction(instance); + double normalizedClassValue = normalize(instance.classValue()); + return Math.abs(normalizedClassValue - normalizedPrediction); + } + + public double getCurrentError() { + double error; + if (this.perceptron != null) { + if (targetMean == null) + error = perceptron.getCurrentError(); + else { + double errorP = perceptron.getCurrentError(); + double errorTM = targetMean.getCurrentError(); + error = (errorP < errorTM) ? errorP : errorTM; + } + } + else + error = Double.MAX_VALUE; + return error; + } + + /* + * no. of instances seen + */ + public long getInstancesSeen() { + if (nodeStatistics != null) { + return (long) this.nodeStatistics.getValue(0); + } else { + return 0; + } + } + + public DoubleVector getNodeStatistics() { + return this.nodeStatistics; + } + + /* + * Anomaly detection + */ + public boolean isAnomaly(Instance instance, + double uniVariateAnomalyProbabilityThreshold, + double multiVariateAnomalyProbabilityThreshold, + int numberOfInstanceesForAnomaly) { + // AMRUles is equipped with anomaly detection. If on, compute the anomaly + // value. + long perceptronIntancesSeen = this.perceptron.getInstancesSeen(); + if (perceptronIntancesSeen >= numberOfInstanceesForAnomaly) { + double attribSum; + double attribSquaredSum; + double D = 0.0; + double N = 0.0; + double anomaly; + + for (int x = 0; x < instance.numAttributes() - 1; x++) { + // Perceptron is initialized each rule. + // this is a local anomaly. + int instAttIndex = modelAttIndexToInstanceAttIndex(x, instance); + attribSum = this.perceptron.perceptronattributeStatistics.getValue(x); + attribSquaredSum = this.perceptron.squaredperceptronattributeStatistics.getValue(x); + double mean = attribSum / perceptronIntancesSeen; + double sd = computeSD(attribSquaredSum, attribSum, perceptronIntancesSeen); + double probability = computeProbability(mean, sd, instance.value(instAttIndex)); + + if (probability > 0.0) { + D = D + Math.abs(Math.log(probability)); + if (probability < uniVariateAnomalyProbabilityThreshold) {// 0.10 + N = N + Math.abs(Math.log(probability)); + } + } + } + + anomaly = 0.0; + if (D != 0.0) { + anomaly = N / D; + } + if (anomaly >= multiVariateAnomalyProbabilityThreshold) { + // debuganomaly(instance, + // uniVariateAnomalyProbabilityThreshold, + // multiVariateAnomalyProbabilityThreshold, + // anomaly); + return true; + } } + return false; + } + + /* + * Helpers + */ + public static double computeProbability(double mean, double sd, double value) { + double probability = 0.0; + + if (sd > 0.0) { + double k = (Math.abs(value - mean) / sd); // One tailed variant of + // Chebyshev's inequality + probability = 1.0 / (1 + k * k); + } + + return probability; + } + + public static double computeHoeffdingBound(double range, double confidence, double n) { + return Math.sqrt(((range * range) * Math.log(1.0 / confidence)) / (2.0 * n)); + } + + private double normalize(double value) { + double meanY = this.nodeStatistics.getValue(1) / this.nodeStatistics.getValue(0); + double sdY = computeSD(this.nodeStatistics.getValue(2), this.nodeStatistics.getValue(1), + (long) this.nodeStatistics.getValue(0)); + double normalizedY = 0.0; + if (sdY > 0.0000001) { + normalizedY = (value - meanY) / (sdY); + } + return normalizedY; + } + + public double computeSD(double squaredVal, double val, long size) { + if (size > 1) { + return Math.sqrt((squaredVal - ((val * val) / size)) / (size - 1.0)); + } + return 0.0; + } + + /** + * Gets the index of the attribute in the instance, given the index of the + * attribute in the learner. + * + * @param index + * the index of the attribute in the learner + * @param inst + * the instance + * @return the index in the instance + */ + protected static int modelAttIndexToInstanceAttIndex(int index, Instance inst) { + return index <= inst.classIndex() ? index : index + 1; + } }
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleSplitNode.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleSplitNode.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleSplitNode.java index 28f4890..a89345e 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleSplitNode.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/RuleSplitNode.java @@ -30,37 +30,39 @@ import com.yahoo.labs.samoa.instances.Instance; * Represent a feature of rules (an element of ruleÅ nodeList). * * @author Anh Thu Vu - * + * */ public class RuleSplitNode extends SplitNode { - protected double lastTargetMean; - protected int operatorObserver; + protected double lastTargetMean; + protected int operatorObserver; + + private static final long serialVersionUID = 1L; + + public InstanceConditionalTest getSplitTest() { + return this.splitTest; + } - private static final long serialVersionUID = 1L; + /** + * Create a new RuleSplitNode + */ + public RuleSplitNode() { + this(null, new double[0]); + } - public InstanceConditionalTest getSplitTest() { - return this.splitTest; - } + public RuleSplitNode(InstanceConditionalTest splitTest, double[] classObservations) { + super(splitTest, classObservations); + } - /** - * Create a new RuleSplitNode - */ - public RuleSplitNode() { - this(null, new double[0]); - } - public RuleSplitNode(InstanceConditionalTest splitTest, double[] classObservations) { - super(splitTest, classObservations); - } - - public RuleSplitNode getACopy() { - InstanceConditionalTest splitTest = new NumericAttributeBinaryRulePredicate((NumericAttributeBinaryRulePredicate) this.getSplitTest()); - return new RuleSplitNode(splitTest, this.getObservedClassDistribution()); - } + public RuleSplitNode getACopy() { + InstanceConditionalTest splitTest = new NumericAttributeBinaryRulePredicate( + (NumericAttributeBinaryRulePredicate) this.getSplitTest()); + return new RuleSplitNode(splitTest, this.getObservedClassDistribution()); + } - public boolean evaluate(Instance instance) { - Predicate predicate = (Predicate) this.splitTest; - return predicate.evaluate(instance); - } + public boolean evaluate(Instance instance) { + Predicate predicate = (Predicate) this.splitTest; + return predicate.evaluate(instance); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/TargetMean.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/TargetMean.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/TargetMean.java index 902acf0..da00c0d 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/TargetMean.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/common/TargetMean.java @@ -59,162 +59,164 @@ import com.yahoo.labs.samoa.moa.core.StringUtils; public class TargetMean extends AbstractClassifier implements Regressor { - /** + /** * */ - protected long n; - protected double sum; - protected double errorSum; - protected double nError; - private double fadingErrorFactor; - - private static final long serialVersionUID = 7152547322803559115L; - - public FloatOption fadingErrorFactorOption = new FloatOption( - "fadingErrorFactor", 'e', - "Fading error factor for the TargetMean accumulated error", 0.99, 0, 1); - - @Override - public boolean isRandomizable() { - return false; - } - - @Override - public double[] getVotesForInstance(Instance inst) { - return getVotesForInstance(); - } - - public double[] getVotesForInstance() { - double[] currentMean=new double[1]; - if (n>0) - currentMean[0]=sum/n; - else - currentMean[0]=0; - return currentMean; - } - - @Override - public void resetLearningImpl() { - sum=0; - n=0; - errorSum=Double.MAX_VALUE; - nError=0; - } - - @Override - public void trainOnInstanceImpl(Instance inst) { - updateAccumulatedError(inst); - ++this.n; - this.sum+=inst.classValue(); - } - protected void updateAccumulatedError(Instance inst){ - double mean=0; - nError=1+fadingErrorFactor*nError; - if(n>0) - mean=sum/n; - errorSum=Math.abs(inst.classValue()-mean)+fadingErrorFactor*errorSum; - } - - @Override - protected Measurement[] getModelMeasurementsImpl() { - return null; - } - - @Override - public void getModelDescription(StringBuilder out, int indent) { - StringUtils.appendIndented(out, indent, "Current Mean: " + this.sum/this.n); - StringUtils.appendNewline(out); - - } - /* JD - * Resets the learner but initializes with a starting point - * */ - public void reset(double currentMean, long numberOfInstances) { - this.sum=currentMean*numberOfInstances; - this.n=numberOfInstances; - this.resetError(); - } - - /* JD - * Resets the learner but initializes with a starting point - * */ - public double getCurrentError(){ - if(this.nError>0) - return this.errorSum/this.nError; - else - return Double.MAX_VALUE; - } - - public TargetMean(TargetMean t) { - super(); - this.n = t.n; - this.sum = t.sum; - this.errorSum = t.errorSum; - this.nError = t.nError; - this.fadingErrorFactor = t.fadingErrorFactor; - this.fadingErrorFactorOption = t.fadingErrorFactorOption; - } - - public TargetMean(TargetMeanData td) { - this(); - this.n = td.n; - this.sum = td.sum; - this.errorSum = td.errorSum; - this.nError = td.nError; - this.fadingErrorFactor = td.fadingErrorFactor; - this.fadingErrorFactorOption.setValue(td.fadingErrorFactorOptionValue); - } - - public TargetMean() { - super(); - fadingErrorFactor=fadingErrorFactorOption.getValue(); - } - - public void resetError() { - this.errorSum=0; - this.nError=0; - } - - public static class TargetMeanData { - private long n; - private double sum; - private double errorSum; - private double nError; - private double fadingErrorFactor; - private double fadingErrorFactorOptionValue; - - public TargetMeanData() { - - } - - public TargetMeanData(TargetMean tm) { - this.n = tm.n; - this.sum = tm.sum; - this.errorSum = tm.errorSum; - this.nError = tm.nError; - this.fadingErrorFactor = tm.fadingErrorFactor; - if (tm.fadingErrorFactorOption != null) - this.fadingErrorFactorOptionValue = tm.fadingErrorFactorOption.getValue(); - else - this.fadingErrorFactorOptionValue = 0.99; - } - - public TargetMean build() { - return new TargetMean(this); - } - } - - public static final class TargetMeanSerializer extends Serializer<TargetMean>{ - - @Override - public void write(Kryo kryo, Output output, TargetMean t) { - kryo.writeObjectOrNull(output, new TargetMeanData(t), TargetMeanData.class); - } - - @Override - public TargetMean read(Kryo kryo, Input input, Class<TargetMean> type) { - TargetMeanData data = kryo.readObjectOrNull(input, TargetMeanData.class); - return data.build(); - } - } + protected long n; + protected double sum; + protected double errorSum; + protected double nError; + private double fadingErrorFactor; + + private static final long serialVersionUID = 7152547322803559115L; + + public FloatOption fadingErrorFactorOption = new FloatOption( + "fadingErrorFactor", 'e', + "Fading error factor for the TargetMean accumulated error", 0.99, 0, 1); + + @Override + public boolean isRandomizable() { + return false; + } + + @Override + public double[] getVotesForInstance(Instance inst) { + return getVotesForInstance(); + } + + public double[] getVotesForInstance() { + double[] currentMean = new double[1]; + if (n > 0) + currentMean[0] = sum / n; + else + currentMean[0] = 0; + return currentMean; + } + + @Override + public void resetLearningImpl() { + sum = 0; + n = 0; + errorSum = Double.MAX_VALUE; + nError = 0; + } + + @Override + public void trainOnInstanceImpl(Instance inst) { + updateAccumulatedError(inst); + ++this.n; + this.sum += inst.classValue(); + } + + protected void updateAccumulatedError(Instance inst) { + double mean = 0; + nError = 1 + fadingErrorFactor * nError; + if (n > 0) + mean = sum / n; + errorSum = Math.abs(inst.classValue() - mean) + fadingErrorFactor * errorSum; + } + + @Override + protected Measurement[] getModelMeasurementsImpl() { + return null; + } + + @Override + public void getModelDescription(StringBuilder out, int indent) { + StringUtils.appendIndented(out, indent, "Current Mean: " + this.sum / this.n); + StringUtils.appendNewline(out); + + } + + /* + * JD Resets the learner but initializes with a starting point + */ + public void reset(double currentMean, long numberOfInstances) { + this.sum = currentMean * numberOfInstances; + this.n = numberOfInstances; + this.resetError(); + } + + /* + * JD Resets the learner but initializes with a starting point + */ + public double getCurrentError() { + if (this.nError > 0) + return this.errorSum / this.nError; + else + return Double.MAX_VALUE; + } + + public TargetMean(TargetMean t) { + super(); + this.n = t.n; + this.sum = t.sum; + this.errorSum = t.errorSum; + this.nError = t.nError; + this.fadingErrorFactor = t.fadingErrorFactor; + this.fadingErrorFactorOption = t.fadingErrorFactorOption; + } + + public TargetMean(TargetMeanData td) { + this(); + this.n = td.n; + this.sum = td.sum; + this.errorSum = td.errorSum; + this.nError = td.nError; + this.fadingErrorFactor = td.fadingErrorFactor; + this.fadingErrorFactorOption.setValue(td.fadingErrorFactorOptionValue); + } + + public TargetMean() { + super(); + fadingErrorFactor = fadingErrorFactorOption.getValue(); + } + + public void resetError() { + this.errorSum = 0; + this.nError = 0; + } + + public static class TargetMeanData { + private long n; + private double sum; + private double errorSum; + private double nError; + private double fadingErrorFactor; + private double fadingErrorFactorOptionValue; + + public TargetMeanData() { + + } + + public TargetMeanData(TargetMean tm) { + this.n = tm.n; + this.sum = tm.sum; + this.errorSum = tm.errorSum; + this.nError = tm.nError; + this.fadingErrorFactor = tm.fadingErrorFactor; + if (tm.fadingErrorFactorOption != null) + this.fadingErrorFactorOptionValue = tm.fadingErrorFactorOption.getValue(); + else + this.fadingErrorFactorOptionValue = 0.99; + } + + public TargetMean build() { + return new TargetMean(this); + } + } + + public static final class TargetMeanSerializer extends Serializer<TargetMean> { + + @Override + public void write(Kryo kryo, Output output, TargetMean t) { + kryo.writeObjectOrNull(output, new TargetMeanData(t), TargetMeanData.class); + } + + @Override + public TargetMean read(Kryo kryo, Input input, Class<TargetMean> type) { + TargetMeanData data = kryo.readObjectOrNull(input, TargetMeanData.class); + return data.build(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRDefaultRuleProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRDefaultRuleProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRDefaultRuleProcessor.java index 54a4006..007c535 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRDefaultRuleProcessor.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRDefaultRuleProcessor.java @@ -39,296 +39,300 @@ import com.yahoo.labs.samoa.topology.Stream; * Default Rule Learner Processor (HAMR). * * @author Anh Thu Vu - * + * */ public class AMRDefaultRuleProcessor implements Processor { - /** + /** * */ - private static final long serialVersionUID = 23702084591044447L; - - private static final Logger logger = - LoggerFactory.getLogger(AMRDefaultRuleProcessor.class); - - private int processorId; - - // Default rule - protected transient ActiveRule defaultRule; - protected transient int ruleNumberID; - protected transient double[] statistics; - - // SAMOA Stream - private Stream ruleStream; - private Stream resultStream; - - // Options - protected int pageHinckleyThreshold; - protected double pageHinckleyAlpha; - protected boolean driftDetection; - protected int predictionFunction; // Adaptive=0 Perceptron=1 TargetMean=2 - protected boolean constantLearningRatioDecay; - protected double learningRatio; - - protected double splitConfidence; - protected double tieThreshold; - protected int gracePeriod; - - protected FIMTDDNumericAttributeClassLimitObserver numericObserver; - - /* - * Constructor - */ - public AMRDefaultRuleProcessor (Builder builder) { - this.pageHinckleyThreshold = builder.pageHinckleyThreshold; - this.pageHinckleyAlpha = builder.pageHinckleyAlpha; - this.driftDetection = builder.driftDetection; - this.predictionFunction = builder.predictionFunction; - this.constantLearningRatioDecay = builder.constantLearningRatioDecay; - this.learningRatio = builder.learningRatio; - this.splitConfidence = builder.splitConfidence; - this.tieThreshold = builder.tieThreshold; - this.gracePeriod = builder.gracePeriod; - - this.numericObserver = builder.numericObserver; - } - - @Override - public boolean process(ContentEvent event) { - InstanceContentEvent instanceEvent = (InstanceContentEvent) event; - // predict - if (instanceEvent.isTesting()) { - this.predictOnInstance(instanceEvent); - } - - // train - if (instanceEvent.isTraining()) { - this.trainOnInstance(instanceEvent); - } - - return false; - } - - /* - * Prediction - */ - private void predictOnInstance (InstanceContentEvent instanceEvent) { - double [] vote=defaultRule.getPrediction(instanceEvent.getInstance()); - ResultContentEvent rce = newResultContentEvent(vote, instanceEvent); - resultStream.put(rce); - } - - private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent){ - ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(), inEvent.getClassId(), prediction, inEvent.isLastEvent()); - rce.setClassifierIndex(this.processorId); - rce.setEvaluationIndex(inEvent.getEvaluationIndex()); - return rce; - } - - /* - * Training - */ - private void trainOnInstance (InstanceContentEvent instanceEvent) { - this.trainOnInstanceImpl(instanceEvent.getInstance()); - } - public void trainOnInstanceImpl(Instance instance) { - defaultRule.updateStatistics(instance); - if (defaultRule.getInstancesSeen() % this.gracePeriod == 0.0) { - if (defaultRule.tryToExpand(this.splitConfidence, this.tieThreshold) == true) { - ActiveRule newDefaultRule=newRule(defaultRule.getRuleNumberID(),(RuleActiveRegressionNode)defaultRule.getLearningNode(), - ((RuleActiveRegressionNode)defaultRule.getLearningNode()).getStatisticsOtherBranchSplit()); //other branch - defaultRule.split(); - defaultRule.setRuleNumberID(++ruleNumberID); - // send out the new rule - sendAddRuleEvent(defaultRule.getRuleNumberID(), this.defaultRule); - defaultRule=newDefaultRule; - } - } - } - - /* - * Create new rules - */ - private ActiveRule newRule(int ID, RuleActiveRegressionNode node, double[] statistics) { - ActiveRule r=newRule(ID); - - if (node!=null) - { - if(node.getPerceptron()!=null) - { - r.getLearningNode().setPerceptron(new Perceptron(node.getPerceptron())); - r.getLearningNode().getPerceptron().setLearningRatio(this.learningRatio); - } - if (statistics==null) - { - double mean; - if(node.getNodeStatistics().getValue(0)>0){ - mean=node.getNodeStatistics().getValue(1)/node.getNodeStatistics().getValue(0); - r.getLearningNode().getTargetMean().reset(mean, 1); - } - } - } - if (statistics!=null && ((RuleActiveRegressionNode)r.getLearningNode()).getTargetMean()!=null) - { - double mean; - if(statistics[0]>0){ - mean=statistics[1]/statistics[0]; - ((RuleActiveRegressionNode)r.getLearningNode()).getTargetMean().reset(mean, (long)statistics[0]); - } - } - return r; - } - - private ActiveRule newRule(int ID) { - ActiveRule r=new ActiveRule.Builder(). - threshold(this.pageHinckleyThreshold). - alpha(this.pageHinckleyAlpha). - changeDetection(this.driftDetection). - predictionFunction(this.predictionFunction). - statistics(new double[3]). - learningRatio(this.learningRatio). - numericObserver(numericObserver). - id(ID).build(); - return r; - } - - @Override - public void onCreate(int id) { - this.processorId = id; - this.statistics= new double[]{0.0,0,0}; - this.ruleNumberID=0; - this.defaultRule = newRule(++this.ruleNumberID); - } - - /* - * Clone processor - */ - @Override - public Processor newProcessor(Processor p) { - AMRDefaultRuleProcessor oldProcessor = (AMRDefaultRuleProcessor) p; - Builder builder = new Builder(oldProcessor); - AMRDefaultRuleProcessor newProcessor = builder.build(); - newProcessor.resultStream = oldProcessor.resultStream; - newProcessor.ruleStream = oldProcessor.ruleStream; - return newProcessor; - } - - /* - * Send events - */ - private void sendAddRuleEvent(int ruleID, ActiveRule rule) { - RuleContentEvent rce = new RuleContentEvent(ruleID, rule, false); - this.ruleStream.put(rce); - } - - /* - * Output streams - */ - public void setRuleStream(Stream ruleStream) { - this.ruleStream = ruleStream; - } - - public Stream getRuleStream() { - return this.ruleStream; - } - - public void setResultStream(Stream resultStream) { - this.resultStream = resultStream; - } - - public Stream getResultStream() { - return this.resultStream; - } - - /* - * Builder - */ - public static class Builder { - private int pageHinckleyThreshold; - private double pageHinckleyAlpha; - private boolean driftDetection; - private int predictionFunction; // Adaptive=0 Perceptron=1 TargetMean=2 - private boolean constantLearningRatioDecay; - private double learningRatio; - private double splitConfidence; - private double tieThreshold; - private int gracePeriod; - - private FIMTDDNumericAttributeClassLimitObserver numericObserver; - - private Instances dataset; - - public Builder(Instances dataset){ - this.dataset = dataset; - } - - public Builder(AMRDefaultRuleProcessor processor) { - this.pageHinckleyThreshold = processor.pageHinckleyThreshold; - this.pageHinckleyAlpha = processor.pageHinckleyAlpha; - this.driftDetection = processor.driftDetection; - this.predictionFunction = processor.predictionFunction; - this.constantLearningRatioDecay = processor.constantLearningRatioDecay; - this.learningRatio = processor.learningRatio; - this.splitConfidence = processor.splitConfidence; - this.tieThreshold = processor.tieThreshold; - this.gracePeriod = processor.gracePeriod; - - this.numericObserver = processor.numericObserver; - } - - public Builder threshold(int threshold) { - this.pageHinckleyThreshold = threshold; - return this; - } - - public Builder alpha(double alpha) { - this.pageHinckleyAlpha = alpha; - return this; - } - - public Builder changeDetection(boolean changeDetection) { - this.driftDetection = changeDetection; - return this; - } - - public Builder predictionFunction(int predictionFunction) { - this.predictionFunction = predictionFunction; - return this; - } - - public Builder constantLearningRatioDecay(boolean constantDecay) { - this.constantLearningRatioDecay = constantDecay; - return this; - } - - public Builder learningRatio(double learningRatio) { - this.learningRatio = learningRatio; - return this; - } - - public Builder splitConfidence(double splitConfidence) { - this.splitConfidence = splitConfidence; - return this; - } - - public Builder tieThreshold(double tieThreshold) { - this.tieThreshold = tieThreshold; - return this; - } - - public Builder gracePeriod(int gracePeriod) { - this.gracePeriod = gracePeriod; - return this; - } - - public Builder numericObserver(FIMTDDNumericAttributeClassLimitObserver numericObserver) { - this.numericObserver = numericObserver; - return this; - } - - public AMRDefaultRuleProcessor build() { - return new AMRDefaultRuleProcessor(this); - } - } - + private static final long serialVersionUID = 23702084591044447L; + + private static final Logger logger = + LoggerFactory.getLogger(AMRDefaultRuleProcessor.class); + + private int processorId; + + // Default rule + protected transient ActiveRule defaultRule; + protected transient int ruleNumberID; + protected transient double[] statistics; + + // SAMOA Stream + private Stream ruleStream; + private Stream resultStream; + + // Options + protected int pageHinckleyThreshold; + protected double pageHinckleyAlpha; + protected boolean driftDetection; + protected int predictionFunction; // Adaptive=0 Perceptron=1 TargetMean=2 + protected boolean constantLearningRatioDecay; + protected double learningRatio; + + protected double splitConfidence; + protected double tieThreshold; + protected int gracePeriod; + + protected FIMTDDNumericAttributeClassLimitObserver numericObserver; + + /* + * Constructor + */ + public AMRDefaultRuleProcessor(Builder builder) { + this.pageHinckleyThreshold = builder.pageHinckleyThreshold; + this.pageHinckleyAlpha = builder.pageHinckleyAlpha; + this.driftDetection = builder.driftDetection; + this.predictionFunction = builder.predictionFunction; + this.constantLearningRatioDecay = builder.constantLearningRatioDecay; + this.learningRatio = builder.learningRatio; + this.splitConfidence = builder.splitConfidence; + this.tieThreshold = builder.tieThreshold; + this.gracePeriod = builder.gracePeriod; + + this.numericObserver = builder.numericObserver; + } + + @Override + public boolean process(ContentEvent event) { + InstanceContentEvent instanceEvent = (InstanceContentEvent) event; + // predict + if (instanceEvent.isTesting()) { + this.predictOnInstance(instanceEvent); + } + + // train + if (instanceEvent.isTraining()) { + this.trainOnInstance(instanceEvent); + } + + return false; + } + + /* + * Prediction + */ + private void predictOnInstance(InstanceContentEvent instanceEvent) { + double[] vote = defaultRule.getPrediction(instanceEvent.getInstance()); + ResultContentEvent rce = newResultContentEvent(vote, instanceEvent); + resultStream.put(rce); + } + + private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent) { + ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(), + inEvent.getClassId(), prediction, inEvent.isLastEvent()); + rce.setClassifierIndex(this.processorId); + rce.setEvaluationIndex(inEvent.getEvaluationIndex()); + return rce; + } + + /* + * Training + */ + private void trainOnInstance(InstanceContentEvent instanceEvent) { + this.trainOnInstanceImpl(instanceEvent.getInstance()); + } + + public void trainOnInstanceImpl(Instance instance) { + defaultRule.updateStatistics(instance); + if (defaultRule.getInstancesSeen() % this.gracePeriod == 0.0) { + if (defaultRule.tryToExpand(this.splitConfidence, this.tieThreshold) == true) { + ActiveRule newDefaultRule = newRule(defaultRule.getRuleNumberID(), + (RuleActiveRegressionNode) defaultRule.getLearningNode(), + ((RuleActiveRegressionNode) defaultRule.getLearningNode()).getStatisticsOtherBranchSplit()); // other + // branch + defaultRule.split(); + defaultRule.setRuleNumberID(++ruleNumberID); + // send out the new rule + sendAddRuleEvent(defaultRule.getRuleNumberID(), this.defaultRule); + defaultRule = newDefaultRule; + } + } + } + + /* + * Create new rules + */ + private ActiveRule newRule(int ID, RuleActiveRegressionNode node, double[] statistics) { + ActiveRule r = newRule(ID); + + if (node != null) + { + if (node.getPerceptron() != null) + { + r.getLearningNode().setPerceptron(new Perceptron(node.getPerceptron())); + r.getLearningNode().getPerceptron().setLearningRatio(this.learningRatio); + } + if (statistics == null) + { + double mean; + if (node.getNodeStatistics().getValue(0) > 0) { + mean = node.getNodeStatistics().getValue(1) / node.getNodeStatistics().getValue(0); + r.getLearningNode().getTargetMean().reset(mean, 1); + } + } + } + if (statistics != null && ((RuleActiveRegressionNode) r.getLearningNode()).getTargetMean() != null) + { + double mean; + if (statistics[0] > 0) { + mean = statistics[1] / statistics[0]; + ((RuleActiveRegressionNode) r.getLearningNode()).getTargetMean().reset(mean, (long) statistics[0]); + } + } + return r; + } + + private ActiveRule newRule(int ID) { + ActiveRule r = new ActiveRule.Builder(). + threshold(this.pageHinckleyThreshold). + alpha(this.pageHinckleyAlpha). + changeDetection(this.driftDetection). + predictionFunction(this.predictionFunction). + statistics(new double[3]). + learningRatio(this.learningRatio). + numericObserver(numericObserver). + id(ID).build(); + return r; + } + + @Override + public void onCreate(int id) { + this.processorId = id; + this.statistics = new double[] { 0.0, 0, 0 }; + this.ruleNumberID = 0; + this.defaultRule = newRule(++this.ruleNumberID); + } + + /* + * Clone processor + */ + @Override + public Processor newProcessor(Processor p) { + AMRDefaultRuleProcessor oldProcessor = (AMRDefaultRuleProcessor) p; + Builder builder = new Builder(oldProcessor); + AMRDefaultRuleProcessor newProcessor = builder.build(); + newProcessor.resultStream = oldProcessor.resultStream; + newProcessor.ruleStream = oldProcessor.ruleStream; + return newProcessor; + } + + /* + * Send events + */ + private void sendAddRuleEvent(int ruleID, ActiveRule rule) { + RuleContentEvent rce = new RuleContentEvent(ruleID, rule, false); + this.ruleStream.put(rce); + } + + /* + * Output streams + */ + public void setRuleStream(Stream ruleStream) { + this.ruleStream = ruleStream; + } + + public Stream getRuleStream() { + return this.ruleStream; + } + + public void setResultStream(Stream resultStream) { + this.resultStream = resultStream; + } + + public Stream getResultStream() { + return this.resultStream; + } + + /* + * Builder + */ + public static class Builder { + private int pageHinckleyThreshold; + private double pageHinckleyAlpha; + private boolean driftDetection; + private int predictionFunction; // Adaptive=0 Perceptron=1 TargetMean=2 + private boolean constantLearningRatioDecay; + private double learningRatio; + private double splitConfidence; + private double tieThreshold; + private int gracePeriod; + + private FIMTDDNumericAttributeClassLimitObserver numericObserver; + + private Instances dataset; + + public Builder(Instances dataset) { + this.dataset = dataset; + } + + public Builder(AMRDefaultRuleProcessor processor) { + this.pageHinckleyThreshold = processor.pageHinckleyThreshold; + this.pageHinckleyAlpha = processor.pageHinckleyAlpha; + this.driftDetection = processor.driftDetection; + this.predictionFunction = processor.predictionFunction; + this.constantLearningRatioDecay = processor.constantLearningRatioDecay; + this.learningRatio = processor.learningRatio; + this.splitConfidence = processor.splitConfidence; + this.tieThreshold = processor.tieThreshold; + this.gracePeriod = processor.gracePeriod; + + this.numericObserver = processor.numericObserver; + } + + public Builder threshold(int threshold) { + this.pageHinckleyThreshold = threshold; + return this; + } + + public Builder alpha(double alpha) { + this.pageHinckleyAlpha = alpha; + return this; + } + + public Builder changeDetection(boolean changeDetection) { + this.driftDetection = changeDetection; + return this; + } + + public Builder predictionFunction(int predictionFunction) { + this.predictionFunction = predictionFunction; + return this; + } + + public Builder constantLearningRatioDecay(boolean constantDecay) { + this.constantLearningRatioDecay = constantDecay; + return this; + } + + public Builder learningRatio(double learningRatio) { + this.learningRatio = learningRatio; + return this; + } + + public Builder splitConfidence(double splitConfidence) { + this.splitConfidence = splitConfidence; + return this; + } + + public Builder tieThreshold(double tieThreshold) { + this.tieThreshold = tieThreshold; + return this; + } + + public Builder gracePeriod(int gracePeriod) { + this.gracePeriod = gracePeriod; + return this; + } + + public Builder numericObserver(FIMTDDNumericAttributeClassLimitObserver numericObserver) { + this.numericObserver = numericObserver; + return this; + } + + public AMRDefaultRuleProcessor build() { + return new AMRDefaultRuleProcessor(this); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRLearnerProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRLearnerProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRLearnerProcessor.java index 8ec118d..9d51075 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRLearnerProcessor.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRLearnerProcessor.java @@ -42,218 +42,219 @@ import com.yahoo.labs.samoa.topology.Stream; * Learner Processor (HAMR). * * @author Anh Thu Vu - * + * */ public class AMRLearnerProcessor implements Processor { - - /** + + /** * */ - private static final long serialVersionUID = -2302897295090248013L; - - private static final Logger logger = - LoggerFactory.getLogger(AMRLearnerProcessor.class); - - private int processorId; - - private transient List<ActiveRule> ruleSet; - - private Stream outputStream; - - private double splitConfidence; - private double tieThreshold; - private int gracePeriod; - - private boolean noAnomalyDetection; - private double multivariateAnomalyProbabilityThreshold; - private double univariateAnomalyprobabilityThreshold; - private int anomalyNumInstThreshold; - - public AMRLearnerProcessor(Builder builder) { - this.splitConfidence = builder.splitConfidence; - this.tieThreshold = builder.tieThreshold; - this.gracePeriod = builder.gracePeriod; - - this.noAnomalyDetection = builder.noAnomalyDetection; - this.multivariateAnomalyProbabilityThreshold = builder.multivariateAnomalyProbabilityThreshold; - this.univariateAnomalyprobabilityThreshold = builder.univariateAnomalyprobabilityThreshold; - this.anomalyNumInstThreshold = builder.anomalyNumInstThreshold; - } - - @Override - public boolean process(ContentEvent event) { - if (event instanceof AssignmentContentEvent) { - AssignmentContentEvent attrContentEvent = (AssignmentContentEvent) event; - trainRuleOnInstance(attrContentEvent.getRuleNumberID(),attrContentEvent.getInstance()); - } - else if (event instanceof RuleContentEvent) { - RuleContentEvent ruleContentEvent = (RuleContentEvent) event; - if (!ruleContentEvent.isRemoving()) { - addRule(ruleContentEvent.getRule()); - } - } - - return false; - } - - /* - * Process input instances - */ - private void trainRuleOnInstance(int ruleID, Instance instance) { - //System.out.println("Processor:"+this.processorId+": Rule:"+ruleID+" -> Counter="+counter); - Iterator<ActiveRule> ruleIterator= this.ruleSet.iterator(); - while (ruleIterator.hasNext()) { - ActiveRule rule = ruleIterator.next(); - if (rule.getRuleNumberID() == ruleID) { - // Check (again) for coverage - if (rule.isCovering(instance) == true) { - double error = rule.computeError(instance); //Use adaptive mode error - boolean changeDetected = ((RuleActiveRegressionNode)rule.getLearningNode()).updateChangeDetection(error); - if (changeDetected == true) { - ruleIterator.remove(); - - this.sendRemoveRuleEvent(ruleID); - } else { - rule.updateStatistics(instance); - if (rule.getInstancesSeen() % this.gracePeriod == 0.0) { - if (rule.tryToExpand(this.splitConfidence, this.tieThreshold) ) { - rule.split(); - - // expanded: update Aggregator with new/updated predicate - this.sendPredicate(rule.getRuleNumberID(), rule.getLastUpdatedRuleSplitNode(), - (RuleActiveRegressionNode)rule.getLearningNode()); - } - - } - - } - } - - return; - } - } - } - - private boolean isAnomaly(Instance instance, LearningRule rule) { - //AMRUles is equipped with anomaly detection. If on, compute the anomaly value. - boolean isAnomaly = false; - if (this.noAnomalyDetection == false){ - if (rule.getInstancesSeen() >= this.anomalyNumInstThreshold) { - isAnomaly = rule.isAnomaly(instance, - this.univariateAnomalyprobabilityThreshold, - this.multivariateAnomalyProbabilityThreshold, - this.anomalyNumInstThreshold); - } - } - return isAnomaly; - } - - private void sendRemoveRuleEvent(int ruleID) { - RuleContentEvent rce = new RuleContentEvent(ruleID, null, true); - this.outputStream.put(rce); - } - - private void sendPredicate(int ruleID, RuleSplitNode splitNode, RuleActiveRegressionNode learningNode) { - this.outputStream.put(new PredicateContentEvent(ruleID, splitNode, new RulePassiveRegressionNode(learningNode))); - } - - /* - * Process control message (regarding adding or removing rules) - */ - private boolean addRule(ActiveRule rule) { - this.ruleSet.add(rule); - return true; - } - - @Override - public void onCreate(int id) { - this.processorId = id; - this.ruleSet = new LinkedList<ActiveRule>(); - } - - @Override - public Processor newProcessor(Processor p) { - AMRLearnerProcessor oldProcessor = (AMRLearnerProcessor)p; - AMRLearnerProcessor newProcessor = - new AMRLearnerProcessor.Builder(oldProcessor).build(); - - newProcessor.setOutputStream(oldProcessor.outputStream); - return newProcessor; - } - - /* - * Builder - */ - public static class Builder { - private double splitConfidence; - private double tieThreshold; - private int gracePeriod; - - private boolean noAnomalyDetection; - private double multivariateAnomalyProbabilityThreshold; - private double univariateAnomalyprobabilityThreshold; - private int anomalyNumInstThreshold; - - private Instances dataset; - - public Builder(Instances dataset){ - this.dataset = dataset; - } - - public Builder(AMRLearnerProcessor processor) { - this.splitConfidence = processor.splitConfidence; - this.tieThreshold = processor.tieThreshold; - this.gracePeriod = processor.gracePeriod; - } - - public Builder splitConfidence(double splitConfidence) { - this.splitConfidence = splitConfidence; - return this; - } - - public Builder tieThreshold(double tieThreshold) { - this.tieThreshold = tieThreshold; - return this; - } - - public Builder gracePeriod(int gracePeriod) { - this.gracePeriod = gracePeriod; - return this; - } - - public Builder noAnomalyDetection(boolean noAnomalyDetection) { - this.noAnomalyDetection = noAnomalyDetection; - return this; - } - - public Builder multivariateAnomalyProbabilityThreshold(double mAnomalyThreshold) { - this.multivariateAnomalyProbabilityThreshold = mAnomalyThreshold; - return this; - } - - public Builder univariateAnomalyProbabilityThreshold(double uAnomalyThreshold) { - this.univariateAnomalyprobabilityThreshold = uAnomalyThreshold; - return this; - } - - public Builder anomalyNumberOfInstancesThreshold(int anomalyNumInstThreshold) { - this.anomalyNumInstThreshold = anomalyNumInstThreshold; - return this; - } - - public AMRLearnerProcessor build() { - return new AMRLearnerProcessor(this); - } - } - - /* - * Output stream - */ - public void setOutputStream(Stream stream) { - this.outputStream = stream; - } - - public Stream getOutputStream() { - return this.outputStream; - } + private static final long serialVersionUID = -2302897295090248013L; + + private static final Logger logger = + LoggerFactory.getLogger(AMRLearnerProcessor.class); + + private int processorId; + + private transient List<ActiveRule> ruleSet; + + private Stream outputStream; + + private double splitConfidence; + private double tieThreshold; + private int gracePeriod; + + private boolean noAnomalyDetection; + private double multivariateAnomalyProbabilityThreshold; + private double univariateAnomalyprobabilityThreshold; + private int anomalyNumInstThreshold; + + public AMRLearnerProcessor(Builder builder) { + this.splitConfidence = builder.splitConfidence; + this.tieThreshold = builder.tieThreshold; + this.gracePeriod = builder.gracePeriod; + + this.noAnomalyDetection = builder.noAnomalyDetection; + this.multivariateAnomalyProbabilityThreshold = builder.multivariateAnomalyProbabilityThreshold; + this.univariateAnomalyprobabilityThreshold = builder.univariateAnomalyprobabilityThreshold; + this.anomalyNumInstThreshold = builder.anomalyNumInstThreshold; + } + + @Override + public boolean process(ContentEvent event) { + if (event instanceof AssignmentContentEvent) { + AssignmentContentEvent attrContentEvent = (AssignmentContentEvent) event; + trainRuleOnInstance(attrContentEvent.getRuleNumberID(), attrContentEvent.getInstance()); + } + else if (event instanceof RuleContentEvent) { + RuleContentEvent ruleContentEvent = (RuleContentEvent) event; + if (!ruleContentEvent.isRemoving()) { + addRule(ruleContentEvent.getRule()); + } + } + + return false; + } + + /* + * Process input instances + */ + private void trainRuleOnInstance(int ruleID, Instance instance) { + // System.out.println("Processor:"+this.processorId+": Rule:"+ruleID+" -> Counter="+counter); + Iterator<ActiveRule> ruleIterator = this.ruleSet.iterator(); + while (ruleIterator.hasNext()) { + ActiveRule rule = ruleIterator.next(); + if (rule.getRuleNumberID() == ruleID) { + // Check (again) for coverage + if (rule.isCovering(instance) == true) { + double error = rule.computeError(instance); // Use adaptive mode error + boolean changeDetected = ((RuleActiveRegressionNode) rule.getLearningNode()).updateChangeDetection(error); + if (changeDetected == true) { + ruleIterator.remove(); + + this.sendRemoveRuleEvent(ruleID); + } else { + rule.updateStatistics(instance); + if (rule.getInstancesSeen() % this.gracePeriod == 0.0) { + if (rule.tryToExpand(this.splitConfidence, this.tieThreshold)) { + rule.split(); + + // expanded: update Aggregator with new/updated predicate + this.sendPredicate(rule.getRuleNumberID(), rule.getLastUpdatedRuleSplitNode(), + (RuleActiveRegressionNode) rule.getLearningNode()); + } + + } + + } + } + + return; + } + } + } + + private boolean isAnomaly(Instance instance, LearningRule rule) { + // AMRUles is equipped with anomaly detection. If on, compute the anomaly + // value. + boolean isAnomaly = false; + if (this.noAnomalyDetection == false) { + if (rule.getInstancesSeen() >= this.anomalyNumInstThreshold) { + isAnomaly = rule.isAnomaly(instance, + this.univariateAnomalyprobabilityThreshold, + this.multivariateAnomalyProbabilityThreshold, + this.anomalyNumInstThreshold); + } + } + return isAnomaly; + } + + private void sendRemoveRuleEvent(int ruleID) { + RuleContentEvent rce = new RuleContentEvent(ruleID, null, true); + this.outputStream.put(rce); + } + + private void sendPredicate(int ruleID, RuleSplitNode splitNode, RuleActiveRegressionNode learningNode) { + this.outputStream.put(new PredicateContentEvent(ruleID, splitNode, new RulePassiveRegressionNode(learningNode))); + } + + /* + * Process control message (regarding adding or removing rules) + */ + private boolean addRule(ActiveRule rule) { + this.ruleSet.add(rule); + return true; + } + + @Override + public void onCreate(int id) { + this.processorId = id; + this.ruleSet = new LinkedList<ActiveRule>(); + } + + @Override + public Processor newProcessor(Processor p) { + AMRLearnerProcessor oldProcessor = (AMRLearnerProcessor) p; + AMRLearnerProcessor newProcessor = + new AMRLearnerProcessor.Builder(oldProcessor).build(); + + newProcessor.setOutputStream(oldProcessor.outputStream); + return newProcessor; + } + + /* + * Builder + */ + public static class Builder { + private double splitConfidence; + private double tieThreshold; + private int gracePeriod; + + private boolean noAnomalyDetection; + private double multivariateAnomalyProbabilityThreshold; + private double univariateAnomalyprobabilityThreshold; + private int anomalyNumInstThreshold; + + private Instances dataset; + + public Builder(Instances dataset) { + this.dataset = dataset; + } + + public Builder(AMRLearnerProcessor processor) { + this.splitConfidence = processor.splitConfidence; + this.tieThreshold = processor.tieThreshold; + this.gracePeriod = processor.gracePeriod; + } + + public Builder splitConfidence(double splitConfidence) { + this.splitConfidence = splitConfidence; + return this; + } + + public Builder tieThreshold(double tieThreshold) { + this.tieThreshold = tieThreshold; + return this; + } + + public Builder gracePeriod(int gracePeriod) { + this.gracePeriod = gracePeriod; + return this; + } + + public Builder noAnomalyDetection(boolean noAnomalyDetection) { + this.noAnomalyDetection = noAnomalyDetection; + return this; + } + + public Builder multivariateAnomalyProbabilityThreshold(double mAnomalyThreshold) { + this.multivariateAnomalyProbabilityThreshold = mAnomalyThreshold; + return this; + } + + public Builder univariateAnomalyProbabilityThreshold(double uAnomalyThreshold) { + this.univariateAnomalyprobabilityThreshold = uAnomalyThreshold; + return this; + } + + public Builder anomalyNumberOfInstancesThreshold(int anomalyNumInstThreshold) { + this.anomalyNumInstThreshold = anomalyNumInstThreshold; + return this; + } + + public AMRLearnerProcessor build() { + return new AMRLearnerProcessor(this); + } + } + + /* + * Output stream + */ + public void setOutputStream(Stream stream) { + this.outputStream = stream; + } + + public Stream getOutputStream() { + return this.outputStream; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java index 38a0be1..88bf375 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java @@ -40,323 +40,333 @@ import org.slf4j.LoggerFactory; /** * Model Aggregator Processor (HAMR). + * * @author Anh Thu Vu - * + * */ public class AMRRuleSetProcessor implements Processor { - /** + /** * */ - private static final long serialVersionUID = -6544096255649379334L; - private static final Logger logger = LoggerFactory.getLogger(AMRRuleSetProcessor.class); - - private int processorId; - - // Rules & default rule - protected transient List<PassiveRule> ruleSet; - - // SAMOA Stream - private Stream statisticsStream; - private Stream resultStream; - private Stream defaultRuleStream; - - // Options - protected boolean noAnomalyDetection; - protected double multivariateAnomalyProbabilityThreshold; - protected double univariateAnomalyprobabilityThreshold; - protected int anomalyNumInstThreshold; - - protected boolean unorderedRules; - - protected int voteType; - - /* - * Constructor - */ - public AMRRuleSetProcessor (Builder builder) { - - this.noAnomalyDetection = builder.noAnomalyDetection; - this.multivariateAnomalyProbabilityThreshold = builder.multivariateAnomalyProbabilityThreshold; - this.univariateAnomalyprobabilityThreshold = builder.univariateAnomalyprobabilityThreshold; - this.anomalyNumInstThreshold = builder.anomalyNumInstThreshold; - this.unorderedRules = builder.unorderedRules; - - this.voteType = builder.voteType; - } - /* (non-Javadoc) - * @see com.yahoo.labs.samoa.core.Processor#process(com.yahoo.labs.samoa.core.ContentEvent) - */ - @Override - public boolean process(ContentEvent event) { - if (event instanceof InstanceContentEvent) { - this.processInstanceEvent((InstanceContentEvent) event); - } - else if (event instanceof PredicateContentEvent) { - PredicateContentEvent pce = (PredicateContentEvent) event; - if (pce.getRuleSplitNode() == null) { - this.updateLearningNode(pce); - } - else { - this.updateRuleSplitNode(pce); - } - } - else if (event instanceof RuleContentEvent) { - RuleContentEvent rce = (RuleContentEvent) event; - if (rce.isRemoving()) { - this.removeRule(rce.getRuleNumberID()); - } - else { - addRule(rce.getRule()); - } - } - return true; - } - - private void processInstanceEvent(InstanceContentEvent instanceEvent) { - Instance instance = instanceEvent.getInstance(); - boolean predictionCovered = false; - boolean trainingCovered = false; - boolean continuePrediction = instanceEvent.isTesting(); - boolean continueTraining = instanceEvent.isTraining(); - - ErrorWeightedVote errorWeightedVote = newErrorWeightedVote(); - for (PassiveRule aRuleSet : this.ruleSet) { - if (!continuePrediction && !continueTraining) - break; - - if (aRuleSet.isCovering(instance)) { - predictionCovered = true; - - if (continuePrediction) { - double[] vote = aRuleSet.getPrediction(instance); - double error = aRuleSet.getCurrentError(); - errorWeightedVote.addVote(vote, error); - if (!this.unorderedRules) continuePrediction = false; - } - - if (continueTraining) { - if (!isAnomaly(instance, aRuleSet)) { - trainingCovered = true; - aRuleSet.updateStatistics(instance); - - // Send instance to statistics PIs - sendInstanceToRule(instance, aRuleSet.getRuleNumberID()); - - if (!this.unorderedRules) continueTraining = false; - } - } - } - } - - if (predictionCovered) { - // Combined prediction - ResultContentEvent rce = newResultContentEvent(errorWeightedVote.computeWeightedVote(), instanceEvent); - resultStream.put(rce); - } - - boolean defaultPrediction = instanceEvent.isTesting() && !predictionCovered; - boolean defaultTraining = instanceEvent.isTraining() && !trainingCovered; - if (defaultPrediction || defaultTraining) { - instanceEvent.setTesting(defaultPrediction); - instanceEvent.setTraining(defaultTraining); - this.defaultRuleStream.put(instanceEvent); - } - } - - private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent){ - ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(), inEvent.getClassId(), prediction, inEvent.isLastEvent()); - rce.setClassifierIndex(this.processorId); - rce.setEvaluationIndex(inEvent.getEvaluationIndex()); - return rce; - } - - public ErrorWeightedVote newErrorWeightedVote() { - // TODO: do a reset instead of init a new object - if (voteType == 1) - return new UniformWeightedVote(); - return new InverseErrorWeightedVote(); - } - - /** - * Method to verify if the instance is an anomaly. - * @param instance - * @param rule - * @return - */ - private boolean isAnomaly(Instance instance, LearningRule rule) { - //AMRUles is equipped with anomaly detection. If on, compute the anomaly value. - boolean isAnomaly = false; - if (!this.noAnomalyDetection){ - if (rule.getInstancesSeen() >= this.anomalyNumInstThreshold) { - isAnomaly = rule.isAnomaly(instance, - this.univariateAnomalyprobabilityThreshold, - this.multivariateAnomalyProbabilityThreshold, - this.anomalyNumInstThreshold); - } - } - return isAnomaly; - } - - /* - * Add predicate/RuleSplitNode for a rule - */ - private void updateRuleSplitNode(PredicateContentEvent pce) { - int ruleID = pce.getRuleNumberID(); - for (PassiveRule rule:ruleSet) { - if (rule.getRuleNumberID() == ruleID) { - rule.nodeListAdd(pce.getRuleSplitNode()); - rule.setLearningNode(pce.getLearningNode()); - } - } - } - - private void updateLearningNode(PredicateContentEvent pce) { - int ruleID = pce.getRuleNumberID(); - for (PassiveRule rule:ruleSet) { - if (rule.getRuleNumberID() == ruleID) { - rule.setLearningNode(pce.getLearningNode()); - } - } - } - - /* - * Add new rule/Remove rule - */ - private boolean addRule(ActiveRule rule) { - this.ruleSet.add(new PassiveRule(rule)); - return true; - } - - private void removeRule(int ruleID) { - for (PassiveRule rule:ruleSet) { - if (rule.getRuleNumberID() == ruleID) { - ruleSet.remove(rule); - break; - } - } - } - - @Override - public void onCreate(int id) { - this.processorId = id; - this.ruleSet = new LinkedList<PassiveRule>(); - - } - - /* - * Clone processor - */ - @Override - public Processor newProcessor(Processor p) { - AMRRuleSetProcessor oldProcessor = (AMRRuleSetProcessor) p; - Builder builder = new Builder(oldProcessor); - AMRRuleSetProcessor newProcessor = builder.build(); - newProcessor.resultStream = oldProcessor.resultStream; - newProcessor.statisticsStream = oldProcessor.statisticsStream; - newProcessor.defaultRuleStream = oldProcessor.defaultRuleStream; - return newProcessor; - } - - /* - * Send events - */ - private void sendInstanceToRule(Instance instance, int ruleID) { - AssignmentContentEvent ace = new AssignmentContentEvent(ruleID, instance); - this.statisticsStream.put(ace); - } - - /* - * Output streams - */ - public void setStatisticsStream(Stream statisticsStream) { - this.statisticsStream = statisticsStream; - } - - public Stream getStatisticsStream() { - return this.statisticsStream; - } - - public void setResultStream(Stream resultStream) { - this.resultStream = resultStream; - } - - public Stream getResultStream() { - return this.resultStream; - } - - public Stream getDefaultRuleStream() { - return this.defaultRuleStream; - } - - public void setDefaultRuleStream(Stream defaultRuleStream) { - this.defaultRuleStream = defaultRuleStream; - } - - /* - * Builder - */ - public static class Builder { - private boolean noAnomalyDetection; - private double multivariateAnomalyProbabilityThreshold; - private double univariateAnomalyprobabilityThreshold; - private int anomalyNumInstThreshold; - - private boolean unorderedRules; - -// private FIMTDDNumericAttributeClassLimitObserver numericObserver; - private int voteType; - - private Instances dataset; - - public Builder(Instances dataset){ - this.dataset = dataset; - } - - public Builder(AMRRuleSetProcessor processor) { - - this.noAnomalyDetection = processor.noAnomalyDetection; - this.multivariateAnomalyProbabilityThreshold = processor.multivariateAnomalyProbabilityThreshold; - this.univariateAnomalyprobabilityThreshold = processor.univariateAnomalyprobabilityThreshold; - this.anomalyNumInstThreshold = processor.anomalyNumInstThreshold; - this.unorderedRules = processor.unorderedRules; - - this.voteType = processor.voteType; - } - - public Builder noAnomalyDetection(boolean noAnomalyDetection) { - this.noAnomalyDetection = noAnomalyDetection; - return this; - } - - public Builder multivariateAnomalyProbabilityThreshold(double mAnomalyThreshold) { - this.multivariateAnomalyProbabilityThreshold = mAnomalyThreshold; - return this; - } - - public Builder univariateAnomalyProbabilityThreshold(double uAnomalyThreshold) { - this.univariateAnomalyprobabilityThreshold = uAnomalyThreshold; - return this; - } - - public Builder anomalyNumberOfInstancesThreshold(int anomalyNumInstThreshold) { - this.anomalyNumInstThreshold = anomalyNumInstThreshold; - return this; - } - - public Builder unorderedRules(boolean unorderedRules) { - this.unorderedRules = unorderedRules; - return this; - } - - public Builder voteType(int voteType) { - this.voteType = voteType; - return this; - } - - public AMRRuleSetProcessor build() { - return new AMRRuleSetProcessor(this); - } - } + private static final long serialVersionUID = -6544096255649379334L; + private static final Logger logger = LoggerFactory.getLogger(AMRRuleSetProcessor.class); + + private int processorId; + + // Rules & default rule + protected transient List<PassiveRule> ruleSet; + + // SAMOA Stream + private Stream statisticsStream; + private Stream resultStream; + private Stream defaultRuleStream; + + // Options + protected boolean noAnomalyDetection; + protected double multivariateAnomalyProbabilityThreshold; + protected double univariateAnomalyprobabilityThreshold; + protected int anomalyNumInstThreshold; + + protected boolean unorderedRules; + + protected int voteType; + + /* + * Constructor + */ + public AMRRuleSetProcessor(Builder builder) { + + this.noAnomalyDetection = builder.noAnomalyDetection; + this.multivariateAnomalyProbabilityThreshold = builder.multivariateAnomalyProbabilityThreshold; + this.univariateAnomalyprobabilityThreshold = builder.univariateAnomalyprobabilityThreshold; + this.anomalyNumInstThreshold = builder.anomalyNumInstThreshold; + this.unorderedRules = builder.unorderedRules; + + this.voteType = builder.voteType; + } + + /* + * (non-Javadoc) + * + * @see com.yahoo.labs.samoa.core.Processor#process(com.yahoo.labs.samoa.core. + * ContentEvent) + */ + @Override + public boolean process(ContentEvent event) { + if (event instanceof InstanceContentEvent) { + this.processInstanceEvent((InstanceContentEvent) event); + } + else if (event instanceof PredicateContentEvent) { + PredicateContentEvent pce = (PredicateContentEvent) event; + if (pce.getRuleSplitNode() == null) { + this.updateLearningNode(pce); + } + else { + this.updateRuleSplitNode(pce); + } + } + else if (event instanceof RuleContentEvent) { + RuleContentEvent rce = (RuleContentEvent) event; + if (rce.isRemoving()) { + this.removeRule(rce.getRuleNumberID()); + } + else { + addRule(rce.getRule()); + } + } + return true; + } + + private void processInstanceEvent(InstanceContentEvent instanceEvent) { + Instance instance = instanceEvent.getInstance(); + boolean predictionCovered = false; + boolean trainingCovered = false; + boolean continuePrediction = instanceEvent.isTesting(); + boolean continueTraining = instanceEvent.isTraining(); + + ErrorWeightedVote errorWeightedVote = newErrorWeightedVote(); + for (PassiveRule aRuleSet : this.ruleSet) { + if (!continuePrediction && !continueTraining) + break; + + if (aRuleSet.isCovering(instance)) { + predictionCovered = true; + + if (continuePrediction) { + double[] vote = aRuleSet.getPrediction(instance); + double error = aRuleSet.getCurrentError(); + errorWeightedVote.addVote(vote, error); + if (!this.unorderedRules) + continuePrediction = false; + } + + if (continueTraining) { + if (!isAnomaly(instance, aRuleSet)) { + trainingCovered = true; + aRuleSet.updateStatistics(instance); + + // Send instance to statistics PIs + sendInstanceToRule(instance, aRuleSet.getRuleNumberID()); + + if (!this.unorderedRules) + continueTraining = false; + } + } + } + } + + if (predictionCovered) { + // Combined prediction + ResultContentEvent rce = newResultContentEvent(errorWeightedVote.computeWeightedVote(), instanceEvent); + resultStream.put(rce); + } + + boolean defaultPrediction = instanceEvent.isTesting() && !predictionCovered; + boolean defaultTraining = instanceEvent.isTraining() && !trainingCovered; + if (defaultPrediction || defaultTraining) { + instanceEvent.setTesting(defaultPrediction); + instanceEvent.setTraining(defaultTraining); + this.defaultRuleStream.put(instanceEvent); + } + } + + private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent) { + ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(), + inEvent.getClassId(), prediction, inEvent.isLastEvent()); + rce.setClassifierIndex(this.processorId); + rce.setEvaluationIndex(inEvent.getEvaluationIndex()); + return rce; + } + + public ErrorWeightedVote newErrorWeightedVote() { + // TODO: do a reset instead of init a new object + if (voteType == 1) + return new UniformWeightedVote(); + return new InverseErrorWeightedVote(); + } + + /** + * Method to verify if the instance is an anomaly. + * + * @param instance + * @param rule + * @return + */ + private boolean isAnomaly(Instance instance, LearningRule rule) { + // AMRUles is equipped with anomaly detection. If on, compute the anomaly + // value. + boolean isAnomaly = false; + if (!this.noAnomalyDetection) { + if (rule.getInstancesSeen() >= this.anomalyNumInstThreshold) { + isAnomaly = rule.isAnomaly(instance, + this.univariateAnomalyprobabilityThreshold, + this.multivariateAnomalyProbabilityThreshold, + this.anomalyNumInstThreshold); + } + } + return isAnomaly; + } + + /* + * Add predicate/RuleSplitNode for a rule + */ + private void updateRuleSplitNode(PredicateContentEvent pce) { + int ruleID = pce.getRuleNumberID(); + for (PassiveRule rule : ruleSet) { + if (rule.getRuleNumberID() == ruleID) { + rule.nodeListAdd(pce.getRuleSplitNode()); + rule.setLearningNode(pce.getLearningNode()); + } + } + } + + private void updateLearningNode(PredicateContentEvent pce) { + int ruleID = pce.getRuleNumberID(); + for (PassiveRule rule : ruleSet) { + if (rule.getRuleNumberID() == ruleID) { + rule.setLearningNode(pce.getLearningNode()); + } + } + } + + /* + * Add new rule/Remove rule + */ + private boolean addRule(ActiveRule rule) { + this.ruleSet.add(new PassiveRule(rule)); + return true; + } + + private void removeRule(int ruleID) { + for (PassiveRule rule : ruleSet) { + if (rule.getRuleNumberID() == ruleID) { + ruleSet.remove(rule); + break; + } + } + } + + @Override + public void onCreate(int id) { + this.processorId = id; + this.ruleSet = new LinkedList<PassiveRule>(); + + } + + /* + * Clone processor + */ + @Override + public Processor newProcessor(Processor p) { + AMRRuleSetProcessor oldProcessor = (AMRRuleSetProcessor) p; + Builder builder = new Builder(oldProcessor); + AMRRuleSetProcessor newProcessor = builder.build(); + newProcessor.resultStream = oldProcessor.resultStream; + newProcessor.statisticsStream = oldProcessor.statisticsStream; + newProcessor.defaultRuleStream = oldProcessor.defaultRuleStream; + return newProcessor; + } + + /* + * Send events + */ + private void sendInstanceToRule(Instance instance, int ruleID) { + AssignmentContentEvent ace = new AssignmentContentEvent(ruleID, instance); + this.statisticsStream.put(ace); + } + + /* + * Output streams + */ + public void setStatisticsStream(Stream statisticsStream) { + this.statisticsStream = statisticsStream; + } + + public Stream getStatisticsStream() { + return this.statisticsStream; + } + + public void setResultStream(Stream resultStream) { + this.resultStream = resultStream; + } + + public Stream getResultStream() { + return this.resultStream; + } + + public Stream getDefaultRuleStream() { + return this.defaultRuleStream; + } + + public void setDefaultRuleStream(Stream defaultRuleStream) { + this.defaultRuleStream = defaultRuleStream; + } + + /* + * Builder + */ + public static class Builder { + private boolean noAnomalyDetection; + private double multivariateAnomalyProbabilityThreshold; + private double univariateAnomalyprobabilityThreshold; + private int anomalyNumInstThreshold; + + private boolean unorderedRules; + + // private FIMTDDNumericAttributeClassLimitObserver numericObserver; + private int voteType; + + private Instances dataset; + + public Builder(Instances dataset) { + this.dataset = dataset; + } + + public Builder(AMRRuleSetProcessor processor) { + + this.noAnomalyDetection = processor.noAnomalyDetection; + this.multivariateAnomalyProbabilityThreshold = processor.multivariateAnomalyProbabilityThreshold; + this.univariateAnomalyprobabilityThreshold = processor.univariateAnomalyprobabilityThreshold; + this.anomalyNumInstThreshold = processor.anomalyNumInstThreshold; + this.unorderedRules = processor.unorderedRules; + + this.voteType = processor.voteType; + } + + public Builder noAnomalyDetection(boolean noAnomalyDetection) { + this.noAnomalyDetection = noAnomalyDetection; + return this; + } + + public Builder multivariateAnomalyProbabilityThreshold(double mAnomalyThreshold) { + this.multivariateAnomalyProbabilityThreshold = mAnomalyThreshold; + return this; + } + + public Builder univariateAnomalyProbabilityThreshold(double uAnomalyThreshold) { + this.univariateAnomalyprobabilityThreshold = uAnomalyThreshold; + return this; + } + + public Builder anomalyNumberOfInstancesThreshold(int anomalyNumInstThreshold) { + this.anomalyNumInstThreshold = anomalyNumInstThreshold; + return this; + } + + public Builder unorderedRules(boolean unorderedRules) { + this.unorderedRules = unorderedRules; + return this; + } + + public Builder voteType(int voteType) { + this.voteType = voteType; + return this; + } + + public AMRRuleSetProcessor build() { + return new AMRRuleSetProcessor(this); + } + } }
