SAMOA-48: Update InstancesContentEvent as a list of InstanceContent objects

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/b02882ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/b02882ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/b02882ef

Branch: refs/heads/master
Commit: b02882efe056b14ac9596e44e313485725b02353
Parents: 23169b0
Author: Albert Bifet <[email protected]>
Authored: Fri Nov 6 12:25:12 2015 +0100
Committer: Albert Bifet <[email protected]>
Committed: Fri Nov 6 12:25:12 2015 +0100

----------------------------------------------------------------------
 .../apache/samoa/learners/InstanceContent.java  | 207 ++++++++++++++++++++
 .../samoa/learners/InstanceContentEvent.java    |  60 +++---
 .../samoa/learners/InstancesContentEvent.java   | 142 ++++----------
 .../classifiers/trees/FilterProcessor.java      |  12 +-
 .../trees/ModelAggregatorProcessor.java         |  75 +++----
 5 files changed, 309 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/b02882ef/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContent.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContent.java b/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContent.java
new file mode 100644
index 0000000..7a49985
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContent.java
@@ -0,0 +1,207 @@
+package org.apache.samoa.learners;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.SerializableInstance;
+import org.apache.samoa.instances.Instance;
+
+import java.io.Serializable;
+
+/**
+ * The Class InstanceContent: the payload of a single instance (the instance itself, its index, the
+ * classifier and evaluation indices, and the training, testing and last-event flags).
+ * <p>
+ * The fields are mutable through the setters below and access is not synchronized, so objects of
+ * this class are not thread-safe.
+ */
+final public class InstanceContent implements Serializable {
+
+  private static final long serialVersionUID = -8620668863064613841L;
+
+  private long instanceIndex;
+  private int classifierIndex;
+  private int evaluationIndex;
+  private SerializableInstance instance;
+  private boolean isTraining;
+  private boolean isTesting;
+  private boolean isLast = false;
+
+  /**
+   * Creates an empty content: no instance, all indices 0 and all flags {@code false}.
+   */
+  public InstanceContent() {
+  }
+
+  /**
+   * Instantiates a new instance content.
+   *
+   * @param index
+   *          the instance index
+   * @param instance
+   *          the instance, wrapped in a new {@link SerializableInstance}; may be {@code null}
+   * @param isTraining
+   *          whether the instance is used for training
+   * @param isTesting
+   *          whether the instance is used for testing
+   */
+  public InstanceContent(long index, Instance instance, boolean isTraining, boolean isTesting) {
+    if (instance != null) {
+      this.instance = new SerializableInstance(instance);
+    }
+    this.instanceIndex = index;
+    this.isTraining = isTraining;
+    this.isTesting = isTesting;
+  }
+
+  /**
+   * Gets the instance.
+   *
+   * @return the instance, or {@code null} if none was supplied
+   */
+  public Instance getInstance() {
+    return instance;
+  }
+
+  /**
+   * Gets the instance index.
+   *
+   * @return the index of the data vector.
+   */
+  public long getInstanceIndex() {
+    return instanceIndex;
+  }
+
+  /**
+   * Gets the class id.
+   *
+   * @return the true class of the vector, i.e. the instance's class value truncated to an int
+   * @throws NullPointerException
+   *           if this content holds no instance
+   */
+  public int getClassId() {
+    return (int) instance.classValue();
+  }
+
+  /**
+   * Checks if is training.
+   *
+   * @return true if this is training data.
+   */
+  public boolean isTraining() {
+    return isTraining;
+  }
+
+  /**
+   * Set training flag.
+   *
+   * @param training
+   *          flag.
+   */
+  public void setTraining(boolean training) {
+    this.isTraining = training;
+  }
+
+  /**
+   * Checks if is testing.
+   *
+   * @return true if this is testing data.
+   */
+  public boolean isTesting() {
+    return isTesting;
+  }
+
+  /**
+   * Set testing flag.
+   *
+   * @param testing
+   *          flag.
+   */
+  public void setTesting(boolean testing) {
+    this.isTesting = testing;
+  }
+
+  /**
+   * Gets the classifier index.
+   *
+   * @return the classifier index
+   */
+  public int getClassifierIndex() {
+    return classifierIndex;
+  }
+
+  /**
+   * Sets the classifier index.
+   *
+   * @param classifierIndex
+   *          the new classifier index
+   */
+  public void setClassifierIndex(int classifierIndex) {
+    this.classifierIndex = classifierIndex;
+  }
+
+  /**
+   * Gets the evaluation index.
+   *
+   * @return the evaluation index
+   */
+  public int getEvaluationIndex() {
+    return evaluationIndex;
+  }
+
+  /**
+   * Sets the evaluation index.
+   *
+   * @param evaluationIndex
+   *          the new evaluation index
+   */
+  public void setEvaluationIndex(int evaluationIndex) {
+    this.evaluationIndex = evaluationIndex;
+  }
+
+  /**
+   * Sets the instance index.
+   *
+   * @param instanceIndex
+   *          the new instance index
+   */
+  public void setInstanceIndex(long instanceIndex) {
+    this.instanceIndex = instanceIndex;
+  }
+
+  /**
+   * Checks whether the last-event flag is set.
+   *
+   * @return the last-event flag
+   */
+  public boolean isLastEvent() {
+    return isLast;
+  }
+
+  /**
+   * Sets the last-event flag.
+   *
+   * @param isLast
+   *          the new last-event flag
+   */
+  public void setLast(boolean isLast) {
+    this.isLast = isLast;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/b02882ef/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContentEvent.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContentEvent.java b/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContentEvent.java
index 69a4428..c1a1de6 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContentEvent.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContentEvent.java
@@ -30,10 +30,8 @@ import org.apache.samoa.instances.Instance;
 
 import net.jcip.annotations.Immutable;
 
-//import weka.core.Instance;
-
 /**
- * The Class InstanceEvent.
+ * The Class InstanceContentEvent: a {@link ContentEvent} wrapping a single {@link InstanceContent}.
  */
 @Immutable
 final public class InstanceContentEvent implements ContentEvent {
@@ -42,13 +40,9 @@ final public class InstanceContentEvent implements ContentEvent {
    *
    */
   private static final long serialVersionUID = -8620668863064613845L;
-  private long instanceIndex;
-  private int classifierIndex;
-  private int evaluationIndex;
-  private SerializableInstance instance;
-  private boolean isTraining;
-  private boolean isTesting;
-  private boolean isLast = false;
+  // Defaults to an empty content so that an event built with the no-arg constructor can still be
+  // used through the delegating accessors below (getClassId() additionally requires an instance).
+  private InstanceContent instanceContent = new InstanceContent();
 
   public InstanceContentEvent() {
 
@@ -66,12 +60,7 @@ final public class InstanceContentEvent implements ContentEvent {
    */
   public InstanceContentEvent(long index, Instance instance,
       boolean isTraining, boolean isTesting) {
-    if (instance != null) {
-      this.instance = new SerializableInstance(instance);
-    }
-    this.instanceIndex = index;
-    this.isTraining = isTraining;
-    this.isTesting = isTesting;
+    this.instanceContent = new InstanceContent(index, instance, isTraining, isTesting);
   }
 
   /**
@@ -80,7 +69,7 @@ final public class InstanceContentEvent implements ContentEvent {
    * @return the instance.
    */
   public Instance getInstance() {
-    return instance;
+    return this.instanceContent.getInstance();
   }
 
   /**
@@ -89,7 +78,7 @@ final public class InstanceContentEvent implements ContentEvent {
    * @return the index of the data vector.
    */
   public long getInstanceIndex() {
-    return instanceIndex;
+    return this.instanceContent.getInstanceIndex();
   }
 
   /**
@@ -97,9 +86,8 @@ final public class InstanceContentEvent implements ContentEvent {
    *
    * @return the true class of the vector.
    */
   public int getClassId() {
-    // return classId;
-    return (int) instance.classValue();
+    return this.instanceContent.getClassId();
   }
 
   /**
@@ -108,7 +96,7 @@ final public class InstanceContentEvent implements ContentEvent {
    * @return true if this is training data.
    */
   public boolean isTraining() {
-    return isTraining;
+    return this.instanceContent.isTraining();
   }
 
   /**
@@ -117,9 +105,9 @@ final public class InstanceContentEvent implements ContentEvent {
    * @param training
    *          flag.
    */
   public void setTraining(boolean training) {
-    this.isTraining = training;
+    this.instanceContent.setTraining(training);
   }
 
   /**
    * Checks if is testing.
@@ -127,7 +115,7 @@ final public class InstanceContentEvent implements ContentEvent {
    * @return true if this is testing data.
    */
   public boolean isTesting() {
-    return isTesting;
+    return this.instanceContent.isTesting();
   }
 
   /**
@@ -137,7 +125,7 @@ final public class InstanceContentEvent implements ContentEvent {
    *          flag.
    */
   public void setTesting(boolean testing) {
-    this.isTesting = testing;
+    this.instanceContent.setTesting(testing);
   }
 
   /**
@@ -146,7 +134,7 @@ final public class InstanceContentEvent implements ContentEvent {
    * @return the classifier index
    */
   public int getClassifierIndex() {
-    return classifierIndex;
+    return this.instanceContent.getClassifierIndex();
   }
 
   /**
@@ -156,7 +144,7 @@ final public class InstanceContentEvent implements ContentEvent {
    *          the new classifier index
    */
   public void setClassifierIndex(int classifierIndex) {
-    this.classifierIndex = classifierIndex;
+    this.instanceContent.setClassifierIndex(classifierIndex);
   }
 
   /**
@@ -165,7 +153,7 @@ final public class InstanceContentEvent implements ContentEvent {
    * @return the evaluation index
    */
   public int getEvaluationIndex() {
-    return evaluationIndex;
+    return this.instanceContent.getEvaluationIndex();
   }
 
   /**
@@ -175,7 +163,7 @@ final public class InstanceContentEvent implements ContentEvent {
    *          the new evaluation index
    */
   public void setEvaluationIndex(int evaluationIndex) {
-    this.evaluationIndex = evaluationIndex;
+    this.instanceContent.setEvaluationIndex(evaluationIndex);
   }
 
   /*
@@ -200,16 +188,24 @@ final public class InstanceContentEvent implements ContentEvent {
 
   @Override
   public void setKey(String str) {
-    this.instanceIndex = Long.parseLong(str);
+    this.instanceContent.setInstanceIndex(Long.parseLong(str));
   }
 
   @Override
   public boolean isLastEvent() {
-    return isLast;
+    return this.instanceContent.isLastEvent();
   }
 
   public void setLast(boolean isLast) {
-    this.isLast = isLast;
+    this.instanceContent.setLast(isLast);
+  }
+
+  /**
+   * Gets the wrapped instance content.
+   *
+   * @return the {@link InstanceContent} backing this event
+   */
+  public InstanceContent getInstanceContent() {
+    return instanceContent;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/b02882ef/samoa-api/src/main/java/org/apache/samoa/learners/InstancesContentEvent.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/InstancesContentEvent.java b/samoa-api/src/main/java/org/apache/samoa/learners/InstancesContentEvent.java
index 2bab8a6..5a0f0a2 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/InstancesContentEvent.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/InstancesContentEvent.java
@@ -30,10 +30,6 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.samoa.core.ContentEvent;
-import org.apache.samoa.core.SerializableInstance;
-import org.apache.samoa.instances.Instance;
-
-//import weka.core.Instance;
 
 /**
  * The Class InstanceEvent.
@@ -45,49 +41,33 @@ final public class InstancesContentEvent implements ContentEvent {
    *
    */
   private static final long serialVersionUID = -8620668863064613845L;
-  private long instanceIndex;
-  private int classifierIndex;
-  private int evaluationIndex;
-  // private SerializableInstance instance;
-  private boolean isTraining;
-  private boolean isTesting;
-  private boolean isLast = false;
+
+  /** The instance contents of this batch, in insertion order. */
+  protected List<InstanceContent> instanceList = new LinkedList<InstanceContent>();
 
+  /** Creates an event with an empty batch. */
   public InstancesContentEvent() {
 
   }
 
   /**
-   * Instantiates a new instance event.
-   *
-   * @param index
-   *          the index
-   * @param instance
-   *          the instance
-   * @param isTraining
-   *          the is training
+   * Instantiates a new event whose batch initially holds the content of the given event.
+   *
+   * @param event
+   *          the event whose {@link InstanceContent} becomes the first element of the batch
    */
-  public InstancesContentEvent(long index,// Instance instance,
-      boolean isTraining, boolean isTesting) {
-    /*
-     * if (instance != null) { this.instance = new
-     * SerializableInstance(instance); }
-     */
-    this.instanceIndex = index;
-    this.isTraining = isTraining;
-    this.isTesting = isTesting;
-  }
-
   public InstancesContentEvent(InstanceContentEvent event) {
-    this.instanceIndex = event.getInstanceIndex();
-    this.isTraining = event.isTraining();
-    this.isTesting = event.isTesting();
+    this.add(event.getInstanceContent());
   }
 
-  protected List<Instance> instanceList = new LinkedList<Instance>();
-
-  public void add(Instance instance) {
-    instanceList.add(new SerializableInstance(instance));
+  /**
+   * Appends the content of one instance to the end of the batch.
+   *
+   * @param instance
+   *          the content to append
+   */
+  public void add(InstanceContent instance) {
+    instanceList.add(instance);
   }
 
   /**
@@ -95,74 +75,27 @@ final public class InstancesContentEvent implements ContentEvent {
    *
    * @return the instance.
    */
-  public Instance[] getInstances() {
-    return instanceList.toArray(new Instance[instanceList.size()]);
+  public InstanceContent[] getInstances() {
+    return instanceList.toArray(new InstanceContent[instanceList.size()]);
   }
-
-  /**
-   * Gets the instance index.
-   *
-   * @return the index of the data vector.
-   */
-  public long getInstanceIndex() {
-    return instanceIndex;
-  }
-
-  /**
-   * Checks if is training.
-   *
-   * @return true if this is training data.
-   */
-  public boolean isTraining() {
-    return isTraining;
-  }
-
-  /**
-   * Checks if is testing.
-   *
-   * @return true if this is testing data.
-   */
-  public boolean isTesting() {
-    return isTesting;
-  }
 
   /**
-   * Gets the classifier index.
-   * 
-   * @return the classifier index
+   * Gets the classifier index of the first instance in the batch.
+   *
+   * @return the classifier index, or 0 if the batch is empty
    */
   public int getClassifierIndex() {
-    return classifierIndex;
-  }
-
-  /**
-   * Sets the classifier index.
-   * 
-   * @param classifierIndex
-   *          the new classifier index
-   */
-  public void setClassifierIndex(int classifierIndex) {
-    this.classifierIndex = classifierIndex;
+    return instanceList.isEmpty() ? 0 : instanceList.get(0).getClassifierIndex();
   }
 
   /**
-   * Gets the evaluation index.
-   * 
-   * @return the evaluation index
+   * Gets the evaluation index of the first instance in the batch.
+   *
+   * @return the evaluation index, or 0 if the batch is empty
    */
   public int getEvaluationIndex() {
-    return evaluationIndex;
+    return instanceList.isEmpty() ? 0 : instanceList.get(0).getEvaluationIndex();
   }
-
-  /**
-   * Sets the evaluation index.
-   * 
-   * @param evaluationIndex
-   *          the new evaluation index
-   */
-  public void setEvaluationIndex(int evaluationIndex) {
-    this.evaluationIndex = evaluationIndex;
-  }
 
   /*
    * (non-Javadoc)
@@ -185,17 +118,26 @@ final public class InstancesContentEvent implements ContentEvent {
   }
 
   @Override
-  public void setKey(String str) {
-    this.instanceIndex = Long.parseLong(str);
+  public void setKey(String key) {
+    // Intentionally a no-op: the batch has no index of its own to overwrite.
   }
 
+  /**
+   * Checks whether the final instance in the batch carries the last-event flag.
+   *
+   * @return that flag, or {@code false} if the batch is empty
+   */
   @Override
   public boolean isLastEvent() {
-    return isLast;
+    return !instanceList.isEmpty() && instanceList.get(instanceList.size() - 1).isLastEvent();
   }
 
-  public void setLast(boolean isLast) {
-    this.isLast = isLast;
+  /**
+   * Gets the list backing this batch. The list is live: changes made through it affect this event.
+   *
+   * @return the instance contents of this batch, in insertion order
+   */
+  public List<InstanceContent> getList() {
+    return this.instanceList;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/b02882ef/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FilterProcessor.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FilterProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FilterProcessor.java
index 0af8b93..a55b813 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FilterProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FilterProcessor.java
@@ -79,16 +79,12 @@ final class FilterProcessor implements Processor {
       if (this.waitingInstances == this.batchSize || instanceContentEvent.isLastEvent()) {
         // Send Instances
-        InstancesContentEvent outputEvent = new InstancesContentEvent(instanceContentEvent);
-        boolean isLastEvent = false;
+        // Start from an empty batch and let the loop below drain every queued event into it; seeding
+        // the batch with instanceContentEvent would send that event twice whenever it is also queued.
+        InstancesContentEvent outputEvent = new InstancesContentEvent();
         while (!this.contentEventList.isEmpty()) {
           InstanceContentEvent ice = this.contentEventList.remove(0);
-          Instance inst = ice.getInstance();
-          outputEvent.add(inst);
-          if (!isLastEvent) {
-            isLastEvent = ice.isLastEvent();
-          }
+          outputEvent.add(ice.getInstanceContent());
         }
-        outputEvent.setLast(isLastEvent);
         this.waitingInstances = 0;
         this.outputStream.put(outputEvent);
         if (this.delay > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/b02882ef/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
index 846d8e1..ce2d4c4 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
@@ -38,6 +38,7 @@ import org.apache.samoa.core.Processor;
 import org.apache.samoa.instances.Instance;
 import org.apache.samoa.instances.Instances;
 import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.learners.InstanceContent;
 import org.apache.samoa.learners.InstanceContentEvent;
 import org.apache.samoa.learners.InstancesContentEvent;
 import org.apache.samoa.learners.ResultContentEvent;
@@ -250,7 +251,7 @@ final class ModelAggregatorProcessor implements Processor {
    *          The associated instance content event
    * @return ResultContentEvent to be sent into Evaluator PI or other destination PI.
    */
-  private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent) {
+  private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContent inEvent) {
     ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
         inEvent.getClassId(), prediction, inEvent.isLastEvent());
     rce.setClassifierIndex(this.processorId);
@@ -258,19 +259,6 @@ final class ModelAggregatorProcessor implements Processor {
     return rce;
   }
 
-  private ResultContentEvent newResultContentEvent(double[] prediction, Instance inst, InstancesContentEvent inEvent) {
-    boolean isLastEvent = false;
-    if (inEvent.isLastEvent()) {
-      Instance[] tmp = inEvent.getInstances();
-      isLastEvent = inst == tmp[tmp.length - 1]; // only set LastEvent on the last instance in the mini-batch
-    }
-    ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inst, (int) inst.classValue(),
-        prediction, isLastEvent);
-    rce.setClassifierIndex(this.processorId);
-    rce.setEvaluationIndex(inEvent.getEvaluationIndex());
-    return rce;
-  }
-
   private List<InstancesContentEvent> contentEventList = new LinkedList<>();
 
   /**
@@ -297,40 +285,33 @@ final class ModelAggregatorProcessor implements Processor {
   private int numBatches = 0;
 
   private void processInstances(InstancesContentEvent instContentEvent) {
+    for (InstanceContent instContent : instContentEvent.getList()) {
+      Instance inst = instContent.getInstance();
+      // Each instance carries its own flags: test it first (if requested), then train on it.
+      boolean isTesting = instContent.isTesting();
+      boolean isTraining = instContent.isTraining();
+      inst.setDataset(this.dataset);
+      double[] prediction = null;
+      if (isTesting) {
+        prediction = getVotesForInstance(inst, false);
+        this.resultStream.put(newResultContentEvent(prediction, instContent));
+      }
 
-    Instance[] instances = instContentEvent.getInstances();
-    boolean isTesting = instContentEvent.isTesting();
-    boolean isTraining = instContentEvent.isTraining();
-    for (Instance inst : instances) {
-      this.processInstance(inst, instContentEvent, isTesting, isTraining);
-    }
-  }
-
-  private void processInstance(Instance inst, InstancesContentEvent instContentEvent, boolean isTesting,
-      boolean isTraining) {
-    inst.setDataset(this.dataset);
-    // Check the instance whether it is used for testing or training
-    // boolean testAndTrain = isTraining; //Train after testing
-    double[] prediction = null;
-    if (isTesting) {
-      prediction = getVotesForInstance(inst, false);
-      this.resultStream.put(newResultContentEvent(prediction, inst, instContentEvent));
-    }
-
-    if (isTraining) {
-      trainOnInstanceImpl(inst);
-      if (this.changeDetector != null) {
-        if (prediction == null) {
-          prediction = getVotesForInstance(inst);
-        }
-        boolean correctlyClassifies = this.correctlyClassifies(inst, prediction);
-        double oldEstimation = this.changeDetector.getEstimation();
-        this.changeDetector.input(correctlyClassifies ? 0 : 1);
-        if (this.changeDetector.getEstimation() > oldEstimation) {
-          // Start a new classifier
-          logger.info("Change detected, resetting the classifier");
-          this.resetLearning();
-          this.changeDetector.resetLearning();
+      if (isTraining) {
+        trainOnInstanceImpl(inst);
+        if (this.changeDetector != null) {
+          if (prediction == null) {
+            prediction = getVotesForInstance(inst);
+          }
+          boolean correctlyClassifies = this.correctlyClassifies(inst, prediction);
+          double oldEstimation = this.changeDetector.getEstimation();
+          this.changeDetector.input(correctlyClassifies ? 0 : 1);
+          if (this.changeDetector.getEstimation() > oldEstimation) {
+            // Start a new classifier
+            logger.info("Change detected, resetting the classifier");
+            this.resetLearning();
+            this.changeDetector.resetLearning();
+          }
         }
       }
     }
