http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/AttributeContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/AttributeContentEvent.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/AttributeContentEvent.java new file mode 100644 index 0000000..9976599 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/AttributeContentEvent.java @@ -0,0 +1,224 @@ +package org.apache.samoa.learners.classifiers.trees; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import org.apache.samoa.core.ContentEvent; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * Attribute Content Event represents the instances that split vertically based on their attribute + * + * @author Arinto Murdopo + * + */ +public final class AttributeContentEvent implements ContentEvent { + + private static final long serialVersionUID = 6652815649846676832L; + + private final long learningNodeId; + private final int obsIndex; + private final double attrVal; + private final int classVal; + private final double weight; + private final transient String key; + private final boolean isNominal; + + public AttributeContentEvent() { + learningNodeId = -1; + obsIndex = -1; + attrVal = 0.0; + classVal = -1; + weight = 0.0; + key = ""; + isNominal = true; + } + + private AttributeContentEvent(Builder builder) { + this.learningNodeId = builder.learningNodeId; + this.obsIndex = builder.obsIndex; + this.attrVal = builder.attrVal; + this.classVal = builder.classVal; + this.weight = builder.weight; + this.isNominal = builder.isNominal; + this.key = builder.key; + } + + @Override + public String getKey() { + return this.key; + } + + @Override + public void setKey(String str) { + // do nothing, maybe useful when we want to reuse the object for + // serialization/deserialization purpose + } + + @Override + public boolean isLastEvent() { + return false; + } + + long getLearningNodeId() { + return this.learningNodeId; + } + + int getObsIndex() { + return this.obsIndex; + } + + int getClassVal() { + return this.classVal; + } + + double getAttrVal() { + return this.attrVal; + } + + double getWeight() { + return this.weight; + } + + boolean isNominal() { + return this.isNominal; + } + + static final class Builder { + + // required parameters + private final long learningNodeId; + private final int obsIndex; + private final String 
key; + + // optional parameters + private double attrVal = 0.0; + private int classVal = 0; + private double weight = 0.0; + private boolean isNominal = false; + + Builder(long id, int obsIndex, String key) { + this.learningNodeId = id; + this.obsIndex = obsIndex; + this.key = key; + } + + private Builder(long id, int obsIndex) { + this.learningNodeId = id; + this.obsIndex = obsIndex; + this.key = ""; + } + + Builder attrValue(double val) { + this.attrVal = val; + return this; + } + + Builder classValue(int val) { + this.classVal = val; + return this; + } + + Builder weight(double val) { + this.weight = val; + return this; + } + + Builder isNominal(boolean val) { + this.isNominal = val; + return this; + } + + AttributeContentEvent build() { + return new AttributeContentEvent(this); + } + } + + /** + * The Kryo serializer class for AttributeContentEvent when executing on top of Storm. This class allow us to change + * the precision of the statistics. + * + * @author Arinto Murdopo + * + */ + public static final class AttributeCESerializer extends Serializer<AttributeContentEvent> { + + private static double PRECISION = 1000000.0; + + @Override + public void write(Kryo kryo, Output output, AttributeContentEvent event) { + output.writeLong(event.learningNodeId, true); + output.writeInt(event.obsIndex, true); + output.writeDouble(event.attrVal, PRECISION, true); + output.writeInt(event.classVal, true); + output.writeDouble(event.weight, PRECISION, true); + output.writeBoolean(event.isNominal); + } + + @Override + public AttributeContentEvent read(Kryo kryo, Input input, + Class<AttributeContentEvent> type) { + AttributeContentEvent ace = new AttributeContentEvent.Builder(input.readLong(true), input.readInt(true)) + .attrValue(input.readDouble(PRECISION, true)) + .classValue(input.readInt(true)) + .weight(input.readDouble(PRECISION, true)) + .isNominal(input.readBoolean()) + .build(); + return ace; + } + } + + /** + * The Kryo serializer class for AttributeContentEvent 
when executing on top of Storm with full precision of the + * statistics. + * + * @author Arinto Murdopo + * + */ + public static final class AttributeCEFullPrecSerializer extends Serializer<AttributeContentEvent> { + + @Override + public void write(Kryo kryo, Output output, AttributeContentEvent event) { + output.writeLong(event.learningNodeId, true); + output.writeInt(event.obsIndex, true); + output.writeDouble(event.attrVal); + output.writeInt(event.classVal, true); + output.writeDouble(event.weight); + output.writeBoolean(event.isNominal); + } + + @Override + public AttributeContentEvent read(Kryo kryo, Input input, + Class<AttributeContentEvent> type) { + AttributeContentEvent ace = new AttributeContentEvent.Builder(input.readLong(true), input.readInt(true)) + .attrValue(input.readDouble()) + .classValue(input.readInt(true)) + .weight(input.readDouble()) + .isNominal(input.readBoolean()) + .build(); + return ace; + } + + } +}
// File: samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ComputeContentEvent.java
package org.apache.samoa.learners.classifiers.trees;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

/**
 * Compute content event is the message that is sent by Model Aggregator Processor to request Local Statistic PI to
 * start the local statistic calculation for splitting.
 *
 * @author Arinto Murdopo
 *
 */
public final class ComputeContentEvent extends ControlContentEvent {

  private static final long serialVersionUID = 5590798490073395190L;

  private final double[] preSplitDist;
  private final long splitId;

  /**
   * Creates a placeholder event with ids of -1 and a {@code null} pre-split distribution.
   */
  public ComputeContentEvent() {
    super(-1);
    preSplitDist = null;
    splitId = -1;
  }

  /**
   * @param splitId
   *          The id of the split this request belongs to
   * @param id
   *          The id of the learning node
   * @param preSplitDist
   *          The pre-split distribution; the array is stored by reference, it is not copied
   */
  ComputeContentEvent(long splitId, long id, double[] preSplitDist) {
    super(id);
    this.preSplitDist = preSplitDist;
    this.splitId = splitId;
  }

  @Override
  LocStatControl getType() {
    return LocStatControl.COMPUTE;
  }

  /**
   * @return The internal pre-split distribution array (not a copy)
   */
  double[] getPreSplitDist() {
    return this.preSplitDist;
  }

  long getSplitId() {
    return this.splitId;
  }

  /**
   * The Kryo serializer class for ComputeContentEvent when executing on top of Storm. This class allows us to change
   * the precision of the statistics.
   * <p>
   * Wire order: split id, learning node id, array length, then each array element.
   *
   * @author Arinto Murdopo
   *
   */
  public static final class ComputeCESerializer extends Serializer<ComputeContentEvent> {

    // Precision factor handed to Kryo; a constant, hence final.
    private static final double PRECISION = 1000000.0;

    @Override
    public void write(Kryo kryo, Output output, ComputeContentEvent object) {
      output.writeLong(object.splitId, true);
      output.writeLong(object.learningNodeId, true);

      // NOTE(review): an event built with the no-arg constructor has a null preSplitDist and
      // would fail here with a NullPointerException.
      output.writeInt(object.preSplitDist.length, true);
      for (double value : object.preSplitDist) {
        output.writeDouble(value, PRECISION, true);
      }
    }

    @Override
    public ComputeContentEvent read(Kryo kryo, Input input,
        Class<ComputeContentEvent> type) {
      long splitId = input.readLong(true);
      long learningNodeId = input.readLong(true);

      int dataLength = input.readInt(true);
      double[] preSplitDist = new double[dataLength];

      for (int i = 0; i < dataLength; i++) {
        preSplitDist[i] = input.readDouble(PRECISION, true);
      }

      return new ComputeContentEvent(splitId, learningNodeId, preSplitDist);
    }
  }

  /**
   * The Kryo serializer class for ComputeContentEvent when executing on top of Storm with full precision of the
   * statistics.
   * <p>
   * Wire order: split id, learning node id, array length, then each array element as a plain double.
   *
   * @author Arinto Murdopo
   *
   */
  public static final class ComputeCEFullPrecSerializer extends Serializer<ComputeContentEvent> {

    @Override
    public void write(Kryo kryo, Output output, ComputeContentEvent object) {
      output.writeLong(object.splitId, true);
      output.writeLong(object.learningNodeId, true);

      // NOTE(review): an event built with the no-arg constructor has a null preSplitDist and
      // would fail here with a NullPointerException.
      output.writeInt(object.preSplitDist.length, true);
      for (double value : object.preSplitDist) {
        output.writeDouble(value);
      }
    }

    @Override
    public ComputeContentEvent read(Kryo kryo, Input input,
        Class<ComputeContentEvent> type) {
      long splitId = input.readLong(true);
      long learningNodeId = input.readLong(true);

      int dataLength = input.readInt(true);
      double[] preSplitDist = new double[dataLength];

      for (int i = 0; i < dataLength; i++) {
        preSplitDist[i] = input.readDouble();
      }

      return new ComputeContentEvent(splitId, learningNodeId, preSplitDist);
    }

  }

}

// File: samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ControlContentEvent.java
package org.apache.samoa.learners.classifiers.trees;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import org.apache.samoa.core.ContentEvent;

/**
 * Abstract class to represent ContentEvent to control Local Statistic Processor.
 *
 * @author Arinto Murdopo
 *
 */
abstract class ControlContentEvent implements ContentEvent {

  private static final long serialVersionUID = 5837375639629708363L;

  protected final long learningNodeId;

  /**
   * Creates a placeholder event whose learning node id is -1.
   */
  public ControlContentEvent() {
    this.learningNodeId = -1;
  }

  ControlContentEvent(long id) {
    this.learningNodeId = id;
  }

  /**
   * @return always {@code null}; control events carry no key
   */
  @Override
  public final String getKey() {
    return null;
  }

  @Override
  public void setKey(String str) {
    // Do nothing
  }

  /**
   * @return always {@code false}
   */
  @Override
  public boolean isLastEvent() {
    return false;
  }

  final long getLearningNodeId() {
    return this.learningNodeId;
  }

  /**
   * @return The kind of control request this event represents
   */
  abstract LocStatControl getType();

  /**
   * Kinds of control request. (Nested enums are implicitly static, so no modifier is needed.)
   */
  enum LocStatControl {
    COMPUTE, DELETE
  }
}

// File: samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/DeleteContentEvent.java
package org.apache.samoa.learners.classifiers.trees;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

/**
 * Delete Content Event is the content event that is sent by Model Aggregator Processor to delete unnecessary statistic
 * in Local Statistic Processor.
 *
 * @author Arinto Murdopo
 *
 */
final class DeleteContentEvent extends ControlContentEvent {

  private static final long serialVersionUID = -2105250722560863633L;

  /**
   * Creates a placeholder event whose learning node id is -1.
   */
  public DeleteContentEvent() {
    super(-1);
  }

  DeleteContentEvent(long id) {
    super(id);
  }

  @Override
  LocStatControl getType() {
    return LocStatControl.DELETE;
  }

}

// File: samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FilterProcessor.java
package org.apache.samoa.learners.classifiers.trees;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import org.apache.samoa.core.ContentEvent;
import org.apache.samoa.core.Processor;
import org.apache.samoa.instances.Instance;
import org.apache.samoa.instances.Instances;
import org.apache.samoa.instances.InstancesHeader;
import org.apache.samoa.learners.InstanceContentEvent;
import org.apache.samoa.learners.InstancesContentEvent;
import org.apache.samoa.learners.ResultContentEvent;
import org.apache.samoa.topology.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.List;

/**
 * Filter Processor that stores and filters the instances before sending them to the Model Aggregator Processor.
 * <p>
 * Incoming {@link InstanceContentEvent}s are buffered; once {@code batchSize} events are waiting, or an event flagged
 * as the last one arrives, the buffered instances are emitted on the output stream as a single
 * {@link InstancesContentEvent}.
 *
 * @author Arinto Murdopo
 *
 */
final class FilterProcessor implements Processor {

  private static final long serialVersionUID = -1685875718300564885L;
  private static final Logger logger = LoggerFactory.getLogger(FilterProcessor.class);

  private int processorId;

  private final Instances dataset;
  private InstancesHeader modelContext;

  // available streams
  private Stream outputStream;

  // number of buffered events since the last flush
  private int waitingInstances = 0;

  // pause in milliseconds after each emitted batch; no pause when <= 0
  private final int delay;

  // number of buffered events that triggers a flush
  private final int batchSize;

  private final List<InstanceContentEvent> contentEventList = new LinkedList<InstanceContentEvent>();

  // private constructor based on Builder pattern
  private FilterProcessor(Builder builder) {
    this.dataset = builder.dataset;
    this.batchSize = builder.batchSize;
    this.delay = builder.delay;
  }

  /**
   * Buffers an incoming {@link InstanceContentEvent} and flushes the buffer when the batch is full or the event is
   * the last one. Events of any other type are ignored.
   *
   * @param event
   *          The incoming event
   * @return always {@code false}
   */
  @Override
  public boolean process(ContentEvent event) {
    if (!(event instanceof InstanceContentEvent)) {
      return false;
    }

    // Receive a new instance from source
    InstanceContentEvent instanceContentEvent = (InstanceContentEvent) event;
    this.contentEventList.add(instanceContentEvent);
    this.waitingInstances++;

    // ">=" rather than "==": with a batch size of zero or below an equality test would never
    // match, and the buffer would grow until the last event arrived.
    if (this.waitingInstances >= this.batchSize || instanceContentEvent.isLastEvent()) {
      sendBatch(instanceContentEvent);
    }
    return false;
  }

  /**
   * Drains the buffer into one {@link InstancesContentEvent}, emits it on the output stream and then sleeps for
   * {@code delay} milliseconds if a positive delay is configured.
   *
   * @param trigger
   *          The event that caused the flush; it is passed to the {@link InstancesContentEvent} constructor
   */
  private void sendBatch(InstanceContentEvent trigger) {
    InstancesContentEvent outputEvent = new InstancesContentEvent(trigger);
    boolean isLastEvent = false;
    while (!this.contentEventList.isEmpty()) {
      InstanceContentEvent ice = this.contentEventList.remove(0);
      Instance inst = ice.getInstance();
      outputEvent.add(inst);
      // the batch is "last" as soon as any buffered event was
      isLastEvent = isLastEvent || ice.isLastEvent();
    }
    outputEvent.setLast(isLastEvent);
    this.waitingInstances = 0;
    this.outputStream.put(outputEvent);

    if (this.delay > 0) {
      try {
        Thread.sleep(this.delay);
      } catch (InterruptedException ex) {
        // restore the interrupt flag so that callers can observe the interruption
        Thread.currentThread().interrupt();
      }
    }
  }

  @Override
  public void onCreate(int id) {
    this.processorId = id;
    this.waitingInstances = 0;
  }

  /**
   * Creates a new processor with the same dataset, delay and batch size as {@code p}, sharing its output stream.
   *
   * @param p
   *          The processor to copy; must be a {@link FilterProcessor}
   * @return The new processor
   */
  @Override
  public Processor newProcessor(Processor p) {
    FilterProcessor oldProcessor = (FilterProcessor) p;
    FilterProcessor newProcessor =
        new FilterProcessor.Builder(oldProcessor).build();

    newProcessor.setOutputStream(oldProcessor.outputStream);
    return newProcessor;
  }

  @Override
  public String toString() {
    return super.toString();
  }

  void setOutputStream(Stream outputStream) {
    this.outputStream = outputStream;
  }

  /**
   * Helper method to generate new ResultContentEvent based on an instance and its prediction result. It is not called
   * from within this class.
   *
   * @param prediction
   *          The predicted class label from the decision tree model.
   * @param inEvent
   *          The associated instance content event
   * @return ResultContentEvent to be sent into Evaluator PI or other destination PI.
   */
  private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent) {
    ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
        inEvent.getClassId(), prediction, inEvent.isLastEvent());
    rce.setClassifierIndex(this.processorId);
    rce.setEvaluationIndex(inEvent.getEvaluationIndex());
    return rce;
  }

  /**
   * Builder class to replace constructors with many parameters
   *
   * @author Arinto Murdopo
   *
   */
  static class Builder {

    // required parameters
    private final Instances dataset;

    // optional parameters
    private int delay = 0;

    private int batchSize = 200;

    Builder(Instances dataset) {
      this.dataset = dataset;
    }

    Builder(FilterProcessor oldProcessor) {
      this.dataset = oldProcessor.dataset;
      this.delay = oldProcessor.delay;
      this.batchSize = oldProcessor.batchSize;
    }

    public Builder delay(int delay) {
      this.delay = delay;
      return this;
    }

    public Builder batchSize(int val) {
      this.batchSize = val;
      return this;
    }

    FilterProcessor build() {
      return new FilterProcessor(this);
    }
  }

}
// File: samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FoundNode.java
package org.apache.samoa.learners.classifiers.trees;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

