http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldContentEvent.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldContentEvent.java new file mode 100644 index 0000000..5e86cb0 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldContentEvent.java @@ -0,0 +1,69 @@ +package com.yahoo.labs.samoa.examples; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.core.ContentEvent; + +/** + * Example {@link ContentEvent} that contains a single integer. 
+ */ +public class HelloWorldContentEvent implements ContentEvent { + + private static final long serialVersionUID = -2406968925730298156L; + private final boolean isLastEvent; + private final int helloWorldData; + + public HelloWorldContentEvent(int helloWorldData, boolean isLastEvent) { + this.isLastEvent = isLastEvent; + this.helloWorldData = helloWorldData; + } + + /* + * No-argument constructor for Kryo + */ + public HelloWorldContentEvent() { + this(0,false); + } + + @Override + public String getKey() { + return null; + } + + @Override + public void setKey(String str) { + // do nothing, it's key-less content event + } + + @Override + public boolean isLastEvent() { + return isLastEvent; + } + + public int getHelloWorldData() { + return helloWorldData; + } + + @Override + public String toString() { + return "HelloWorldContentEvent [helloWorldData=" + helloWorldData + "]"; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldDestinationProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldDestinationProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldDestinationProcessor.java new file mode 100644 index 0000000..e22c0fe --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldDestinationProcessor.java @@ -0,0 +1,49 @@ +package com.yahoo.labs.samoa.examples; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.core.Processor; + +/** + * Example {@link Processor} that simply prints the received events to standard output. 
+ */ +public class HelloWorldDestinationProcessor implements Processor { + + private static final long serialVersionUID = -6042613438148776446L; + private int processorId; + + @Override + public boolean process(ContentEvent event) { + System.out.println(processorId + ": " + event); + return true; + } + + @Override + public void onCreate(int id) { + this.processorId = id; + } + + @Override + public Processor newProcessor(Processor p) { + return new HelloWorldDestinationProcessor(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldSourceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldSourceProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldSourceProcessor.java new file mode 100644 index 0000000..a37201f --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldSourceProcessor.java @@ -0,0 +1,75 @@ +package com.yahoo.labs.samoa.examples; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.util.Random; + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.core.EntranceProcessor; +import com.yahoo.labs.samoa.core.Processor; + +/** + * Example {@link EntranceProcessor} that generates a stream of random integers. + */ +public class HelloWorldSourceProcessor implements EntranceProcessor { + + private static final long serialVersionUID = 6212296305865604747L; + private Random rnd; + private final long maxInst; + private long count; + + public HelloWorldSourceProcessor(long maxInst) { + this.maxInst = maxInst; + } + + @Override + public boolean process(ContentEvent event) { + // do nothing, API will be refined further + return false; + } + + @Override + public void onCreate(int id) { + rnd = new Random(id); + } + + @Override + public Processor newProcessor(Processor p) { + HelloWorldSourceProcessor hwsp = (HelloWorldSourceProcessor) p; + return new HelloWorldSourceProcessor(hwsp.maxInst); + } + + @Override + public boolean isFinished() { + return count >= maxInst; + } + + @Override + public boolean hasNext() { + return count < maxInst; + } + + @Override + public ContentEvent nextEvent() { + count++; + return new HelloWorldContentEvent(rnd.nextInt(), false); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldTask.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldTask.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldTask.java new file mode 100644 index 0000000..e6658f1 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/examples/HelloWorldTask.java @@ -0,0 +1,98 @@ +package com.yahoo.labs.samoa.examples; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. 
+ * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.text.SimpleDateFormat; +import java.util.Date; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.Configurable; +import com.github.javacliparser.IntOption; +import com.github.javacliparser.StringOption; +import com.yahoo.labs.samoa.tasks.Task; +import com.yahoo.labs.samoa.topology.ComponentFactory; +import com.yahoo.labs.samoa.topology.Stream; +import com.yahoo.labs.samoa.topology.Topology; +import com.yahoo.labs.samoa.topology.TopologyBuilder; + +/** + * Example {@link Task} in SAMOA. This task simply sends events from a source {@link HelloWorldSourceProcessor} to a destination + * {@link HelloWorldDestinationProcessor}. The events are random integers generated by the source and encapsulated in a {@link HelloWorldContentEvent}. The + * destination prints the content of the event to standard output, prepended by the processor id. + * + * The task has 2 main options: the number of events the source will generate (-i) and the parallelism level of the destination (-p). + */ +public class HelloWorldTask implements Task, Configurable { + + private static final long serialVersionUID = -5134935141154021352L; + private static Logger logger = LoggerFactory.getLogger(HelloWorldTask.class); + + /** The topology builder for the task. 
*/ + private TopologyBuilder builder; + /** The topology that will be created for the task */ + private Topology helloWorldTopology; + + public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i', + "Maximum number of instances to generate (-1 = no limit).", 1000000, -1, Integer.MAX_VALUE); + + public IntOption helloWorldParallelismOption = new IntOption("parallelismOption", 'p', + "Number of destination Processors", 1, 1, Integer.MAX_VALUE); + + public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', + "Identifier of the evaluation", "HelloWorldTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); + + @Override + public void init() { + // create source EntranceProcessor + /* The event source for the topology. Implements EntranceProcessor */ + HelloWorldSourceProcessor sourceProcessor = new HelloWorldSourceProcessor(instanceLimitOption.getValue()); + builder.addEntranceProcessor(sourceProcessor); + + // create Stream + Stream stream = builder.createStream(sourceProcessor); + + // create destination Processor + /* The event sink for the topology. 
Implements Processor */ + HelloWorldDestinationProcessor destProcessor = new HelloWorldDestinationProcessor(); + builder.addProcessor(destProcessor, helloWorldParallelismOption.getValue()); + builder.connectInputShuffleStream(stream, destProcessor); + + // build the topology + helloWorldTopology = builder.build(); + logger.debug("Successfully built the topology"); + } + + @Override + public Topology getTopology() { + return helloWorldTopology; + } + + @Override + public void setFactory(ComponentFactory factory) { + // will be removed when dynamic binding is implemented + builder = new TopologyBuilder(factory); + logger.debug("Successfully instantiating TopologyBuilder"); + builder.initTopology(evaluationNameOption.getValue()); + logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/AdaptiveLearner.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/AdaptiveLearner.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/AdaptiveLearner.java new file mode 100644 index 0000000..0986253 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/AdaptiveLearner.java @@ -0,0 +1,53 @@ +package com.yahoo.labs.samoa.learners; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * License + */ + + +import com.yahoo.labs.samoa.moa.classifiers.core.driftdetection.ChangeDetector; +import com.yahoo.labs.samoa.topology.Stream; + +/** + * The Interface Adaptive Learner. + * Initializing Classifier should initalize PI to connect the Classifier with the input stream + * and initialize result stream so that other PI can connect to the classification result of this classifier + */ + +public interface AdaptiveLearner { + + /** + * Gets the change detector item. + * + * @return the change detector item + */ + public ChangeDetector getChangeDetector(); + + /** + * Sets the change detector item. + * + * @param cd the change detector item + */ + public void setChangeDetector(ChangeDetector cd); + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/ClassificationLearner.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/ClassificationLearner.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/ClassificationLearner.java new file mode 100644 index 0000000..7f1a6c9 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/ClassificationLearner.java @@ -0,0 +1,27 @@ +package com.yahoo.labs.samoa.learners; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.learners.Learner; + +public interface ClassificationLearner extends Learner { + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstanceContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstanceContentEvent.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstanceContentEvent.java new file mode 100644 index 0000000..91b1b7b --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstanceContentEvent.java @@ -0,0 +1,207 @@ + +package com.yahoo.labs.samoa.learners; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * License + */ + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.core.SerializableInstance; +import net.jcip.annotations.Immutable; +import com.yahoo.labs.samoa.instances.Instance; +//import weka.core.Instance; + + +/** + * The Class InstanceEvent. 
+ */ +@Immutable +final public class InstanceContentEvent implements ContentEvent { + + /** + * + */ + private static final long serialVersionUID = -8620668863064613845L; + private long instanceIndex; + private int classifierIndex; + private int evaluationIndex; + private SerializableInstance instance; + private boolean isTraining; + private boolean isTesting; + private boolean isLast = false; + + public InstanceContentEvent() { + + } + + /** + * Instantiates a new instance event. + * + * @param index the index + * @param instance the instance + * @param isTraining the is training + */ + public InstanceContentEvent(long index, Instance instance, + boolean isTraining, boolean isTesting) { + if (instance != null) { + this.instance = new SerializableInstance(instance); + } + this.instanceIndex = index; + this.isTraining = isTraining; + this.isTesting = isTesting; + } + + /** + * Gets the single instance of InstanceEvent. + * + * @return the instance. + */ + public Instance getInstance() { + return instance; + } + + /** + * Gets the instance index. + * + * @return the index of the data vector. + */ + public long getInstanceIndex() { + return instanceIndex; + } + + /** + * Gets the class id. + * + * @return the true class of the vector. + */ + public int getClassId() { + // return classId; + return (int) instance.classValue(); + } + + /** + * Checks if is training. + * + * @return true if this is training data. + */ + public boolean isTraining() { + return isTraining; + } + + /** + * Set training flag. + * + * @param training flag. + */ + public void setTraining(boolean training) { + this.isTraining = training; + } + + /** + * Checks if is testing. + * + * @return true if this is testing data. + */ + public boolean isTesting(){ + return isTesting; + } + + /** + * Set testing flag. + * + * @param testing flag. + */ + public void setTesting(boolean testing) { + this.isTesting = testing; + } + + /** + * Gets the classifier index. 
+ * + * @return the classifier index + */ + public int getClassifierIndex() { + return classifierIndex; + } + + /** + * Sets the classifier index. + * + * @param classifierIndex the new classifier index + */ + public void setClassifierIndex(int classifierIndex) { + this.classifierIndex = classifierIndex; + } + + /** + * Gets the evaluation index. + * + * @return the evaluation index + */ + public int getEvaluationIndex() { + return evaluationIndex; + } + + /** + * Sets the evaluation index. + * + * @param evaluationIndex the new evaluation index + */ + public void setEvaluationIndex(int evaluationIndex) { + this.evaluationIndex = evaluationIndex; + } + + /* (non-Javadoc) + * @see samoa.core.ContentEvent#getKey(int) + */ + public String getKey(int key) { + if (key == 0) + return Long.toString(this.getEvaluationIndex()); + else return Long.toString(10000 + * this.getEvaluationIndex() + + this.getClassifierIndex()); + } + + @Override + public String getKey() { + //System.out.println("InstanceContentEvent "+Long.toString(this.instanceIndex)); + return Long.toString(this.getClassifierIndex()); + } + + @Override + public void setKey(String str) { + this.instanceIndex = Long.parseLong(str); + } + + @Override + public boolean isLastEvent() { + return isLast; + } + + public void setLast(boolean isLast) { + this.isLast = isLast; + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstancesContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstancesContentEvent.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstancesContentEvent.java new file mode 100644 index 0000000..ff005b6 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstancesContentEvent.java @@ -0,0 +1,193 @@ + +package com.yahoo.labs.samoa.learners; + +/* + * #%L + * SAMOA + * %% + * 
Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * License + */ + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.core.SerializableInstance; +import net.jcip.annotations.Immutable; +import com.yahoo.labs.samoa.instances.Instance; +import java.util.LinkedList; +import java.util.List; +//import weka.core.Instance; + + +/** + * The Class InstanceEvent. + */ +@Immutable +final public class InstancesContentEvent implements ContentEvent { + + /** + * + */ + private static final long serialVersionUID = -8620668863064613845L; + private long instanceIndex; + private int classifierIndex; + private int evaluationIndex; + //private SerializableInstance instance; + private boolean isTraining; + private boolean isTesting; + private boolean isLast = false; + + public InstancesContentEvent() { + + } + + /** + * Instantiates a new instance event. 
+ * + * @param index the index + * @param instance the instance + * @param isTraining the is training + */ + public InstancesContentEvent(long index,// Instance instance, + boolean isTraining, boolean isTesting) { + /*if (instance != null) { + this.instance = new SerializableInstance(instance); + }*/ + this.instanceIndex = index; + this.isTraining = isTraining; + this.isTesting = isTesting; + } + + public InstancesContentEvent(InstanceContentEvent event){ + this.instanceIndex = event.getInstanceIndex(); + this.isTraining = event.isTraining(); + this.isTesting = event.isTesting(); + } + + protected List<Instance> instanceList = new LinkedList<Instance>(); + + public void add(Instance instance){ + instanceList.add(new SerializableInstance(instance)); + } + + /** + * Gets the single instance of InstanceEvent. + * + * @return the instance. + */ + public Instance[] getInstances() { + return instanceList.toArray(new Instance[instanceList.size()]); + } + + /** + * Gets the instance index. + * + * @return the index of the data vector. + */ + public long getInstanceIndex() { + return instanceIndex; + } + + /** + * Checks if is training. + * + * @return true if this is training data. + */ + public boolean isTraining() { + return isTraining; + } + + /** + * Checks if is testing. + * + * @return true if this is testing data. + */ + public boolean isTesting(){ + return isTesting; + } + + /** + * Gets the classifier index. + * + * @return the classifier index + */ + public int getClassifierIndex() { + return classifierIndex; + } + + /** + * Sets the classifier index. + * + * @param classifierIndex the new classifier index + */ + public void setClassifierIndex(int classifierIndex) { + this.classifierIndex = classifierIndex; + } + + /** + * Gets the evaluation index. + * + * @return the evaluation index + */ + public int getEvaluationIndex() { + return evaluationIndex; + } + + /** + * Sets the evaluation index. 
+ * + * @param evaluationIndex the new evaluation index + */ + public void setEvaluationIndex(int evaluationIndex) { + this.evaluationIndex = evaluationIndex; + } + + /* (non-Javadoc) + * @see samoa.core.ContentEvent#getKey(int) + */ + public String getKey(int key) { + if (key == 0) + return Long.toString(this.getEvaluationIndex()); + else return Long.toString(10000 + * this.getEvaluationIndex() + + this.getClassifierIndex()); + } + + @Override + public String getKey() { + //System.out.println("InstanceContentEvent "+Long.toString(this.instanceIndex)); + return Long.toString(this.getClassifierIndex()); + } + + @Override + public void setKey(String str) { + this.instanceIndex = Long.parseLong(str); + } + + @Override + public boolean isLastEvent() { + return isLast; + } + + public void setLast(boolean isLast) { + this.isLast = isLast; + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/Learner.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/Learner.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/Learner.java new file mode 100644 index 0000000..993ca47 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/Learner.java @@ -0,0 +1,62 @@ +package com.yahoo.labs.samoa.learners; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.core.Processor; +import com.yahoo.labs.samoa.instances.Instances; +import com.yahoo.labs.samoa.topology.Stream; +import com.yahoo.labs.samoa.topology.TopologyBuilder; + +import java.io.Serializable; +import java.util.Set; + +/** + * The Interface Classifier. + * Initializing Classifier should initalize PI to connect the Classifier with the input stream + * and initialize result stream so that other PI can connect to the classification result of this classifier + */ + +public interface Learner extends Serializable{ + + /** + * Inits the Learner object. + * + * @param topologyBuilder the topology builder + * @param dataset the dataset + * @param parallelism the parallelism + */ + public void init(TopologyBuilder topologyBuilder, Instances dataset, int parallelism); + + /** + * Gets the input processing item. + * + * @return the input processing item + */ + public Processor getInputProcessor(); + + + /** + * Gets the result streams + * + * @return the set of result streams + */ + public Set<Stream> getResultStreams(); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/RegressionLearner.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/RegressionLearner.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/RegressionLearner.java new file mode 100644 index 0000000..63f233e --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/RegressionLearner.java @@ -0,0 +1,27 @@ +package com.yahoo.labs.samoa.learners; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.learners.Learner; + +public interface RegressionLearner extends Learner { + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/ResultContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/ResultContentEvent.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/ResultContentEvent.java new file mode 100644 index 0000000..0879872 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/ResultContentEvent.java @@ -0,0 +1,212 @@ +package com.yahoo.labs.samoa.learners; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.core.SerializableInstance; +import com.yahoo.labs.samoa.instances.Instance; + +/** + * License + */ + + +/** + * The Class ResultEvent. 
+ */ +final public class ResultContentEvent implements ContentEvent { + + /** + * + */ + private static final long serialVersionUID = -2650420235386873306L; + private long instanceIndex; + private int classifierIndex; + private int evaluationIndex; + private SerializableInstance instance; + + private int classId; + private double[] classVotes; + + private final boolean isLast; + + public ResultContentEvent(){ + this.isLast = false; + } + + + public ResultContentEvent(boolean isLast) { + this.isLast = isLast; + } + + /** + * Instantiates a new result event. + * + * @param instanceIndex + * the instance index + * @param instance + * the instance + * @param classId + * the class id + * @param classVotes + * the class votes + */ + public ResultContentEvent(long instanceIndex, Instance instance, int classId, + double[] classVotes, boolean isLast) { + if(instance != null){ + this.instance = new SerializableInstance(instance); + } + this.instanceIndex = instanceIndex; + this.classId = classId; + this.classVotes = classVotes; + this.isLast = isLast; + } + + /** + * Gets the single instance of ResultEvent. + * + * @return single instance of ResultEvent + */ + public SerializableInstance getInstance() { + return instance; + } + + /** + * Sets the instance. + * + * @param instance + * the new instance + */ + public void setInstance(SerializableInstance instance) { + this.instance = instance; + } + + /** + * Gets the num classes. + * + * @return the num classes + */ + public int getNumClasses() { // To remove + return instance.numClasses(); + } + + /** + * Gets the instance index. + * + * @return the index of the data vector. + */ + public long getInstanceIndex() { + return instanceIndex; + } + + /** + * Gets the class id. + * + * @return the true class of the vector. + */ + public int getClassId() { // To remove + return classId;// (int) instance.classValue();//classId; + } + + /** + * Gets the class votes. 
+ * + * @return the class votes + */ + public double[] getClassVotes() { + return classVotes; + } + + /** + * Sets the class votes. + * + * @param classVotes + * the new class votes + */ + public void setClassVotes(double[] classVotes) { + this.classVotes = classVotes; + } + + /** + * Gets the classifier index. + * + * @return the classifier index + */ + public int getClassifierIndex() { + return classifierIndex; + } + + /** + * Sets the classifier index. + * + * @param classifierIndex + * the new classifier index + */ + public void setClassifierIndex(int classifierIndex) { + this.classifierIndex = classifierIndex; + } + + /** + * Gets the evaluation index. + * + * @return the evaluation index + */ + public int getEvaluationIndex() { + return evaluationIndex; + } + + /** + * Sets the evaluation index. + * + * @param evaluationIndex + * the new evaluation index + */ + public void setEvaluationIndex(int evaluationIndex) { + this.evaluationIndex = evaluationIndex; + } + + /* (non-Javadoc) + * @see samoa.core.ContentEvent#getKey(int) + */ + //@Override + public String getKey(int key) { + if (key == 0) + return Long.toString(this.getEvaluationIndex()); + else return Long.toString(this.getEvaluationIndex() + + 1000 * this.getInstanceIndex()); + } + + @Override + public String getKey() { + return Long.toString(this.getEvaluationIndex()%100); + } + + @Override + public void setKey(String str) { + this.evaluationIndex = Integer.parseInt(str); + } + + @Override + public boolean isLastEvent() { + return isLast; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/LocalLearner.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/LocalLearner.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/LocalLearner.java new file mode 100644 index 0000000..b5c30db --- /dev/null 
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/LocalLearner.java
@@ -0,0 +1,78 @@
+package com.yahoo.labs.samoa.learners.classifiers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+
+import com.yahoo.labs.samoa.instances.Instance;
+import com.yahoo.labs.samoa.instances.Instances;
+
+/**
+ * Learner interface for non-distributed learners.
+ *
+ * @author abifet
+ */
+public interface LocalLearner extends Serializable {
+
+    /**
+     * Creates a new learner object of the same kind as this one.
+     *
+     * @return the learner
+     */
+    LocalLearner create();
+
+    /**
+     * Predicts the class memberships for a given instance. If an instance is
+     * unclassified, the returned array elements must be all zero.
+     *
+     * <p>The values are per-class scores indexed by class index; they are not
+     * required to be normalized probabilities (implementations may, for
+     * example, return log-space scores).
+     *
+     * @param inst
+     *            the instance to be classified
+     * @return an array containing the estimated membership score of the test
+     *         instance for each class
+     */
+    double[] getVotesForInstance(Instance inst);
+
+    /**
+     * Resets this classifier. It must be similar to starting a new classifier
+     * from scratch.
+     */
+    void resetLearning();
+
+    /**
+     * Trains this classifier incrementally using the given instance.
+     *
+     * @param inst
+     *            the instance to be used for training
+     */
+    void trainOnInstance(Instance inst);
+
+    /**
+     * Sets where to obtain the information of attributes of Instances.
+     *
+     * @param dataset
+     *            the dataset that contains the information
+     * @deprecated retained for backward compatibility; implementations may
+     *             ignore it. NOTE(review): document the intended replacement
+     *             for supplying attribute information.
+     */
+    @Deprecated
+    void setDataset(Instances dataset);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/LocalLearnerProcessor.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/LocalLearnerProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/LocalLearnerProcessor.java
new file mode 100755
index 0000000..ae897f0
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/LocalLearnerProcessor.java
@@ -0,0 +1,217 @@
+package com.yahoo.labs.samoa.learners.classifiers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.instances.Instance;
+import com.yahoo.labs.samoa.learners.InstanceContentEvent;
+import com.yahoo.labs.samoa.learners.ResultContentEvent;
+import com.yahoo.labs.samoa.moa.classifiers.core.driftdetection.ChangeDetector;
+import com.yahoo.labs.samoa.topology.Stream;
+
+import static com.yahoo.labs.samoa.moa.core.Utils.maxIndex;
+
+/**
+ * Processor hosting a single {@link LocalLearner}.
+ *
+ * <p>For a testing event it emits a {@link ResultContentEvent} carrying the
+ * model's votes; for a training event it trains the model. When a
+ * {@link ChangeDetector} is set, the model is reset whenever the detector
+ * signals a change together with a higher error estimate.
+ */
+public final class LocalLearnerProcessor implements Processor {
+
+    private static final long serialVersionUID = -1577910988699148691L;
+
+    private static final Logger logger = LoggerFactory.getLogger(LocalLearnerProcessor.class);
+
+    private LocalLearner model;
+    private Stream outputStream;
+    private int modelId;
+    private long instancesCount = 0;
+
+    /** Optional drift detector; {@code null} disables drift handling. */
+    protected ChangeDetector changeDetector;
+
+    /** Not read anywhere in this class; marked for deletion by its author. */
+    protected int test; //to delete
+
+    /**
+     * Sets the learner.
+     *
+     * @param model the model to set
+     */
+    public void setLearner(LocalLearner model) {
+        this.model = model;
+    }
+
+    /**
+     * Gets the learner.
+     *
+     * @return the model
+     */
+    public LocalLearner getLearner() {
+        return model;
+    }
+
+    /**
+     * Sets the stream that receives the emitted {@link ResultContentEvent}s.
+     *
+     * @param outputStream the new output stream
+     */
+    public void setOutputStream(Stream outputStream) {
+        this.outputStream = outputStream;
+    }
+
+    /**
+     * Gets the output stream.
+     *
+     * @return the output stream
+     */
+    public Stream getOutputStream() {
+        return outputStream;
+    }
+
+    /**
+     * Gets the number of training instances processed by this processor.
+     * The counter is not reset when the model is reset after a drift.
+     *
+     * @return number of observation vectors used in training
+     */
+    public long getInstancesCount() {
+        return instancesCount;
+    }
+
+    /**
+     * Gets the drift detector.
+     *
+     * @return the change detector, or {@code null} if none is set
+     */
+    public ChangeDetector getChangeDetector() {
+        return this.changeDetector;
+    }
+
+    /**
+     * Sets the drift detector.
+     *
+     * @param cd the change detector; {@code null} disables drift handling
+     */
+    public void setChangeDetector(ChangeDetector cd) {
+        this.changeDetector = cd;
+    }
+
+    /**
+     * Feeds the drift detector (if any) and trains the model on the event's
+     * instance.
+     *
+     * <p>The model's prediction is checked <em>before</em> it learns the
+     * instance, so the detector sees the test-then-train error rather than
+     * the error of a model that has already seen the instance. If a change
+     * with a higher error estimate is signalled, the model and detector are
+     * reset first, so the fresh model still learns this instance.
+     *
+     * @param event the event carrying the training instance
+     */
+    private void updateStats(InstanceContentEvent event) {
+        Instance inst = event.getInstance();
+        if (this.changeDetector != null) {
+            boolean correctlyClassifies = this.correctlyClassifies(inst);
+            double oldEstimation = this.changeDetector.getEstimation();
+            this.changeDetector.input(correctlyClassifies ? 0 : 1);
+            if (this.changeDetector.getChange() && this.changeDetector.getEstimation() > oldEstimation) {
+                // Error estimate went up: start a new classifier
+                this.model.resetLearning();
+                this.changeDetector.resetLearning();
+            }
+        }
+        this.model.trainOnInstance(inst);
+        this.instancesCount++;
+    }
+
+    /**
+     * Gets whether the current model correctly classifies an instance, i.e.
+     * whether the arg max of {@link LocalLearner#getVotesForInstance} equals
+     * the instance's class value.
+     *
+     * @param inst the instance to be classified
+     * @return true if the instance is correctly classified
+     */
+    private boolean correctlyClassifies(Instance inst) {
+        return maxIndex(model.getVotesForInstance(inst)) == (int) inst.classValue();
+    }
+
+    /**
+     * Handles one {@link InstanceContentEvent}.
+     *
+     * <p>A negative instance index ends learning: an empty result event is
+     * forwarded. Otherwise, testing events produce a result event with the
+     * model's votes and training events update the model.
+     *
+     * @param event the event (must be an {@link InstanceContentEvent})
+     * @return always {@code false}
+     */
+    @Override
+    public boolean process(ContentEvent event) {
+        InstanceContentEvent inEvent = (InstanceContentEvent) event;
+        Instance instance = inEvent.getInstance();
+
+        if (inEvent.getInstanceIndex() < 0) {
+            // end learning
+            ResultContentEvent outContentEvent = new ResultContentEvent(-1, instance, 0,
+                    new double[0], inEvent.isLastEvent());
+            outContentEvent.setClassifierIndex(this.modelId);
+            outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
+            outputStream.put(outContentEvent);
+            return false;
+        }
+
+        if (inEvent.isTesting()) {
+            double[] dist = model.getVotesForInstance(instance);
+            ResultContentEvent outContentEvent = new ResultContentEvent(inEvent.getInstanceIndex(),
+                    instance, inEvent.getClassId(), dist, inEvent.isLastEvent());
+            outContentEvent.setClassifierIndex(this.modelId);
+            outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
+            // Parameterized so no string is built when trace logging is off
+            logger.trace("{} {} {}", new Object[] { inEvent.getInstanceIndex(), modelId, dist });
+            outputStream.put(outContentEvent);
+        }
+
+        if (inEvent.isTraining()) {
+            updateStats(inEvent);
+        }
+        return false;
+    }
+
+    /**
+     * Records the processor id and replaces the configured model with a fresh
+     * one obtained from {@link LocalLearner#create()}.
+     *
+     * @param id the processor instance id, used as the classifier index
+     */
+    @Override
+    public void onCreate(int id) {
+        this.modelId = id;
+        model = model.create();
+    }
+
+    /**
+     * Creates a replica of {@code sourceProcessor} with its own learner and
+     * its own copy of the drift detector, sharing the output stream.
+     *
+     * @param sourceProcessor the processor to replicate (a
+     *            {@code LocalLearnerProcessor})
+     * @return the new processor
+     */
+    @Override
+    public Processor newProcessor(Processor sourceProcessor) {
+        LocalLearnerProcessor newProcessor = new LocalLearnerProcessor();
+        LocalLearnerProcessor originProcessor = (LocalLearnerProcessor) sourceProcessor;
+
+        if (originProcessor.getLearner() != null) {
+            newProcessor.setLearner(originProcessor.getLearner().create());
+        }
+
+        if (originProcessor.getChangeDetector() != null) {
+            // Copy, like the learner: a detector is stateful and must not be
+            // shared between replicas.
+            newProcessor.setChangeDetector((ChangeDetector) originProcessor.getChangeDetector().copy());
+        }
+
+        newProcessor.setOutputStream(originProcessor.getOutputStream());
+        return newProcessor;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java
new file mode 100644
index 0000000..7e9cb4a
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java
@@ -0,0 +1,269 @@
+package com.yahoo.labs.samoa.learners.classifiers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yahoo.labs.samoa.instances.Instance;
+import com.yahoo.labs.samoa.instances.Instances;
+import com.yahoo.labs.samoa.moa.classifiers.core.attributeclassobservers.GaussianNumericAttributeClassObserver;
+import com.yahoo.labs.samoa.moa.core.GaussianEstimator;
+
+/**
+ * Implementation of a non-distributed Naive Bayes classifier.
+ *
+ * At the moment, the implementation models all attributes as numeric
+ * attributes.
+ *
+ * @author Olivier Van Laere (vanlaere yahoo-inc dot com)
+ */
+public class NaiveBayes implements LocalLearner {
+
+    /**
+     * Default smoothing factor. For now fixed to 1E-20.
+     */
+    private static final double ADDITIVE_SMOOTHING_FACTOR = 1e-20;
+
+    /**
+     * serialVersionUID for serialization
+     */
+    private static final long serialVersionUID = 1325775209672996822L;
+
+    /**
+     * Instance of a logger for use in this class.
+     */
+    private static final Logger logger = LoggerFactory.getLogger(NaiveBayes.class);
+
+    /**
+     * The actual model: one Gaussian attribute observer per attribute index.
+     */
+    protected Map<Integer, GaussianNumericAttributeClassObserver> attributeObservers;
+
+    /**
+     * Class statistics: accumulated training weight per class index.
+     */
+    protected Map<Integer, Double> classInstances;
+
+    /**
+     * Class zero-prototypes, keyed by class index.
+     */
+    protected Map<Integer, Double> classPrototypes;
+
+    /**
+     * Retrieve the number of classes currently known to this local model.
+     *
+     * @return the number of distinct class indices seen during training
+     */
+    protected int getNumberOfClasses() {
+        return this.classInstances.size();
+    }
+
+    /**
+     * Track training instances seen.
+     */
+    protected long instancesSeen = 0L;
+
+    /**
+     * Explicit no-arg constructor.
+     */
+    public NaiveBayes() {
+        // Init the model
+        resetLearning();
+    }
+
+    /**
+     * Create an instance of this LocalLearner implementation.
+     */
+    @Override
+    public LocalLearner create() {
+        return new NaiveBayes();
+    }
+
+    /**
+     * Predicts the class memberships for a given instance. If no class has
+     * been seen yet, the returned array is empty.
+     *
+     * Smoothing is implemented by substituting a tiny constant density when
+     * no estimator exists for an attribute/class pair. The
+     * GaussianNumericAttributeClassObserver needs no further smoothing as it
+     * processes continuous variables.
+     *
+     * Scores are accumulated in log space to avoid underflow, replacing the
+     * multiplication with addition. The resulting scores are no longer
+     * probabilities, as a mixture of probability densities and probabilities
+     * can be used in the computation.
+     *
+     * @param inst
+     *            the instance to be classified
+     * @return an array, indexed by class index up to the largest class index
+     *         seen in training, containing the estimated membership scores of
+     *         the test instance in each class, in log space.
+     */
+    @Override
+    public double[] getVotesForInstance(Instance inst) {
+        // One slot per class index up to the largest one seen, so classes with
+        // non-contiguous indices (e.g. only 0 and 2 seen) still get a vote.
+        double[] votes = new double[getVotesLength()];
+        for (int classIndex = 0; classIndex < votes.length; classIndex++) {
+            // Log prior; -Infinity for class indices never seen in training
+            votes[classIndex] = Math.log(getPrior(classIndex));
+            for (int index = 0; index < inst.numAttributes(); index++) {
+                int attributeID = inst.index(index);
+                // Skip class attribute
+                if (attributeID == inst.classIndex()) {
+                    continue;
+                }
+                double value = inst.value(attributeID);
+                GaussianNumericAttributeClassObserver obs = attributeObservers.get(attributeID);
+                GaussianEstimator estimator = (obs != null) ? obs.getEstimator(classIndex) : null;
+                // Without an estimator, assign a very small probability that we see this value
+                double valueNonZero = (estimator != null)
+                        ? estimator.probabilityDensity(value)
+                        : ADDITIVE_SMOOTHING_FACTOR;
+                votes[classIndex] += Math.log(valueNonZero);
+            }
+            // Prototype is absent for classes not trained on yet (e.g. early
+            // in prequential evaluation); it is stored as a plain value, so
+            // take its log here.
+            Double classPrototype = this.classPrototypes.get(classIndex);
+            if (classPrototype != null) {
+                votes[classIndex] += Math.log(classPrototype);
+            }
+        }
+        return votes;
+    }
+
+    /**
+     * Length of the votes array: the largest class index seen in training
+     * plus one, or zero before any training.
+     */
+    private int getVotesLength() {
+        int maxClassIndex = -1;
+        for (int classIndex : this.classInstances.keySet()) {
+            maxClassIndex = Math.max(maxClassIndex, classIndex);
+        }
+        return maxClassIndex + 1;
+    }
+
+    /**
+     * Compute the prior for the given classIndex.
+     *
+     * Implemented by maximum likelihood at the moment.
+     *
+     * NOTE(review): classInstances accumulates instance weights while
+     * instancesSeen counts instances, so with non-unit weights the priors do
+     * not sum to one. The common denominator does not change which class
+     * obtains the highest vote.
+     *
+     * @param classIndex
+     *            Id of the class for which we want to compute the prior.
+     * @return Prior probability for the requested class (0 if unseen)
+     */
+    private double getPrior(int classIndex) {
+        // Maximum likelihood
+        Double currentCount = this.classInstances.get(classIndex);
+        if (currentCount == null || currentCount == 0) {
+            return 0;
+        }
+        return currentCount * 1. / this.instancesSeen;
+    }
+
+    /**
+     * Resets this classifier. It must be similar to starting a new classifier
+     * from scratch: clears class statistics, prototypes and attribute
+     * observers, and the training instance counter.
+     */
+    @Override
+    public void resetLearning() {
+        // Reset priors
+        this.instancesSeen = 0L;
+        this.classInstances = new HashMap<>();
+        this.classPrototypes = new HashMap<>();
+        // Init the attribute observers
+        this.attributeObservers = new HashMap<>();
+    }
+
+    /**
+     * Trains this classifier incrementally using the given instance.
+     *
+     * @param inst
+     *            the instance to be used for training
+     */
+    @Override
+    public void trainOnInstance(Instance inst) {
+        // Update class statistics with weights
+        int classIndex = (int) inst.classValue();
+        Double weight = this.classInstances.get(classIndex);
+        if (weight == null) {
+            weight = 0.;
+        }
+        this.classInstances.put(classIndex, weight + inst.weight());
+
+        // Get the class prototype (starts at 1 for a new class)
+        Double storedPrototype = this.classPrototypes.get(classIndex);
+        double classPrototype = (storedPrototype != null) ? storedPrototype : 1.;
+
+        // Iterate over the attributes of the given instance
+        for (int attributePosition = 0; attributePosition < inst.numAttributes(); attributePosition++) {
+            // Get the attribute index - Dense -> 1:1, Sparse is remapped
+            int attributeID = inst.index(attributePosition);
+            // Skip class attribute
+            if (attributeID == inst.classIndex()) {
+                continue;
+            }
+            // Lazy init of observers, if null, instantiate a new one
+            GaussianNumericAttributeClassObserver obs = this.attributeObservers.get(attributeID);
+            if (obs == null) {
+                // FIXME: At this point, we model everything as a numeric
+                // attribute
+                obs = new GaussianNumericAttributeClassObserver();
+                this.attributeObservers.put(attributeID, obs);
+            }
+
+            // Remove the current model's density at zero for this attribute...
+            GaussianEstimator currentEstimator = obs.getEstimator(classIndex);
+            if (currentEstimator != null) {
+                classPrototype -= currentEstimator.probabilityDensity(0);
+            }
+
+            // FIXME: Sanity check on data values, for now just learn
+            // Learn attribute value for given class
+            obs.observeAttributeClass(inst.valueSparse(attributePosition), classIndex, inst.weight());
+
+            // ...and add the updated model's density at zero. The observer may
+            // still have no estimator for this class (presumably when it
+            // ignores the value, e.g. a missing one - confirm), in which case
+            // there is nothing to add.
+            GaussianEstimator updatedEstimator = obs.getEstimator(classIndex);
+            if (updatedEstimator != null) {
+                classPrototype += updatedEstimator.probabilityDensity(0);
+            }
+        }
+        // Store the class prototype
+        this.classPrototypes.put(classIndex, classPrototype);
+        // Count another training instance
+        this.instancesSeen++;
+    }
+
+    /**
+     * Ignored: this learner does not read the dataset header.
+     */
+    @Override
+    public void setDataset(Instances dataset) {
+        // Do nothing
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java
new file mode 100644
index 0000000..a3fb89f
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java
@@ -0,0 +1,150 @@
+package com.yahoo.labs.samoa.learners.classifiers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.Configurable;
+import com.yahoo.labs.samoa.instances.Instance;
+import com.yahoo.labs.samoa.instances.Instances;
+import com.yahoo.labs.samoa.instances.InstancesHeader;
+import com.yahoo.labs.samoa.moa.classifiers.functions.MajorityClass;
+
+/**
+ * {@link LocalLearner} that delegates to a
+ * {@link com.yahoo.labs.samoa.moa.classifiers.Classifier}, giving the wrapped
+ * classifier its model context lazily on the first training call.
+ */
+public class SimpleClassifierAdapter implements LocalLearner, Configurable {
+
+    private static final long serialVersionUID = 4372366401338704353L;
+
+    /** Option selecting the wrapped classifier; defaults to {@link MajorityClass}. */
+    public ClassOption learnerOption = new ClassOption("learner", 'l',
+            "Classifier to train.", com.yahoo.labs.samoa.moa.classifiers.Classifier.class, MajorityClass.class.getName());
+
+    /** The wrapped classifier (always a copy of the one supplied). */
+    protected com.yahoo.labs.samoa.moa.classifiers.Classifier learner;
+
+    /** Becomes {@code true} once the wrapped classifier has been given its model context. */
+    protected Boolean isInit;
+
+    /** Header attached to every instance before it reaches the wrapped classifier. */
+    protected Instances dataset;
+
+    /**
+     * Builds an adapter around a copy of the given classifier.
+     *
+     * @param learner the classifier to copy and wrap
+     * @param dataset the dataset header
+     */
+    public SimpleClassifierAdapter(com.yahoo.labs.samoa.moa.classifiers.Classifier learner, Instances dataset) {
+        this.learner = learner.copy();
+        this.isInit = false;
+        this.dataset = dataset;
+    }
+
+    /**
+     * Builds an adapter around a copy of the classifier chosen by
+     * {@link #learnerOption}; the dataset header is left unset.
+     */
+    public SimpleClassifierAdapter() {
+        com.yahoo.labs.samoa.moa.classifiers.Classifier configured =
+                (com.yahoo.labs.samoa.moa.classifiers.Classifier) this.learnerOption.getValue();
+        this.learner = configured.copy();
+        this.isInit = false;
+    }
+
+    @Override
+    public void setDataset(Instances dataset) {
+        this.dataset = dataset;
+    }
+
+    /**
+     * Returns a fresh adapter wrapping a copy of this adapter's classifier and
+     * sharing its dataset header; reports on stdout when no header is set.
+     *
+     * @return the new adapter
+     */
+    @Override
+    public SimpleClassifierAdapter create() {
+        SimpleClassifierAdapter fresh = new SimpleClassifierAdapter(this.learner, this.dataset);
+        boolean headerMissing = (this.dataset == null);
+        if (headerMissing) {
+            System.out.println("dataset null while creating");
+        }
+        return fresh;
+    }
+
+    /** Marks the adapter initialised, then hands the classifier its model context. */
+    private void initWrappedLearner() {
+        this.isInit = true;
+        this.learner.setModelContext(new InstancesHeader(this.dataset));
+        this.learner.prepareForUse();
+    }
+
+    /**
+     * Initialises the wrapped classifier if needed, then trains it on
+     * {@code inst} (after attaching the dataset header) unless the instance
+     * weight is not strictly positive.
+     *
+     * @param inst the instance to be used for training
+     */
+    @Override
+    public void trainOnInstance(Instance inst) {
+        if (!this.isInit) {
+            initWrappedLearner();
+        }
+        // Zero, negative and NaN weights are not trained on
+        if (!(inst.weight() > 0)) {
+            return;
+        }
+        inst.setDataset(this.dataset);
+        this.learner.trainOnInstance(inst);
+    }
+
+    /**
+     * Attaches the dataset header to {@code inst} and returns the wrapped
+     * classifier's votes, or an all-zero array with one slot per class if the
+     * classifier has not been initialised yet.
+     *
+     * @param inst the instance to be classified
+     * @return the per-class votes
+     */
+    @Override
+    public double[] getVotesForInstance(Instance inst) {
+        inst.setDataset(this.dataset);
+        return this.isInit
+                ? this.learner.getVotesForInstance(inst)
+                : new double[this.dataset.numClasses()];
+    }
+
+    /**
+     * Resets the wrapped classifier.
+     */
+    @Override
+    public void resetLearning() {
+        this.learner.resetLearning();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java
new file mode 100644
index 0000000..affc935
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java
@@ -0,0 +1,109 @@
+package com.yahoo.labs.samoa.learners.classifiers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.Configurable;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.instances.Instances;
+import com.yahoo.labs.samoa.learners.AdaptiveLearner;
+import com.yahoo.labs.samoa.learners.Learner;
+import com.yahoo.labs.samoa.moa.classifiers.core.driftdetection.ChangeDetector;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.TopologyBuilder;
+
+/**
+ * {@link Learner} whose topology consists of one {@link LocalLearnerProcessor}
+ * running the configured {@link LocalLearner}; the optional
+ * {@link ChangeDetector} is handed to that processor.
+ */
+public final class SingleClassifier implements Learner, AdaptiveLearner, Configurable {
+
+    private static final long serialVersionUID = 684111382631697031L;
+
+    /** Option selecting the local learner; defaults to {@link SimpleClassifierAdapter}. */
+    public ClassOption learnerOption = new ClassOption("learner", 'l',
+            "Classifier to train.", LocalLearner.class, SimpleClassifierAdapter.class.getName());
+
+    /** Drift detector passed to the processor; may be {@code null}. */
+    protected ChangeDetector changeDetector;
+
+    private LocalLearnerProcessor learnerP;
+    private Stream resultStream;
+    private Instances dataset;
+    private TopologyBuilder builder;
+    private int parallelism;
+
+    @Override
+    public void init(TopologyBuilder builder, Instances dataset, int parallelism) {
+        this.builder = builder;
+        this.dataset = dataset;
+        this.parallelism = parallelism;
+        this.setLayout();
+    }
+
+    /**
+     * Creates the learner processor (with {@code parallelism} instances) and
+     * its result stream, and wires them together.
+     */
+    protected void setLayout() {
+        LocalLearnerProcessor processor = new LocalLearnerProcessor();
+        this.learnerP = processor;
+        processor.setChangeDetector(this.getChangeDetector());
+
+        LocalLearner localLearner = this.learnerOption.getValue();
+        localLearner.setDataset(this.dataset);
+        processor.setLearner(localLearner);
+
+        this.builder.addProcessor(processor, this.parallelism);
+        Stream stream = this.builder.createStream(processor);
+        this.resultStream = stream;
+        processor.setOutputStream(stream);
+    }
+
+    @Override
+    public Processor getInputProcessor() {
+        return this.learnerP;
+    }
+
+    /**
+     * Returns the single stream carrying the processor's results.
+     */
+    @Override
+    public Set<Stream> getResultStreams() {
+        return ImmutableSet.of(this.resultStream);
+    }
+
+    @Override
+    public ChangeDetector getChangeDetector() {
+        return this.changeDetector;
+    }
+
+    @Override
+    public void setChangeDetector(ChangeDetector cd) {
+        this.changeDetector = cd;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java
----------------------------------------------------------------------
diff --git
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java
new file mode 100644
index 0000000..aba3d1d
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java
@@ -0,0 +1,149 @@
+package com.yahoo.labs.samoa.learners.classifiers.ensemble;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.IntOption;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.instances.Instances;
+import com.yahoo.labs.samoa.learners.AdaptiveLearner;
+import com.yahoo.labs.samoa.learners.Learner;
+import com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree;
+import com.yahoo.labs.samoa.moa.classifiers.core.driftdetection.ADWINChangeDetector;
+import com.yahoo.labs.samoa.moa.classifiers.core.driftdetection.ChangeDetector;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.TopologyBuilder;
+
+/**
+ * Online Bagging (Oza and Russell) with drift adaptation.
+ *
+ * <p>A {@link BaggingDistributorProcessor} feeds an ensemble of
+ * {@code ensembleSize} base learners, whose results are merged by a
+ * {@link PredictionCombinerProcessor}. If the base learner implements
+ * {@link AdaptiveLearner}, it is given the change detector selected by
+ * {@link #driftDetectionMethodOption} (ADWIN by default).
+ */
+public class AdaptiveBagging implements Learner, Configurable {
+
+    /** Logger */
+    private static final Logger logger = LoggerFactory.getLogger(AdaptiveBagging.class);
+
+    /** The Constant serialVersionUID. */
+    private static final long serialVersionUID = -2971850264864952099L;
+
+    /** The base learner option. */
+    public ClassOption baseLearnerOption = new ClassOption("baseLearner", 'l',
+            "Classifier to train.", Learner.class, VerticalHoeffdingTree.class.getName());
+
+    /** The ensemble size option. */
+    public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's',
+            "The number of models in the bag.", 10, 1, Integer.MAX_VALUE);
+
+    /** The drift detection method handed to adaptive base learners. */
+    public ClassOption driftDetectionMethodOption = new ClassOption("driftDetectionMethod", 'd',
+            "Drift detection method to use.", ChangeDetector.class, ADWINChangeDetector.class.getName());
+
+    /** The distributor processor. */
+    private BaggingDistributorProcessor distributorP;
+
+    /** The result stream. */
+    protected Stream resultStream;
+
+    /** The dataset. */
+    private Instances dataset;
+
+    /** The base learner, initialised with the ensemble size as its parallelism. */
+    protected Learner classifier;
+
+    /** Parallelism passed to {@link #init}; not used by {@link #setLayout()}. */
+    protected int parallelism;
+
+    /** The builder. */
+    private TopologyBuilder builder;
+
+    /**
+     * Builds the topology: distributor, base learner ensemble and prediction
+     * combiner, plus the streams connecting them.
+     */
+    protected void setLayout() {
+
+        int sizeEnsemble = this.ensembleSizeOption.getValue();
+
+        distributorP = new BaggingDistributorProcessor();
+        distributorP.setSizeEnsemble(sizeEnsemble);
+        this.builder.addProcessor(distributorP, 1);
+
+        // Instantiate the base learner; adaptive ones get the drift detector
+        classifier = this.baseLearnerOption.getValue();
+        if (classifier instanceof AdaptiveLearner) {
+            logger.debug("Building an AdaptiveLearner {}", classifier.getClass().getName());
+            AdaptiveLearner ada = (AdaptiveLearner) classifier;
+            // NOTE(review): a single detector instance is passed here; whether
+            // each ensemble member gets its own copy depends on how the base
+            // learner replicates its processors - confirm.
+            ada.setChangeDetector((ChangeDetector) this.driftDetectionMethodOption.getValue());
+        }
+        classifier.init(builder, this.dataset, sizeEnsemble);
+
+        PredictionCombinerProcessor predictionCombinerP = new PredictionCombinerProcessor();
+        predictionCombinerP.setSizeEnsemble(sizeEnsemble);
+        this.builder.addProcessor(predictionCombinerP, 1);
+
+        // Streams
+        resultStream = this.builder.createStream(predictionCombinerP);
+        predictionCombinerP.setOutputStream(resultStream);
+
+        for (Stream subResultStream : classifier.getResultStreams()) {
+            this.builder.connectInputKeyStream(subResultStream, predictionCombinerP);
+        }
+
+        // Stream registered as the distributor's output stream (setOutputStream below)
+        Stream testingStream = this.builder.createStream(distributorP);
+        this.builder.connectInputKeyStream(testingStream, classifier.getInputProcessor());
+
+        // Stream registered as the distributor's prediction stream
+        Stream predictionStream = this.builder.createStream(distributorP);
+        this.builder.connectInputKeyStream(predictionStream, classifier.getInputProcessor());
+
+        distributorP.setOutputStream(testingStream);
+        distributorP.setPredictionStream(predictionStream);
+    }
+
+    @Override
+    public void init(TopologyBuilder builder, Instances dataset, int parallelism) {
+        this.builder = builder;
+        this.dataset = dataset;
+        this.parallelism = parallelism;
+        this.setLayout();
+    }
+
+    @Override
+    public Processor getInputProcessor() {
+        return distributorP;
+    }
+
+    /**
+     * Returns the single stream carrying the combined ensemble predictions.
+     */
+    @Override
+    public Set<Stream> getResultStreams() {
+        return ImmutableSet.of(this.resultStream);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java
new file mode 100644
index 0000000..9f99ff1
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java
@@ -0,0 +1,138 @@
+package com.yahoo.labs.samoa.learners.classifiers.ensemble;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L% + */ + +/** + * License + */ + +import com.google.common.collect.ImmutableSet; +import java.util.Set; + +import com.yahoo.labs.samoa.instances.Instances; +import com.yahoo.labs.samoa.learners.Learner; +import com.yahoo.labs.samoa.topology.Stream; +import com.yahoo.labs.samoa.topology.TopologyBuilder; +import com.github.javacliparser.ClassOption; +import com.github.javacliparser.Configurable; +import com.github.javacliparser.IntOption; +import com.yahoo.labs.samoa.core.Processor; +import com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree; + +/** + * The Bagging Classifier by Oza and Russell. + */ +public class Bagging implements Learner , Configurable { + + /** The Constant serialVersionUID. */ + private static final long serialVersionUID = -2971850264864952099L; + + /** The base learner option. */ + public ClassOption baseLearnerOption = new ClassOption("baseLearner", 'l', + "Classifier to train.", Learner.class, VerticalHoeffdingTree.class.getName()); + + + /** The ensemble size option. */ + public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's', + "The number of models in the bag.", 10, 1, Integer.MAX_VALUE); + + /** The distributor processor. */ + private BaggingDistributorProcessor distributorP; + + /** The training stream. */ + private Stream testingStream; + + /** The prediction stream. */ + private Stream predictionStream; + + /** The result stream. */ + protected Stream resultStream; + + /** The dataset. */ + private Instances dataset; + + protected Learner classifier; + + protected int parallelism; + + /** + * Sets the layout. 
+ */ + protected void setLayout() { + + int sizeEnsemble = this.ensembleSizeOption.getValue(); + + distributorP = new BaggingDistributorProcessor(); + distributorP.setSizeEnsemble(sizeEnsemble); + this.builder.addProcessor(distributorP, 1); + + //instantiate classifier + classifier = (Learner) this.baseLearnerOption.getValue(); + classifier.init(builder, this.dataset, sizeEnsemble); + + PredictionCombinerProcessor predictionCombinerP= new PredictionCombinerProcessor(); + predictionCombinerP.setSizeEnsemble(sizeEnsemble); + this.builder.addProcessor(predictionCombinerP, 1); + + //Streams + resultStream = this.builder.createStream(predictionCombinerP); + predictionCombinerP.setOutputStream(resultStream); + + for (Stream subResultStream:classifier.getResultStreams()) { + this.builder.connectInputKeyStream(subResultStream, predictionCombinerP); + } + + testingStream = this.builder.createStream(distributorP); + this.builder.connectInputKeyStream(testingStream, classifier.getInputProcessor()); + + predictionStream = this.builder.createStream(distributorP); + this.builder.connectInputKeyStream(predictionStream, classifier.getInputProcessor()); + + distributorP.setOutputStream(testingStream); + distributorP.setPredictionStream(predictionStream); + } + + /** The builder. 
*/ + private TopologyBuilder builder; + + + @Override + public void init(TopologyBuilder builder, Instances dataset, int parallelism) { + this.builder = builder; + this.dataset = dataset; + this.parallelism = parallelism; + this.setLayout(); + } + + @Override + public Processor getInputProcessor() { + return distributorP; + } + + /* (non-Javadoc) + * @see samoa.learners.Learner#getResultStreams() + */ + @Override + public Set<Stream> getResultStreams() { + Set<Stream> streams = ImmutableSet.of(this.resultStream); + return streams; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java new file mode 100644 index 0000000..65c782b --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java @@ -0,0 +1,201 @@ +package com.yahoo.labs.samoa.learners.classifiers.ensemble; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +/** + * License + */ + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.learners.InstanceContentEvent; +import com.yahoo.labs.samoa.core.Processor; +import com.yahoo.labs.samoa.instances.Instance; +import com.yahoo.labs.samoa.moa.core.MiscUtils; +import com.yahoo.labs.samoa.topology.Stream; +import java.util.Random; + +/** + * The Class BaggingDistributorPE. + */ +public class BaggingDistributorProcessor implements Processor{ + + /** + * + */ + private static final long serialVersionUID = -1550901409625192730L; + + /** The size ensemble. */ + private int sizeEnsemble; + + /** The training stream. */ + private Stream trainingStream; + + /** The prediction stream. */ + private Stream predictionStream; + + /** + * On event. + * + * @param event the event + * @return true, if successful + */ + public boolean process(ContentEvent event) { + InstanceContentEvent inEvent = (InstanceContentEvent) event; //((s4Event) event).getContentEvent(); + //InstanceEvent inEvent = (InstanceEvent) event; + + if (inEvent.getInstanceIndex() < 0) { + // End learning + predictionStream.put(event); + return false; + } + + + if (inEvent.isTesting()){ + Instance trainInst = inEvent.getInstance(); + for (int i = 0; i < sizeEnsemble; i++) { + Instance weightedInst = trainInst.copy(); + //weightedInst.setWeight(trainInst.weight() * k); + InstanceContentEvent instanceContentEvent = new InstanceContentEvent( + inEvent.getInstanceIndex(), weightedInst, false, true); + instanceContentEvent.setClassifierIndex(i); + instanceContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex()); + predictionStream.put(instanceContentEvent); + } + } + + /* Estimate model parameters using the training data. */ + if (inEvent.isTraining()) { + train(inEvent); + } + return false; + } + + /** The random. */ + protected Random random = new Random(); + + /** + * Train. 
+ * + * @param inEvent the in event + */ + protected void train(InstanceContentEvent inEvent) { + Instance trainInst = inEvent.getInstance(); + for (int i = 0; i < sizeEnsemble; i++) { + int k = MiscUtils.poisson(1.0, this.random); + if (k > 0) { + Instance weightedInst = trainInst.copy(); + weightedInst.setWeight(trainInst.weight() * k); + InstanceContentEvent instanceContentEvent = new InstanceContentEvent( + inEvent.getInstanceIndex(), weightedInst, true, false); + instanceContentEvent.setClassifierIndex(i); + instanceContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex()); + trainingStream.put(instanceContentEvent); + } + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.ProcessingElement#onCreate() + */ + @Override + public void onCreate(int id) { + //do nothing + } + + + /** + * Gets the training stream. + * + * @return the training stream + */ + public Stream getTrainingStream() { + return trainingStream; + } + + /** + * Sets the training stream. + * + * @param trainingStream the new training stream + */ + public void setOutputStream(Stream trainingStream) { + this.trainingStream = trainingStream; + } + + /** + * Gets the prediction stream. + * + * @return the prediction stream + */ + public Stream getPredictionStream() { + return predictionStream; + } + + /** + * Sets the prediction stream. + * + * @param predictionStream the new prediction stream + */ + public void setPredictionStream(Stream predictionStream) { + this.predictionStream = predictionStream; + } + + /** + * Gets the size ensemble. + * + * @return the size ensemble + */ + public int getSizeEnsemble() { + return sizeEnsemble; + } + + /** + * Sets the size ensemble. 
+ * + * @param sizeEnsemble the new size ensemble + */ + public void setSizeEnsemble(int sizeEnsemble) { + this.sizeEnsemble = sizeEnsemble; + } + + + /* (non-Javadoc) + * @see samoa.core.Processor#newProcessor(samoa.core.Processor) + */ + @Override + public Processor newProcessor(Processor sourceProcessor) { + BaggingDistributorProcessor newProcessor = new BaggingDistributorProcessor(); + BaggingDistributorProcessor originProcessor = (BaggingDistributorProcessor) sourceProcessor; + if (originProcessor.getPredictionStream() != null){ + newProcessor.setPredictionStream(originProcessor.getPredictionStream()); + } + if (originProcessor.getTrainingStream() != null){ + newProcessor.setOutputStream(originProcessor.getTrainingStream()); + } + newProcessor.setSizeEnsemble(originProcessor.getSizeEnsemble()); + /*if (originProcessor.getLearningCurve() != null){ + newProcessor.setLearningCurve((LearningCurve) originProcessor.getLearningCurve().copy()); + }*/ + return newProcessor; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java new file mode 100644 index 0000000..06723e2 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java @@ -0,0 +1,142 @@ +package com.yahoo.labs.samoa.learners.classifiers.ensemble; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * License + */ + +import com.google.common.collect.ImmutableSet; +import java.util.Set; + +import com.github.javacliparser.ClassOption; +import com.github.javacliparser.Configurable; +import com.github.javacliparser.IntOption; +import com.yahoo.labs.samoa.core.Processor; +import com.yahoo.labs.samoa.instances.Instances; +import com.yahoo.labs.samoa.learners.Learner; +import com.yahoo.labs.samoa.learners.classifiers.SingleClassifier; +import com.yahoo.labs.samoa.topology.Stream; +import com.yahoo.labs.samoa.topology.TopologyBuilder; + +/** + * The Bagging Classifier by Oza and Russell. + */ +public class Boosting implements Learner , Configurable { + + /** The Constant serialVersionUID. */ + private static final long serialVersionUID = -2971850264864952099L; + + /** The base learner option. */ + public ClassOption baseLearnerOption = new ClassOption("baseLearner", 'l', + "Classifier to train.", Learner.class, SingleClassifier.class.getName()); + + /** The ensemble size option. */ + public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's', + "The number of models in the bag.", 10, 1, Integer.MAX_VALUE); + + /** The distributor processor. */ + private BoostingDistributorProcessor distributorP; + + /** The result stream. */ + protected Stream resultStream; + + /** The dataset. */ + private Instances dataset; + + protected Learner classifier; + + protected int parallelism; + + /** + * Sets the layout. 
+ */ + protected void setLayout() { + + int sizeEnsemble = this.ensembleSizeOption.getValue(); + + distributorP = new BoostingDistributorProcessor(); + distributorP.setSizeEnsemble(sizeEnsemble); + this.builder.addProcessor(distributorP, 1); + + //instantiate classifier + classifier = this.baseLearnerOption.getValue(); + classifier.init(builder, this.dataset, sizeEnsemble); + + BoostingPredictionCombinerProcessor predictionCombinerP= new BoostingPredictionCombinerProcessor(); + predictionCombinerP.setSizeEnsemble(sizeEnsemble); + this.builder.addProcessor(predictionCombinerP, 1); + + //Streams + resultStream = this.builder.createStream(predictionCombinerP); + predictionCombinerP.setOutputStream(resultStream); + + for (Stream subResultStream:classifier.getResultStreams()) { + this.builder.connectInputKeyStream(subResultStream, predictionCombinerP); + } + + /* The testing stream. */ + Stream testingStream = this.builder.createStream(distributorP); + this.builder.connectInputKeyStream(testingStream, classifier.getInputProcessor()); + + /* The prediction stream. */ + Stream predictionStream = this.builder.createStream(distributorP); + this.builder.connectInputKeyStream(predictionStream, classifier.getInputProcessor()); + + distributorP.setOutputStream(testingStream); + distributorP.setPredictionStream(predictionStream); + + // Addition to Bagging: stream to train + /* The training stream. */ + Stream trainingStream = this.builder.createStream(predictionCombinerP); + predictionCombinerP.setTrainingStream(trainingStream); + this.builder.connectInputKeyStream(trainingStream, classifier.getInputProcessor()); + + } + + /** The builder. 
*/ + private TopologyBuilder builder; + + /* (non-Javadoc) + * @see samoa.classifiers.Classifier#init(samoa.engines.Engine, samoa.core.Stream, weka.core.Instances) + */ + + @Override + public void init(TopologyBuilder builder, Instances dataset, int parallelism) { + this.builder = builder; + this.dataset = dataset; + this.parallelism = parallelism; + this.setLayout(); + } + + @Override + public Processor getInputProcessor() { + return distributorP; + } + + /* (non-Javadoc) + * @see samoa.learners.Learner#getResultStreams() + */ + @Override + public Set<Stream> getResultStreams() { + return ImmutableSet.of(this.resultStream); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java new file mode 100644 index 0000000..7100e7e --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java @@ -0,0 +1,36 @@ +package com.yahoo.labs.samoa.learners.classifiers.ensemble; + +import com.yahoo.labs.samoa.learners.InstanceContentEvent; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +/** + * The Class BoostingDistributorProcessor. + */ +public class BoostingDistributorProcessor extends BaggingDistributorProcessor{ + + @Override + protected void train(InstanceContentEvent inEvent) { + // Boosting is trained from the prediction combiner, not from the input + } + +}