/**
 * Immutable holder describing where an instance ended up after being routed/filtered through the decision tree
 * model: the node itself, its parent split node, and the branch index of the node within that parent.
 *
 * @author Arinto Murdopo
 *
 */
final class FoundNode implements java.io.Serializable {

  private static final long serialVersionUID = -637695387934143293L;

  private final Node node;
  private final SplitNode parent;
  private final int parentBranch;

  /**
   * @param node
   *          The node the instance was routed to
   * @param splitNode
   *          The parent of that node
   * @param parentBranch
   *          The index of the node in its parent
   */
  FoundNode(Node node, SplitNode splitNode, int parentBranch) {
    this.node = node;
    this.parent = splitNode;
    this.parentBranch = parentBranch;
  }

  /**
   * Returns the node the instance was routed/filtered to, for testing and training.
   *
   * @return The node where the instance is routed/filtered
   */
  Node getNode() {
    return node;
  }

  /**
   * Returns the parent of the node the instance was routed/filtered to.
   *
   * @return The parent of the node
   */
  SplitNode getParent() {
    return parent;
  }

  /**
   * Returns the position, within its parent, of the node the instance was routed/filtered to.
   *
   * @return The index of the node in its parent node.
   */
  int getParentBranch() {
    return parentBranch;
  }

}

// File: samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/InactiveLearningNode.java
package org.apache.samoa.learners.classifiers.trees;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import org.apache.samoa.instances.Instance;

/**
 * Learning node that is inactive: it only keeps track of the observed class distribution and does not store the
 * statistic for splitting the node.
 *
 * @author Arinto Murdopo
 *
 */
final class InactiveLearningNode extends LearningNode {

  private static final long serialVersionUID = -814552382883472302L;

  InactiveLearningNode(double[] initialClassObservation) {
    super(initialClassObservation);
  }

  /**
   * Adds the weight of the instance to the observed count of its class.
   */
  @Override
  void learnFromInstance(Instance inst, ModelAggregatorProcessor proc) {
    int classIndex = (int) inst.classValue();
    double instanceWeight = inst.weight();
    this.observedClassDistribution.addToValue(classIndex, instanceWeight);
  }

  /**
   * @return A copy of the observed class distribution
   */
  @Override
  double[] getClassVotes(Instance inst, ModelAggregatorProcessor map) {
    return this.observedClassDistribution.getArrayCopy();
  }

}

// File: samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LearningNode.java
package org.apache.samoa.learners.classifiers.trees;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import org.apache.samoa.instances.Instance;

/**
 * Base class of the learning nodes. A learning node always reports itself as a leaf, and routing an instance to a
 * leaf ends at the learning node itself.
 *
 * @author Arinto Murdopo
 *
 */
abstract class LearningNode extends Node {

  private static final long serialVersionUID = 7157319356146764960L;

  protected LearningNode(double[] classObservation) {
    super(classObservation);
  }

  /**
   * Method to process the instance for learning
   *
   * @param inst
   *          The processed instance
   * @param proc
   *          The model aggregator processor where this learning node exists
   */
  abstract void learnFromInstance(Instance inst, ModelAggregatorProcessor proc);

  /**
   * @return always {@code true}
   */
  @Override
  protected boolean isLeaf() {
    return true;
  }

  /**
   * @return A {@link FoundNode} that wraps this node together with the given parent and branch index
   */
  @Override
  protected FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent,
      int parentBranch) {
    FoundNode self = new FoundNode(this, parent, parentBranch);
    return self;
  }
}

// File: samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalResultContentEvent.java
package org.apache.samoa.learners.classifiers.trees;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import org.apache.samoa.core.ContentEvent;
import org.apache.samoa.moa.classifiers.core.AttributeSplitSuggestion;

/**
 * Local Result Content Event is the content event that represents local calculation of statistic in Local Statistic
 * Processor. It carries the best and second best split suggestions together with the id of the split they belong
 * to.
 *
 * @author Arinto Murdopo
 *
 */
final class LocalResultContentEvent implements ContentEvent {

  private static final long serialVersionUID = -4206620993777418571L;

  private final AttributeSplitSuggestion bestSuggestion;
  private final AttributeSplitSuggestion secondBestSuggestion;
  private final long splitId;

  /**
   * Creates a placeholder event with no suggestions and a split id of -1.
   */
  public LocalResultContentEvent() {
    bestSuggestion = null;
    secondBestSuggestion = null;
    splitId = -1;
  }

  LocalResultContentEvent(long splitId, AttributeSplitSuggestion best, AttributeSplitSuggestion secondBest) {
    this.splitId = splitId;
    this.bestSuggestion = best;
    this.secondBestSuggestion = secondBest;
  }

  /**
   * Method to return the best attribute split suggestion from this local statistic calculation.
   *
   * @return The best attribute split suggestion.
   */
  AttributeSplitSuggestion getBestSuggestion() {
    return bestSuggestion;
  }

  /**
   * Method to return the second best attribute split suggestion from this local statistic calculation.
   *
   * @return The second best attribute split suggestion.
   */
  AttributeSplitSuggestion getSecondBestSuggestion() {
    return secondBestSuggestion;
  }

  /**
   * Method to get the split ID of this local statistic calculation result
   *
   * @return The split id of this local calculation result
   */
  long getSplitId() {
    return splitId;
  }

  /**
   * @return always {@code null}
   */
  @Override
  public String getKey() {
    return null;
  }

  @Override
  public void setKey(String str) {
    // do nothing
  }

  /**
   * @return always {@code false}
   */
  @Override
  public boolean isLastEvent() {
    return false;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java new file mode 100644 index 0000000..7ce46ec --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java @@ -0,0 +1,242 @@ +package org.apache.samoa.learners.classifiers.trees; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Vector; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; +import org.apache.samoa.moa.classifiers.core.AttributeSplitSuggestion; +import org.apache.samoa.moa.classifiers.core.attributeclassobservers.AttributeClassObserver; +import org.apache.samoa.moa.classifiers.core.attributeclassobservers.GaussianNumericAttributeClassObserver; +import org.apache.samoa.moa.classifiers.core.attributeclassobservers.NominalAttributeClassObserver; +import org.apache.samoa.moa.classifiers.core.splitcriteria.InfoGainSplitCriterion; +import org.apache.samoa.moa.classifiers.core.splitcriteria.SplitCriterion; +import org.apache.samoa.topology.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; + +/** + * Local Statistic Processor contains the local statistic of a subset of the attributes. 
+ * + * @author Arinto Murdopo + * + */ +final class LocalStatisticsProcessor implements Processor { + + /** + * + */ + private static final long serialVersionUID = -3967695130634517631L; + private static Logger logger = LoggerFactory.getLogger(LocalStatisticsProcessor.class); + + // Collection of AttributeObservers, for each ActiveLearningNode and + // AttributeId + private Table<Long, Integer, AttributeClassObserver> localStats; + + private Stream computationResultStream; + + private final SplitCriterion splitCriterion; + private final boolean binarySplit; + private final AttributeClassObserver nominalClassObserver; + private final AttributeClassObserver numericClassObserver; + + // the two observer classes below are also needed to be setup from the Tree + private LocalStatisticsProcessor(Builder builder) { + this.splitCriterion = builder.splitCriterion; + this.binarySplit = builder.binarySplit; + this.nominalClassObserver = builder.nominalClassObserver; + this.numericClassObserver = builder.numericClassObserver; + } + + @Override + public boolean process(ContentEvent event) { + // process AttributeContentEvent by updating the subset of local statistics + if (event instanceof AttributeBatchContentEvent) { + AttributeBatchContentEvent abce = (AttributeBatchContentEvent) event; + List<ContentEvent> contentEventList = abce.getContentEventList(); + for (ContentEvent contentEvent : contentEventList) { + AttributeContentEvent ace = (AttributeContentEvent) contentEvent; + Long learningNodeId = ace.getLearningNodeId(); + Integer obsIndex = ace.getObsIndex(); + + AttributeClassObserver obs = localStats.get( + learningNodeId, obsIndex); + + if (obs == null) { + obs = ace.isNominal() ? 
newNominalClassObserver() + : newNumericClassObserver(); + localStats.put(ace.getLearningNodeId(), obsIndex, obs); + } + obs.observeAttributeClass(ace.getAttrVal(), ace.getClassVal(), + ace.getWeight()); + } + + /* + * if (event instanceof AttributeContentEvent) { AttributeContentEvent ace + * = (AttributeContentEvent) event; Long learningNodeId = + * Long.valueOf(ace.getLearningNodeId()); Integer obsIndex = + * Integer.valueOf(ace.getObsIndex()); + * + * AttributeClassObserver obs = localStats.get( learningNodeId, obsIndex); + * + * if (obs == null) { obs = ace.isNominal() ? newNominalClassObserver() : + * newNumericClassObserver(); localStats.put(ace.getLearningNodeId(), + * obsIndex, obs); } obs.observeAttributeClass(ace.getAttrVal(), + * ace.getClassVal(), ace.getWeight()); + */ + } else if (event instanceof ComputeContentEvent) { + // process ComputeContentEvent by calculating the local statistic + // and send back the calculation results via computation result stream. + ComputeContentEvent cce = (ComputeContentEvent) event; + Long learningNodeId = cce.getLearningNodeId(); + double[] preSplitDist = cce.getPreSplitDist(); + + Map<Integer, AttributeClassObserver> learningNodeRowMap = localStats + .row(learningNodeId); + List<AttributeSplitSuggestion> suggestions = new Vector<>(); + + for (Entry<Integer, AttributeClassObserver> entry : learningNodeRowMap.entrySet()) { + AttributeClassObserver obs = entry.getValue(); + AttributeSplitSuggestion suggestion = obs + .getBestEvaluatedSplitSuggestion(splitCriterion, + preSplitDist, entry.getKey(), binarySplit); + if (suggestion != null) { + suggestions.add(suggestion); + } + } + + AttributeSplitSuggestion[] bestSuggestions = suggestions + .toArray(new AttributeSplitSuggestion[suggestions.size()]); + + Arrays.sort(bestSuggestions); + + AttributeSplitSuggestion bestSuggestion = null; + AttributeSplitSuggestion secondBestSuggestion = null; + + if (bestSuggestions.length >= 1) { + bestSuggestion = 
bestSuggestions[bestSuggestions.length - 1]; + + if (bestSuggestions.length >= 2) { + secondBestSuggestion = bestSuggestions[bestSuggestions.length - 2]; + } + } + + // create the local result content event + LocalResultContentEvent lcre = + new LocalResultContentEvent(cce.getSplitId(), bestSuggestion, secondBestSuggestion); + computationResultStream.put(lcre); + logger.debug("Finish compute event"); + } else if (event instanceof DeleteContentEvent) { + DeleteContentEvent dce = (DeleteContentEvent) event; + Long learningNodeId = dce.getLearningNodeId(); + localStats.rowMap().remove(learningNodeId); + } + return false; + } + + @Override + public void onCreate(int id) { + this.localStats = HashBasedTable.create(); + } + + @Override + public Processor newProcessor(Processor p) { + LocalStatisticsProcessor oldProcessor = (LocalStatisticsProcessor) p; + LocalStatisticsProcessor newProcessor = new LocalStatisticsProcessor.Builder(oldProcessor).build(); + + newProcessor.setComputationResultStream(oldProcessor.computationResultStream); + + return newProcessor; + } + + /** + * Method to set the computation result when using this processor to build a topology. 
+ * + * @param computeStream + */ + void setComputationResultStream(Stream computeStream) { + this.computationResultStream = computeStream; + } + + private AttributeClassObserver newNominalClassObserver() { + return (AttributeClassObserver) this.nominalClassObserver.copy(); + } + + private AttributeClassObserver newNumericClassObserver() { + return (AttributeClassObserver) this.numericClassObserver.copy(); + } + + /** + * Builder class to replace constructors with many parameters + * + * @author Arinto Murdopo + * + */ + static class Builder { + + private SplitCriterion splitCriterion = new InfoGainSplitCriterion(); + private boolean binarySplit = false; + private AttributeClassObserver nominalClassObserver = new NominalAttributeClassObserver(); + private AttributeClassObserver numericClassObserver = new GaussianNumericAttributeClassObserver(); + + Builder() { + + } + + Builder(LocalStatisticsProcessor oldProcessor) { + this.splitCriterion = oldProcessor.splitCriterion; + this.binarySplit = oldProcessor.binarySplit; + } + + Builder splitCriterion(SplitCriterion splitCriterion) { + this.splitCriterion = splitCriterion; + return this; + } + + Builder binarySplit(boolean binarySplit) { + this.binarySplit = binarySplit; + return this; + } + + Builder nominalClassObserver(AttributeClassObserver nominalClassObserver) { + this.nominalClassObserver = nominalClassObserver; + return this; + } + + Builder numericClassObserver(AttributeClassObserver numericClassObserver) { + this.numericClassObserver = numericClassObserver; + return this; + } + + LocalStatisticsProcessor build() { + return new LocalStatisticsProcessor(this); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java ---------------------------------------------------------------------- diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java new file mode 100644 index 0000000..1e79f48 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java @@ -0,0 +1,746 @@ +package org.apache.samoa.learners.classifiers.trees; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; +import org.apache.samoa.instances.Instance; +import org.apache.samoa.instances.Instances; +import org.apache.samoa.instances.InstancesHeader; +import org.apache.samoa.learners.InstanceContentEvent; +import org.apache.samoa.learners.InstancesContentEvent; +import org.apache.samoa.learners.ResultContentEvent; +import org.apache.samoa.moa.classifiers.core.AttributeSplitSuggestion; +import org.apache.samoa.moa.classifiers.core.driftdetection.ChangeDetector; +import org.apache.samoa.moa.classifiers.core.splitcriteria.InfoGainSplitCriterion; +import org.apache.samoa.moa.classifiers.core.splitcriteria.SplitCriterion; +import org.apache.samoa.topology.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.samoa.moa.core.Utils.maxIndex; + +/** + * Model Aggegator Processor consists of the decision tree model. It connects to local-statistic PI via attribute stream + * and control stream. Model-aggregator PI sends the split instances via attribute stream and it sends control messages + * to ask local-statistic PI to perform computation via control stream. + * + * Model-aggregator PI sends the classification result via result stream to an evaluator PI for classifier or other + * destination PI. The calculation results from local statistic arrive to the model-aggregator PI via computation-result + * stream. 
+ * + * @author Arinto Murdopo + * + */ +final class ModelAggregatorProcessor implements Processor { + + private static final long serialVersionUID = -1685875718300564886L; + private static final Logger logger = LoggerFactory.getLogger(ModelAggregatorProcessor.class); + + private int processorId; + + private Node treeRoot; + + private int activeLeafNodeCount; + private int inactiveLeafNodeCount; + private int decisionNodeCount; + private boolean growthAllowed; + + private final Instances dataset; + + // to support concurrent split + private long splitId; + private ConcurrentMap<Long, SplittingNodeInfo> splittingNodes; + private BlockingQueue<Long> timedOutSplittingNodes; + + // available streams + private Stream resultStream; + private Stream attributeStream; + private Stream controlStream; + + private transient ScheduledExecutorService executor; + + private final SplitCriterion splitCriterion; + private final double splitConfidence; + private final double tieThreshold; + private final int gracePeriod; + private final int parallelismHint; + private final long timeOut; + + // private constructor based on Builder pattern + private ModelAggregatorProcessor(Builder builder) { + this.dataset = builder.dataset; + this.splitCriterion = builder.splitCriterion; + this.splitConfidence = builder.splitConfidence; + this.tieThreshold = builder.tieThreshold; + this.gracePeriod = builder.gracePeriod; + this.parallelismHint = builder.parallelismHint; + this.timeOut = builder.timeOut; + this.changeDetector = builder.changeDetector; + + InstancesHeader ih = new InstancesHeader(dataset); + this.setModelContext(ih); + } + + @Override + public boolean process(ContentEvent event) { + + // Poll the blocking queue shared between ModelAggregator and the time-out + // threads + Long timedOutSplitId = timedOutSplittingNodes.poll(); + if (timedOutSplitId != null) { // time out has been reached! 
+ SplittingNodeInfo splittingNode = splittingNodes.get(timedOutSplitId); + if (splittingNode != null) { + this.splittingNodes.remove(timedOutSplitId); + this.continueAttemptToSplit(splittingNode.activeLearningNode, + splittingNode.foundNode); + + } + + } + + // Receive a new instance from source + if (event instanceof InstancesContentEvent) { + InstancesContentEvent instancesEvent = (InstancesContentEvent) event; + this.processInstanceContentEvent(instancesEvent); + // Send information to local-statistic PI + // for each of the nodes + if (this.foundNodeSet != null) { + for (FoundNode foundNode : this.foundNodeSet) { + ActiveLearningNode leafNode = (ActiveLearningNode) foundNode.getNode(); + AttributeBatchContentEvent[] abce = leafNode.getAttributeBatchContentEvent(); + if (abce != null) { + for (int i = 0; i < this.dataset.numAttributes() - 1; i++) { + this.sendToAttributeStream(abce[i]); + } + } + leafNode.setAttributeBatchContentEvent(null); + // this.sendToControlStream(event); //split information + // See if we can ask for splits + if (!leafNode.isSplitting()) { + double weightSeen = leafNode.getWeightSeen(); + // check whether it is the time for splitting + if (weightSeen - leafNode.getWeightSeenAtLastSplitEvaluation() >= this.gracePeriod) { + attemptToSplit(leafNode, foundNode); + } + } + } + } + this.foundNodeSet = null; + } else if (event instanceof LocalResultContentEvent) { + LocalResultContentEvent lrce = (LocalResultContentEvent) event; + Long lrceSplitId = lrce.getSplitId(); + SplittingNodeInfo splittingNodeInfo = splittingNodes.get(lrceSplitId); + + if (splittingNodeInfo != null) { // if null, that means + // activeLearningNode has been + // removed by timeout thread + ActiveLearningNode activeLearningNode = splittingNodeInfo.activeLearningNode; + + activeLearningNode.addDistributedSuggestions( + lrce.getBestSuggestion(), + lrce.getSecondBestSuggestion()); + + if (activeLearningNode.isAllSuggestionsCollected()) { + 
splittingNodeInfo.scheduledFuture.cancel(false); + this.splittingNodes.remove(lrceSplitId); + this.continueAttemptToSplit(activeLearningNode, + splittingNodeInfo.foundNode); + } + } + } + return false; + } + + protected Set<FoundNode> foundNodeSet; + + @Override + public void onCreate(int id) { + this.processorId = id; + + this.activeLeafNodeCount = 0; + this.inactiveLeafNodeCount = 0; + this.decisionNodeCount = 0; + this.growthAllowed = true; + + this.splittingNodes = new ConcurrentHashMap<>(); + this.timedOutSplittingNodes = new LinkedBlockingQueue<>(); + this.splitId = 0; + + // Executor for scheduling time-out threads + this.executor = Executors.newScheduledThreadPool(8); + } + + @Override + public Processor newProcessor(Processor p) { + ModelAggregatorProcessor oldProcessor = (ModelAggregatorProcessor) p; + ModelAggregatorProcessor newProcessor = + new ModelAggregatorProcessor.Builder(oldProcessor).build(); + + newProcessor.setResultStream(oldProcessor.resultStream); + newProcessor.setAttributeStream(oldProcessor.attributeStream); + newProcessor.setControlStream(oldProcessor.controlStream); + return newProcessor; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(super.toString()); + + sb.append("ActiveLeafNodeCount: ").append(activeLeafNodeCount); + sb.append("InactiveLeafNodeCount: ").append(inactiveLeafNodeCount); + sb.append("DecisionNodeCount: ").append(decisionNodeCount); + sb.append("Growth allowed: ").append(growthAllowed); + return sb.toString(); + } + + void setResultStream(Stream resultStream) { + this.resultStream = resultStream; + } + + void setAttributeStream(Stream attributeStream) { + this.attributeStream = attributeStream; + } + + void setControlStream(Stream controlStream) { + this.controlStream = controlStream; + } + + void sendToAttributeStream(ContentEvent event) { + this.attributeStream.put(event); + } + + void sendToControlStream(ContentEvent event) { + this.controlStream.put(event); + } 
+ + /** + * Helper method to generate new ResultContentEvent based on an instance and its prediction result. + * + * @param prediction + * The predicted class label from the decision tree model. + * @param inEvent + * The associated instance content event + * @return ResultContentEvent to be sent into Evaluator PI or other destination PI. + */ + private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent) { + ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(), + inEvent.getClassId(), prediction, inEvent.isLastEvent()); + rce.setClassifierIndex(this.processorId); + rce.setEvaluationIndex(inEvent.getEvaluationIndex()); + return rce; + } + + private ResultContentEvent newResultContentEvent(double[] prediction, Instance inst, InstancesContentEvent inEvent) { + ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inst, (int) inst.classValue(), + prediction, inEvent.isLastEvent()); + rce.setClassifierIndex(this.processorId); + rce.setEvaluationIndex(inEvent.getEvaluationIndex()); + return rce; + } + + private List<InstancesContentEvent> contentEventList = new LinkedList<>(); + + /** + * Helper method to process the InstanceContentEvent + * + * @param instContentEvent + */ + private void processInstanceContentEvent(InstancesContentEvent instContentEvent) { + this.numBatches++; + this.contentEventList.add(instContentEvent); + if (this.numBatches == 1 || this.numBatches > 4) { + this.processInstances(this.contentEventList.remove(0)); + } + + if (instContentEvent.isLastEvent()) { + // drain remaining instances + while (!contentEventList.isEmpty()) { + processInstances(contentEventList.remove(0)); + } + } + + } + + private int numBatches = 0; + + private void processInstances(InstancesContentEvent instContentEvent) { + + Instance[] instances = instContentEvent.getInstances(); + boolean isTesting = instContentEvent.isTesting(); + boolean isTraining = 
instContentEvent.isTraining(); + for (Instance inst : instances) { + this.processInstance(inst, instContentEvent, isTesting, isTraining); + } + } + + private void processInstance(Instance inst, InstancesContentEvent instContentEvent, boolean isTesting, + boolean isTraining) { + inst.setDataset(this.dataset); + // Check the instance whether it is used for testing or training + // boolean testAndTrain = isTraining; //Train after testing + double[] prediction = null; + if (isTesting) { + prediction = getVotesForInstance(inst, false); + this.resultStream.put(newResultContentEvent(prediction, inst, + instContentEvent)); + } + + if (isTraining) { + trainOnInstanceImpl(inst); + if (this.changeDetector != null) { + if (prediction == null) { + prediction = getVotesForInstance(inst); + } + boolean correctlyClassifies = this.correctlyClassifies(inst, prediction); + double oldEstimation = this.changeDetector.getEstimation(); + this.changeDetector.input(correctlyClassifies ? 0 : 1); + if (this.changeDetector.getEstimation() > oldEstimation) { + // Start a new classifier + logger.info("Change detected, resetting the classifier"); + this.resetLearning(); + this.changeDetector.resetLearning(); + } + } + } + } + + private boolean correctlyClassifies(Instance inst, double[] prediction) { + return maxIndex(prediction) == (int) inst.classValue(); + } + + private void resetLearning() { + this.treeRoot = null; + // Remove nodes + FoundNode[] learningNodes = findNodes(); + for (FoundNode learningNode : learningNodes) { + Node node = learningNode.getNode(); + if (node instanceof SplitNode) { + SplitNode splitNode; + splitNode = (SplitNode) node; + for (int i = 0; i < splitNode.numChildren(); i++) { + splitNode.setChild(i, null); + } + } + } + } + + protected FoundNode[] findNodes() { + List<FoundNode> foundList = new LinkedList<>(); + findNodes(this.treeRoot, null, -1, foundList); + return foundList.toArray(new FoundNode[foundList.size()]); + } + + protected void findNodes(Node node, 
SplitNode parent, + int parentBranch, List<FoundNode> found) { + if (node != null) { + found.add(new FoundNode(node, parent, parentBranch)); + if (node instanceof SplitNode) { + SplitNode splitNode = (SplitNode) node; + for (int i = 0; i < splitNode.numChildren(); i++) { + findNodes(splitNode.getChild(i), splitNode, i, + found); + } + } + } + } + + /** + * Helper method to get the prediction result. The actual prediction result is delegated to the leaf node. + * + * @param inst + * @return + */ + private double[] getVotesForInstance(Instance inst) { + return getVotesForInstance(inst, false); + } + + private double[] getVotesForInstance(Instance inst, boolean isTraining) { + double[] ret; + FoundNode foundNode = null; + if (this.treeRoot != null) { + foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1); + Node leafNode = foundNode.getNode(); + if (leafNode == null) { + leafNode = foundNode.getParent(); + } + + ret = leafNode.getClassVotes(inst, this); + } else { + int numClasses = this.dataset.numClasses(); + ret = new double[numClasses]; + + } + + // Training after testing to speed up the process + if (isTraining) { + if (this.treeRoot == null) { + this.treeRoot = newLearningNode(this.parallelismHint); + this.activeLeafNodeCount = 1; + foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1); + } + trainOnInstanceImpl(foundNode, inst); + } + return ret; + } + + /** + * Helper method that represent training of an instance. Since it is decision tree, this method routes the incoming + * instance into the correct leaf and then update the statistic on the found leaf. 
+ * + * @param inst + */ + private void trainOnInstanceImpl(Instance inst) { + if (this.treeRoot == null) { + this.treeRoot = newLearningNode(this.parallelismHint); + this.activeLeafNodeCount = 1; + + } + FoundNode foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1); + trainOnInstanceImpl(foundNode, inst); + } + + private void trainOnInstanceImpl(FoundNode foundNode, Instance inst) { + + Node leafNode = foundNode.getNode(); + + if (leafNode == null) { + leafNode = newLearningNode(this.parallelismHint); + foundNode.getParent().setChild(foundNode.getParentBranch(), leafNode); + activeLeafNodeCount++; + } + + if (leafNode instanceof LearningNode) { + LearningNode learningNode = (LearningNode) leafNode; + learningNode.learnFromInstance(inst, this); + } + if (this.foundNodeSet == null) { + this.foundNodeSet = new HashSet<>(); + } + this.foundNodeSet.add(foundNode); + } + + /** + * Helper method to represent a split attempt + * + * @param activeLearningNode + * The corresponding active learning node which will be split + * @param foundNode + * The data structure to represents the filtering of the instance using the tree model. 
+ */ + private void attemptToSplit(ActiveLearningNode activeLearningNode, FoundNode foundNode) { + // Increment the split ID + this.splitId++; + + // Schedule time-out thread + ScheduledFuture<?> timeOutHandler = this.executor.schedule(new AggregationTimeOutHandler(this.splitId, + this.timedOutSplittingNodes), + this.timeOut, TimeUnit.SECONDS); + + // Keep track of the splitting node information, so that we can continue the + // split + // once we receive all local statistic calculation from Local Statistic PI + // this.splittingNodes.put(Long.valueOf(this.splitId), new + // SplittingNodeInfo(activeLearningNode, foundNode, null)); + this.splittingNodes.put(this.splitId, new SplittingNodeInfo(activeLearningNode, foundNode, timeOutHandler)); + + // Inform Local Statistic PI to perform local statistic calculation + activeLearningNode.requestDistributedSuggestions(this.splitId, this); + } + + /** + * Helper method to continue the attempt to split once all local calculation results are received. + * + * @param activeLearningNode + * The corresponding active learning node which will be split + * @param foundNode + * The data structure to represents the filtering of the instance using the tree model. 
+ */ + private void continueAttemptToSplit(ActiveLearningNode activeLearningNode, FoundNode foundNode) { + AttributeSplitSuggestion bestSuggestion = activeLearningNode.getDistributedBestSuggestion(); + AttributeSplitSuggestion secondBestSuggestion = activeLearningNode.getDistributedSecondBestSuggestion(); + + // compare with null split + double[] preSplitDist = activeLearningNode.getObservedClassDistribution(); + AttributeSplitSuggestion nullSplit = new AttributeSplitSuggestion(null, + new double[0][], this.splitCriterion.getMeritOfSplit( + preSplitDist, + new double[][] { preSplitDist })); + + if ((bestSuggestion == null) || (nullSplit.compareTo(bestSuggestion) > 0)) { + secondBestSuggestion = bestSuggestion; + bestSuggestion = nullSplit; + } else { + if ((secondBestSuggestion == null) || (nullSplit.compareTo(secondBestSuggestion) > 0)) { + secondBestSuggestion = nullSplit; + } + } + + boolean shouldSplit = false; + + if (secondBestSuggestion == null) { + shouldSplit = (bestSuggestion != null); + } else { + double hoeffdingBound = computeHoeffdingBound( + this.splitCriterion.getRangeOfMerit(activeLearningNode.getObservedClassDistribution()), + this.splitConfidence, + activeLearningNode.getWeightSeen()); + + if ((bestSuggestion.merit - secondBestSuggestion.merit > hoeffdingBound) + || (hoeffdingBound < tieThreshold)) { + shouldSplit = true; + } + // TODO: add poor attributes removal + } + + SplitNode parent = foundNode.getParent(); + int parentBranch = foundNode.getParentBranch(); + + // split if the Hoeffding bound condition is satisfied + if (shouldSplit) { + + if (bestSuggestion.splitTest != null) { + SplitNode newSplit = new SplitNode(bestSuggestion.splitTest, activeLearningNode.getObservedClassDistribution()); + + for (int i = 0; i < bestSuggestion.numSplits(); i++) { + Node newChild = newLearningNode(bestSuggestion.resultingClassDistributionFromSplit(i), this.parallelismHint); + newSplit.setChild(i, newChild); + } + + this.activeLeafNodeCount--; + 
this.decisionNodeCount++; + this.activeLeafNodeCount += bestSuggestion.numSplits(); + + if (parent == null) { + this.treeRoot = newSplit; + } else { + parent.setChild(parentBranch, newSplit); + } + } + // TODO: add check on the model's memory size + } + + // housekeeping + activeLearningNode.endSplitting(); + activeLearningNode.setWeightSeenAtLastSplitEvaluation(activeLearningNode.getWeightSeen()); + } + + /** + * Helper method to deactivate learning node + * + * @param toDeactivate + * Active Learning Node that will be deactivated + * @param parent + * Parent of the soon-to-be-deactivated Active LearningNode + * @param parentBranch + * the branch index of the node in the parent node + */ + private void deactivateLearningNode(ActiveLearningNode toDeactivate, SplitNode parent, int parentBranch) { + Node newLeaf = new InactiveLearningNode(toDeactivate.getObservedClassDistribution()); + if (parent == null) { + this.treeRoot = newLeaf; + } else { + parent.setChild(parentBranch, newLeaf); + } + + this.activeLeafNodeCount--; + this.inactiveLeafNodeCount++; + } + + private LearningNode newLearningNode(int parallelismHint) { + return newLearningNode(new double[0], parallelismHint); + } + + private LearningNode newLearningNode(double[] initialClassObservations, int parallelismHint) { + // for VHT optimization, we need to dynamically instantiate the appropriate + // ActiveLearningNode + return new ActiveLearningNode(initialClassObservations, parallelismHint); + } + + /** + * Helper method to set the model context, i.e. 
how many attributes they are and what is the class index + * + * @param ih + */ + private void setModelContext(InstancesHeader ih) { + // TODO possibly refactored + if ((ih != null) && (ih.classIndex() < 0)) { + throw new IllegalArgumentException( + "Context for a classifier must include a class to learn"); + } + // TODO: check flag for checking whether training has started or not + + // model context is used to describe the model + logger.trace("Model context: {}", ih.toString()); + } + + private static double computeHoeffdingBound(double range, double confidence, double n) { + return Math.sqrt((Math.pow(range, 2.0) * Math.log(1.0 / confidence)) / (2.0 * n)); + } + + /** + * AggregationTimeOutHandler is a class to support time-out feature while waiting for local computation results from + * the local statistic PIs. + * + * @author Arinto Murdopo + * + */ + static class AggregationTimeOutHandler implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(AggregationTimeOutHandler.class); + private final Long splitId; + private final BlockingQueue<Long> toBeSplittedNodes; + + AggregationTimeOutHandler(Long splitId, BlockingQueue<Long> toBeSplittedNodes) { + this.splitId = splitId; + this.toBeSplittedNodes = toBeSplittedNodes; + } + + @Override + public void run() { + logger.debug("Time out is reached. 
AggregationTimeOutHandler is started."); + try { + toBeSplittedNodes.put(splitId); + } catch (InterruptedException e) { + logger.warn("Interrupted while trying to put the ID into the queue"); + } + logger.debug("AggregationTimeOutHandler is finished."); + } + } + + /** + * SplittingNodeInfo is a class to represents the ActiveLearningNode that is splitting + * + * @author Arinto Murdopo + * + */ + static class SplittingNodeInfo { + + private final ActiveLearningNode activeLearningNode; + private final FoundNode foundNode; + private final ScheduledFuture<?> scheduledFuture; + + SplittingNodeInfo(ActiveLearningNode activeLearningNode, FoundNode foundNode, ScheduledFuture<?> scheduledFuture) { + this.activeLearningNode = activeLearningNode; + this.foundNode = foundNode; + this.scheduledFuture = scheduledFuture; + } + } + + protected ChangeDetector changeDetector; + + public ChangeDetector getChangeDetector() { + return this.changeDetector; + } + + public void setChangeDetector(ChangeDetector cd) { + this.changeDetector = cd; + } + + /** + * Builder class to replace constructors with many parameters + * + * @author Arinto Murdopo + * + */ + static class Builder { + + // required parameters + private final Instances dataset; + + // default values + private SplitCriterion splitCriterion = new InfoGainSplitCriterion(); + private double splitConfidence = 0.0000001; + private double tieThreshold = 0.05; + private int gracePeriod = 200; + private int parallelismHint = 1; + private long timeOut = 30; + private ChangeDetector changeDetector = null; + + Builder(Instances dataset) { + this.dataset = dataset; + } + + Builder(ModelAggregatorProcessor oldProcessor) { + this.dataset = oldProcessor.dataset; + this.splitCriterion = oldProcessor.splitCriterion; + this.splitConfidence = oldProcessor.splitConfidence; + this.tieThreshold = oldProcessor.tieThreshold; + this.gracePeriod = oldProcessor.gracePeriod; + this.parallelismHint = oldProcessor.parallelismHint; + this.timeOut = 
oldProcessor.timeOut; + } + + Builder splitCriterion(SplitCriterion splitCriterion) { + this.splitCriterion = splitCriterion; + return this; + } + + Builder splitConfidence(double splitConfidence) { + this.splitConfidence = splitConfidence; + return this; + } + + Builder tieThreshold(double tieThreshold) { + this.tieThreshold = tieThreshold; + return this; + } + + Builder gracePeriod(int gracePeriod) { + this.gracePeriod = gracePeriod; + return this; + } + + Builder parallelismHint(int parallelismHint) { + this.parallelismHint = parallelismHint; + return this; + } + + Builder timeOut(long timeOut) { + this.timeOut = timeOut; + return this; + } + + Builder changeDetector(ChangeDetector changeDetector) { + this.changeDetector = changeDetector; + return this; + } + + ModelAggregatorProcessor build() { + return new ModelAggregatorProcessor(this); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/Node.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/Node.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/Node.java new file mode 100644 index 0000000..898a433 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/Node.java @@ -0,0 +1,103 @@ +package org.apache.samoa.learners.classifiers.trees; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.DoubleVector; +import org.apache.samoa.instances.Instance; + +/** + * Abstract class that represents a node in the tree model. + * + * @author Arinto Murdopo + * + */ +abstract class Node implements java.io.Serializable { + + private static final long serialVersionUID = 4008521239214180548L; + + protected final DoubleVector observedClassDistribution; + + /** + * Method to route/filter an instance into its corresponding leaf. This method will be invoked recursively. + * + * @param inst + * Instance to be routed + * @param parent + * Parent of the current node + * @param parentBranch + * The index of the current node in the parent + * @return FoundNode which is the data structure to represent the resulting leaf. + */ + abstract FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int parentBranch); + + /** + * Method to return the predicted class of the instance based on the statistic inside the node. + * + * @param inst + * To-be-predicted instance + * @param map + * ModelAggregatorProcessor + * @return The prediction result in the form of class distribution + */ + abstract double[] getClassVotes(Instance inst, ModelAggregatorProcessor map); + + /** + * Method to check whether the node is a leaf node or not. + * + * @return Boolean flag to indicate whether the node is a leaf or not + */ + abstract boolean isLeaf(); + + /** + * Constructor of the tree node + * + * @param classObservation + * distribution of the observed classes. 
+ */ + protected Node(double[] classObservation) { + this.observedClassDistribution = new DoubleVector(classObservation); + } + + /** + * Getter method for the class distribution + * + * @return Observed class distribution + */ + protected double[] getObservedClassDistribution() { + return this.observedClassDistribution.getArrayCopy(); + } + + /** + * A method to check whether the class distribution only consists of one class or not. + * + * @return Flag whether class distribution is pure or not. + */ + protected boolean observedClassDistributionIsPure() { + return (observedClassDistribution.numNonZeroEntries() < 2); + } + + protected void describeSubtree(ModelAggregatorProcessor modelAggrProc, StringBuilder out, int indent) { + // TODO: implement method to gracefully define the tree + } + + // TODO: calculate promise for limiting the model based on the memory size + // double calculatePromise(); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/SplitNode.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/SplitNode.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/SplitNode.java new file mode 100644 index 0000000..c2b1a47 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/SplitNode.java @@ -0,0 +1,117 @@ +package org.apache.samoa.learners.classifiers.trees; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.instances.Instance; +import org.apache.samoa.moa.classifiers.core.conditionaltests.InstanceConditionalTest; +import org.apache.samoa.moa.core.AutoExpandVector; + +/** + * SplitNode represents the node that contains one or more questions in the decision tree model, in order to route the + * instances into the correct leaf. + * + * @author Arinto Murdopo + * + */ +public class SplitNode extends Node { + + private static final long serialVersionUID = -7380795529928485792L; + + private final AutoExpandVector<Node> children; + protected final InstanceConditionalTest splitTest; + + public SplitNode(InstanceConditionalTest splitTest, + double[] classObservation) { + super(classObservation); + this.children = new AutoExpandVector<>(); + this.splitTest = splitTest; + } + + @Override + FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int parentBranch) { + int childIndex = instanceChildIndex(inst); + if (childIndex >= 0) { + Node child = getChild(childIndex); + if (child != null) { + return child.filterInstanceToLeaf(inst, this, childIndex); + } + return new FoundNode(null, this, childIndex); + } + return new FoundNode(this, parent, parentBranch); + } + + @Override + boolean isLeaf() { + return false; + } + + @Override + double[] getClassVotes(Instance inst, ModelAggregatorProcessor vht) { + return this.observedClassDistribution.getArrayCopy(); + } + + /** + * Method to return the number of children of this split node + * + * @return number of children + */ + int numChildren() { + return 
this.children.size(); + } + + /** + * Method to set the children in a specific index of the SplitNode with the appropriate child + * + * @param index + * Index of the child in the SplitNode + * @param child + * The child node + */ + void setChild(int index, Node child) { + if ((this.splitTest.maxBranches() >= 0) + && (index >= this.splitTest.maxBranches())) { + throw new IndexOutOfBoundsException(); + } + this.children.set(index, child); + } + + /** + * Method to get the child node given the index + * + * @param index + * The child node index + * @return The child node in the given index + */ + Node getChild(int index) { + return this.children.get(index); + } + + /** + * Method to route the instance using this split node + * + * @param inst + * The routed instance + * @return The index of the branch where the instance is routed + */ + int instanceChildIndex(Instance inst) { + return this.splitTest.branchForInstance(inst); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java new file mode 100644 index 0000000..ea7e53d --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java @@ -0,0 +1,185 @@ +package org.apache.samoa.learners.classifiers.trees; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.google.common.collect.ImmutableSet; + +import java.util.Set; + +import org.apache.samoa.core.Processor; +import org.apache.samoa.instances.Instances; +import org.apache.samoa.learners.AdaptiveLearner; +import org.apache.samoa.learners.ClassificationLearner; +import org.apache.samoa.moa.classifiers.core.attributeclassobservers.AttributeClassObserver; +import org.apache.samoa.moa.classifiers.core.attributeclassobservers.DiscreteAttributeClassObserver; +import org.apache.samoa.moa.classifiers.core.attributeclassobservers.NumericAttributeClassObserver; +import org.apache.samoa.moa.classifiers.core.driftdetection.ChangeDetector; +import org.apache.samoa.moa.classifiers.core.splitcriteria.SplitCriterion; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.TopologyBuilder; + +import com.github.javacliparser.ClassOption; +import com.github.javacliparser.Configurable; +import com.github.javacliparser.FlagOption; +import com.github.javacliparser.FloatOption; +import com.github.javacliparser.IntOption; + +/** + * Vertical Hoeffding Tree. + * <p/> + * Vertical Hoeffding Tree (VHT) classifier is a distributed classifier that utilizes vertical parallelism on top of + * Very Fast Decision Tree (VFDT) classifier. 
+ * + * @author Arinto Murdopo + */ +public final class VerticalHoeffdingTree implements ClassificationLearner, AdaptiveLearner, Configurable { + + private static final long serialVersionUID = -4937416312929984057L; + + public ClassOption numericEstimatorOption = new ClassOption("numericEstimator", + 'n', "Numeric estimator to use.", NumericAttributeClassObserver.class, + "GaussianNumericAttributeClassObserver"); + + public ClassOption nominalEstimatorOption = new ClassOption("nominalEstimator", + 'd', "Nominal estimator to use.", DiscreteAttributeClassObserver.class, + "NominalAttributeClassObserver"); + + public ClassOption splitCriterionOption = new ClassOption("splitCriterion", + 's', "Split criterion to use.", SplitCriterion.class, + "InfoGainSplitCriterion"); + + public FloatOption splitConfidenceOption = new FloatOption( + "splitConfidence", + 'c', + "The allowable error in split decision, values closer to 0 will take longer to decide.", + 0.0000001, 0.0, 1.0); + + public FloatOption tieThresholdOption = new FloatOption("tieThreshold", + 't', "Threshold below which a split will be forced to break ties.", + 0.05, 0.0, 1.0); + + public IntOption gracePeriodOption = new IntOption( + "gracePeriod", + 'g', + "The number of instances a leaf should observe between split attempts.", + 200, 0, Integer.MAX_VALUE); + + public IntOption parallelismHintOption = new IntOption( + "parallelismHint", + 'p', + "The number of local statistics PI to do distributed computation", + 1, 1, Integer.MAX_VALUE); + + public IntOption timeOutOption = new IntOption( + "timeOut", + 'o', + "The duration to wait all distributed computation results from local statistics PI", + 30, 1, Integer.MAX_VALUE); + + public FlagOption binarySplitsOption = new FlagOption("binarySplits", 'b', + "Only allow binary splits."); + + private Stream resultStream; + + private FilterProcessor filterProc; + + @Override + public void init(TopologyBuilder topologyBuilder, Instances dataset, int parallelism) { + + 
this.filterProc = new FilterProcessor.Builder(dataset) + .build(); + topologyBuilder.addProcessor(filterProc, parallelism); + + Stream filterStream = topologyBuilder.createStream(filterProc); + this.filterProc.setOutputStream(filterStream); + + ModelAggregatorProcessor modelAggrProc = new ModelAggregatorProcessor.Builder(dataset) + .splitCriterion((SplitCriterion) this.splitCriterionOption.getValue()) + .splitConfidence(splitConfidenceOption.getValue()) + .tieThreshold(tieThresholdOption.getValue()) + .gracePeriod(gracePeriodOption.getValue()) + .parallelismHint(parallelismHintOption.getValue()) + .timeOut(timeOutOption.getValue()) + .changeDetector(this.getChangeDetector()) + .build(); + + topologyBuilder.addProcessor(modelAggrProc, parallelism); + + topologyBuilder.connectInputShuffleStream(filterStream, modelAggrProc); + + this.resultStream = topologyBuilder.createStream(modelAggrProc); + modelAggrProc.setResultStream(resultStream); + + Stream attributeStream = topologyBuilder.createStream(modelAggrProc); + modelAggrProc.setAttributeStream(attributeStream); + + Stream controlStream = topologyBuilder.createStream(modelAggrProc); + modelAggrProc.setControlStream(controlStream); + + LocalStatisticsProcessor locStatProc = new LocalStatisticsProcessor.Builder() + .splitCriterion((SplitCriterion) this.splitCriterionOption.getValue()) + .binarySplit(binarySplitsOption.isSet()) + .nominalClassObserver((AttributeClassObserver) this.nominalEstimatorOption.getValue()) + .numericClassObserver((AttributeClassObserver) this.numericEstimatorOption.getValue()) + .build(); + + topologyBuilder.addProcessor(locStatProc, parallelismHintOption.getValue()); + topologyBuilder.connectInputKeyStream(attributeStream, locStatProc); + topologyBuilder.connectInputAllStream(controlStream, locStatProc); + + Stream computeStream = topologyBuilder.createStream(locStatProc); + + locStatProc.setComputationResultStream(computeStream); + topologyBuilder.connectInputAllStream(computeStream, 
modelAggrProc); + } + + @Override + public Processor getInputProcessor() { + return this.filterProc; + } + + @Override + public Set<Stream> getResultStreams() { + return ImmutableSet.of(this.resultStream); + } + + protected ChangeDetector changeDetector; + + @Override + public ChangeDetector getChangeDetector() { + return this.changeDetector; + } + + @Override + public void setChangeDetector(ChangeDetector cd) { + this.changeDetector = cd; + } + + static class LearningNodeIdGenerator { + + // TODO: add code to warn user of when value reaches Long.MAX_VALUES + private static long id = 0; + + static synchronized long generate() { + return id++; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/clusterers/ClusteringContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/clusterers/ClusteringContentEvent.java b/samoa-api/src/main/java/org/apache/samoa/learners/clusterers/ClusteringContentEvent.java new file mode 100644 index 0000000..e7eb5b5 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/learners/clusterers/ClusteringContentEvent.java @@ -0,0 +1,90 @@ +package org.apache.samoa.learners.clusterers; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.instances.Instance; + +import net.jcip.annotations.Immutable; + +/** + * The Class ClusteringContentEvent. + */ +@Immutable +final public class ClusteringContentEvent implements ContentEvent { + + private static final long serialVersionUID = -7746983521296618922L; + private Instance instance; + private boolean isLast = false; + private String key; + private boolean isSample; + + public ClusteringContentEvent() { + // Necessary for kryo serializer + } + + /** + * Instantiates a new clustering event. + * + * @param index + * the index + * @param instance + * the instance + */ + public ClusteringContentEvent(long index, Instance instance) { + /* + * if (instance != null) { this.instance = new + * SerializableInstance(instance); } + */ + this.instance = instance; + this.setKey(Long.toString(index)); + } + + @Override + public String getKey() { + return this.key; + } + + @Override + public void setKey(String str) { + this.key = str; + } + + @Override + public boolean isLastEvent() { + return this.isLast; + } + + public void setLast(boolean isLast) { + this.isLast = isLast; + } + + public Instance getInstance() { + return this.instance; + } + + public boolean isSample() { + return isSample; + } + + public void setSample(boolean b) { + this.isSample = b; + } +}
