http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java deleted file mode 100644 index 6de0b87..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/PrequentialEvaluation.java +++ /dev/null @@ -1,220 +0,0 @@ -package com.yahoo.labs.samoa.tasks; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -import java.text.SimpleDateFormat; -import java.util.Date; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.javacliparser.ClassOption; -import com.github.javacliparser.Configurable; -import com.github.javacliparser.FileOption; -import com.github.javacliparser.IntOption; -import com.github.javacliparser.StringOption; -import com.yahoo.labs.samoa.evaluation.BasicClassificationPerformanceEvaluator; -import com.yahoo.labs.samoa.evaluation.BasicRegressionPerformanceEvaluator; -import com.yahoo.labs.samoa.evaluation.ClassificationPerformanceEvaluator; -import com.yahoo.labs.samoa.evaluation.PerformanceEvaluator; -import com.yahoo.labs.samoa.evaluation.EvaluatorProcessor; -import com.yahoo.labs.samoa.evaluation.RegressionPerformanceEvaluator; -import com.yahoo.labs.samoa.learners.ClassificationLearner; -import com.yahoo.labs.samoa.learners.Learner; -import com.yahoo.labs.samoa.learners.RegressionLearner; -import com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree; -import com.yahoo.labs.samoa.moa.streams.InstanceStream; -import com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator; -import com.yahoo.labs.samoa.streams.PrequentialSourceProcessor; -import com.yahoo.labs.samoa.topology.ComponentFactory; -import com.yahoo.labs.samoa.topology.Stream; -import com.yahoo.labs.samoa.topology.Topology; -import com.yahoo.labs.samoa.topology.TopologyBuilder; - -/** - * Prequential Evaluation task is a scheme in evaluating performance of online classifiers which uses each instance for - * testing online classifiers model and then it further uses the same instance for training the model(Test-then-train) - * - * @author Arinto Murdopo - * - */ -public class PrequentialEvaluation implements Task, Configurable { - - private static final long serialVersionUID = -8246537378371580550L; - - private static Logger logger = LoggerFactory.getLogger(PrequentialEvaluation.class); - - public ClassOption learnerOption = new ClassOption("learner", 'l', "Classifier to train.", Learner.class, - VerticalHoeffdingTree.class.getName()); - - public ClassOption streamTrainOption = new ClassOption("trainStream", 's', "Stream to learn from.", - InstanceStream.class, - RandomTreeGenerator.class.getName()); - - public ClassOption evaluatorOption = new ClassOption("evaluator", 'e', - "Classification performance evaluation method.", - PerformanceEvaluator.class, BasicClassificationPerformanceEvaluator.class.getName()); - - public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i', - "Maximum number of instances to test/train on (-1 = no limit).", 1000000, -1, - Integer.MAX_VALUE); - - public IntOption timeLimitOption = new IntOption("timeLimit", 't', - "Maximum number of seconds to test/train for (-1 = no limit).", -1, -1, - Integer.MAX_VALUE); - - public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 'f', - "How many instances between samples of the learning performance.", 100000, - 0, Integer.MAX_VALUE); - - public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", - "Prequential_" - + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); - - public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File to append intermediate csv results to", - null, "csv", true); - - // Default=0: no delay/waiting - public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w', - "How many microseconds between injections of two instances.", 0, 0, Integer.MAX_VALUE); - // Batch size to delay the incoming stream: delay of x milliseconds after each - // batch - public IntOption batchDelayOption = new IntOption("delayBatchSize", 'b', - "The delay batch size: delay of x milliseconds after each batch ", 1, 1, Integer.MAX_VALUE); - - private PrequentialSourceProcessor preqSource; - - // private PrequentialSourceTopologyStarter preqStarter; - - // private EntranceProcessingItem sourcePi; - - private Stream sourcePiOutputStream; - - private Learner classifier; - - private EvaluatorProcessor evaluator; - - // private ProcessingItem evaluatorPi; - - // private Stream evaluatorPiInputStream; - - private Topology prequentialTopology; - - private TopologyBuilder builder; - - public void getDescription(StringBuilder sb, int indent) { - sb.append("Prequential evaluation"); - } - - @Override - public void init() { - // TODO remove the if statement - // theoretically, dynamic binding will work here! - // test later! - // for now, the if statement is used by Storm - - if (builder == null) { - builder = new TopologyBuilder(); - logger.debug("Successfully instantiating TopologyBuilder"); - - builder.initTopology(evaluationNameOption.getValue()); - logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); - } - - // instantiate PrequentialSourceProcessor and its output stream - // (sourcePiOutputStream) - preqSource = new PrequentialSourceProcessor(); - preqSource.setStreamSource((InstanceStream) this.streamTrainOption.getValue()); - preqSource.setMaxNumInstances(instanceLimitOption.getValue()); - preqSource.setSourceDelay(sourceDelayOption.getValue()); - preqSource.setDelayBatchSize(batchDelayOption.getValue()); - builder.addEntranceProcessor(preqSource); - logger.debug("Successfully instantiating PrequentialSourceProcessor"); - - // preqStarter = new PrequentialSourceTopologyStarter(preqSource, - // instanceLimitOption.getValue()); - // sourcePi = builder.createEntrancePi(preqSource, preqStarter); - // sourcePiOutputStream = builder.createStream(sourcePi); - - sourcePiOutputStream = builder.createStream(preqSource); - // preqStarter.setInputStream(sourcePiOutputStream); - - // instantiate classifier and connect it to sourcePiOutputStream - classifier = this.learnerOption.getValue(); - classifier.init(builder, preqSource.getDataset(), 1); - builder.connectInputShuffleStream(sourcePiOutputStream, classifier.getInputProcessor()); - logger.debug("Successfully instantiating Classifier"); - - PerformanceEvaluator evaluatorOptionValue = this.evaluatorOption.getValue(); - if (!PrequentialEvaluation.isLearnerAndEvaluatorCompatible(classifier, evaluatorOptionValue)) { - evaluatorOptionValue = getDefaultPerformanceEvaluatorForLearner(classifier); - } - evaluator = new EvaluatorProcessor.Builder(evaluatorOptionValue) - .samplingFrequency(sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile()).build(); - - // evaluatorPi = builder.createPi(evaluator); - // evaluatorPi.connectInputShuffleStream(evaluatorPiInputStream); - builder.addProcessor(evaluator); - for (Stream evaluatorPiInputStream : classifier.getResultStreams()) { - builder.connectInputShuffleStream(evaluatorPiInputStream, evaluator); - } - - logger.debug("Successfully instantiating EvaluatorProcessor"); - - prequentialTopology = builder.build(); - logger.debug("Successfully building the topology"); - } - - @Override - public void setFactory(ComponentFactory factory) { - // TODO unify this code with init() - // for now, it's used by S4 App - // dynamic binding theoretically will solve this problem - builder = new TopologyBuilder(factory); - logger.debug("Successfully instantiating TopologyBuilder"); - - builder.initTopology(evaluationNameOption.getValue()); - logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); - - } - - public Topology getTopology() { - return prequentialTopology; - } - - // - // @Override - // public TopologyStarter getTopologyStarter() { - // return this.preqStarter; - // } - - private static boolean isLearnerAndEvaluatorCompatible(Learner learner, PerformanceEvaluator evaluator) { - return (learner instanceof RegressionLearner && evaluator instanceof RegressionPerformanceEvaluator) || - (learner instanceof ClassificationLearner && evaluator instanceof ClassificationPerformanceEvaluator); - } - - private static PerformanceEvaluator getDefaultPerformanceEvaluatorForLearner(Learner learner) { - if (learner instanceof RegressionLearner) { - return new BasicRegressionPerformanceEvaluator(); - } - // Default to BasicClassificationPerformanceEvaluator for all other cases - return new BasicClassificationPerformanceEvaluator(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java deleted file mode 100644 index 52e3485..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/Task.java +++ /dev/null @@ -1,59 +0,0 @@ -package com.yahoo.labs.samoa.tasks; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.topology.ComponentFactory; -import com.yahoo.labs.samoa.topology.Topology; - -/** - * Task interface, the mother of all SAMOA tasks! - */ -public interface Task { - - /** - * Initialize this SAMOA task, i.e. create and connect ProcessingItems and Streams and initialize the topology - */ - public void init(); - - /** - * Return the final topology object to be executed in the cluster - * - * @return topology object to be submitted to be executed in the cluster - */ - public Topology getTopology(); - - // /** - // * Return the entrance processor to start SAMOA topology - // * The logic to start the topology should be implemented here - // * @return entrance processor to start the topology - // */ - // public TopologyStarter getTopologyStarter(); - - /** - * Sets the factory. TODO: propose to hide factory from task, i.e. Task will only see TopologyBuilder, and factory - * creation will be handled by TopologyBuilder - * - * @param factory - * the new factory - */ - public void setFactory(ComponentFactory factory); - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java deleted file mode 100644 index 6f6069b..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractEntranceProcessingItem.java +++ /dev/null @@ -1,115 +0,0 @@ -package com.yahoo.labs.samoa.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.EntranceProcessor; - -/** - * Helper class for EntranceProcessingItem implementation. - * - * @author Anh Thu Vu - * - */ -public abstract class AbstractEntranceProcessingItem implements EntranceProcessingItem { - private EntranceProcessor processor; - private String name; - private Stream outputStream; - - /* - * Constructor - */ - public AbstractEntranceProcessingItem() { - this(null); - } - - public AbstractEntranceProcessingItem(EntranceProcessor processor) { - this.processor = processor; - } - - /* - * Processor - */ - /** - * Set the entrance processor for this EntranceProcessingItem - * - * @param processor - * the processor - */ - protected void setProcessor(EntranceProcessor processor) { - this.processor = processor; - } - - /** - * Get the EntranceProcessor of this EntranceProcessingItem. - * - * @return the EntranceProcessor - */ - public EntranceProcessor getProcessor() { - return this.processor; - } - - /* - * Name/ID - */ - /** - * Set the name (or ID) of this EntranceProcessingItem - * - * @param name - */ - public void setName(String name) { - this.name = name; - } - - /** - * Get the name (or ID) of this EntranceProcessingItem - * - * @return the name (or ID) - */ - public String getName() { - return this.name; - } - - /* - * Output Stream - */ - /** - * Set the output stream of this EntranceProcessingItem. An EntranceProcessingItem should have only 1 single output - * stream and should not be re-assigned. - * - * @return this EntranceProcessingItem - */ - public EntranceProcessingItem setOutputStream(Stream outputStream) { - if (this.outputStream != null && this.outputStream != outputStream) { - throw new IllegalStateException("Cannot overwrite output stream of EntranceProcessingItem"); - } else - this.outputStream = outputStream; - return this; - } - - /** - * Get the output stream of this EntranceProcessingItem. - * - * @return the output stream - */ - public Stream getOutputStream() { - return this.outputStream; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java deleted file mode 100644 index 58c4d7a..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractProcessingItem.java +++ /dev/null @@ -1,168 +0,0 @@ -package com.yahoo.labs.samoa.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.utils.PartitioningScheme; - -/** - * Abstract ProcessingItem - * - * Helper for implementation of ProcessingItem. It has basic information for a ProcessingItem: name, parallelismLevel - * and a processor. Subclass of this class needs to implement {@link #addInputStream(Stream, PartitioningScheme)}. - * - * @author Anh Thu Vu - * - */ -public abstract class AbstractProcessingItem implements ProcessingItem { - private String name; - private int parallelism; - private Processor processor; - - /* - * Constructor - */ - public AbstractProcessingItem() { - this(null); - } - - public AbstractProcessingItem(Processor processor) { - this(processor, 1); - } - - public AbstractProcessingItem(Processor processor, int parallelism) { - this.processor = processor; - this.parallelism = parallelism; - } - - /* - * Processor - */ - /** - * Set the processor for this ProcessingItem - * - * @param processor - * the processor - */ - protected void setProcessor(Processor processor) { - this.processor = processor; - } - - /** - * Get the processor of this ProcessingItem - * - * @return the processor - */ - public Processor getProcessor() { - return this.processor; - } - - /* - * Parallelism - */ - /** - * Set the parallelism factor of this ProcessingItem - * - * @param parallelism - */ - protected void setParallelism(int parallelism) { - this.parallelism = parallelism; - } - - /** - * Get the parallelism factor of this ProcessingItem - * - * @return the parallelism factor - */ - @Override - public int getParallelism() { - return this.parallelism; - } - - /* - * Name/ID - */ - /** - * Set the name (or ID) of this ProcessingItem - * - * @param name - * the name/ID - */ - public void setName(String name) { - this.name = name; - } - - /** - * Get the name (or ID) of this ProcessingItem - * - * @return the name/ID - */ - public String getName() { - return this.name; - } - - /* - * Add input streams - */ - /** - * Add an input stream to this ProcessingItem - * - * @param inputStream - * the input stream to add - * @param scheme - * partitioning scheme associated with this ProcessingItem and the input stream - * @return this ProcessingItem - */ - protected abstract ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme); - - /** - * Add an input stream to this ProcessingItem with SHUFFLE scheme - * - * @param inputStream - * the input stream - * @return this ProcessingItem - */ - public ProcessingItem connectInputShuffleStream(Stream inputStream) { - return this.addInputStream(inputStream, PartitioningScheme.SHUFFLE); - } - - /** - * Add an input stream to this ProcessingItem with GROUP_BY_KEY scheme - * - * @param inputStream - * the input stream - * @return this ProcessingItem - */ - public ProcessingItem connectInputKeyStream(Stream inputStream) { - return this.addInputStream(inputStream, PartitioningScheme.GROUP_BY_KEY); - } - - /** - * Add an input stream to this ProcessingItem with BROADCAST scheme - * - * @param inputStream - * the input stream - * @return this ProcessingItem - */ - public ProcessingItem connectInputAllStream(Stream inputStream) { - return this.addInputStream(inputStream, PartitioningScheme.BROADCAST); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java deleted file mode 100644 index cab7d3a..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractStream.java +++ /dev/null @@ -1,118 +0,0 @@ -package com.yahoo.labs.samoa.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.ContentEvent; - -/** - * Abstract Stream - * - * Helper for implementation of Stream. It has basic information for a Stream: streamID and source ProcessingItem. - * Subclass of this class needs to implement {@link #put(ContentEvent)}. - * - * @author Anh Thu Vu - * - */ - -public abstract class AbstractStream implements Stream { - private String streamID; - private IProcessingItem sourcePi; - private int batchSize; - - /* - * Constructor - */ - public AbstractStream() { - this(null); - } - - public AbstractStream(IProcessingItem sourcePi) { - this.sourcePi = sourcePi; - this.batchSize = 1; - } - - /** - * Get source processing item of this stream - * - * @return - */ - public IProcessingItem getSourceProcessingItem() { - return this.sourcePi; - } - - /* - * Process event - */ - @Override - /** - * Send a ContentEvent - * @param event - * the ContentEvent to be sent - */ - public abstract void put(ContentEvent event); - - /* - * Stream name - */ - /** - * Get name (ID) of this stream - * - * @return the name (ID) - */ - @Override - public String getStreamId() { - return this.streamID; - } - - /** - * Set the name (ID) of this stream - * - * @param streamID - * the name (ID) - */ - public void setStreamId(String streamID) { - this.streamID = streamID; - } - - /* - * Batch size - */ - /** - * Set suggested batch size - * - * @param batchSize - * the suggested batch size - * - */ - @Override - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; - } - - /** - * Get suggested batch size - * - * @return the suggested batch size - */ - public int getBatchSize() { - return this.batchSize; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java deleted file mode 100644 index fd59f26..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/AbstractTopology.java +++ /dev/null @@ -1,133 +0,0 @@ -package com.yahoo.labs.samoa.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.util.HashSet; -import java.util.Set; - -/** - * Topology abstract class. - * - * It manages basic information of a topology: name, sets of Streams and ProcessingItems - * - */ -public abstract class AbstractTopology implements Topology { - - private String topoName; - private Set<Stream> streams; - private Set<IProcessingItem> processingItems; - private Set<EntranceProcessingItem> entranceProcessingItems; - - protected AbstractTopology(String name) { - this.topoName = name; - this.streams = new HashSet<>(); - this.processingItems = new HashSet<>(); - this.entranceProcessingItems = new HashSet<>(); - } - - /** - * Gets the name of this topology - * - * @return name of the topology - */ - public String getTopologyName() { - return this.topoName; - } - - /** - * Sets the name of this topology - * - * @param topologyName - * name of the topology - */ - public void setTopologyName(String topologyName) { - this.topoName = topologyName; - } - - /** - * Adds an Entrance processing item to the topology. - * - * @param epi - * Entrance processing item - */ - public void addEntranceProcessingItem(EntranceProcessingItem epi) { - this.entranceProcessingItems.add(epi); - this.addProcessingItem(epi); - } - - /** - * Gets entrance processing items in the topology - * - * @return the set of processing items - */ - public Set<EntranceProcessingItem> getEntranceProcessingItems() { - return this.entranceProcessingItems; - } - - /** - * Add processing item to topology. - * - * @param procItem - * Processing item. - */ - public void addProcessingItem(IProcessingItem procItem) { - addProcessingItem(procItem, 1); - } - - /** - * Add processing item to topology. - * - * @param procItem - * Processing item. - * @param parallelismHint - * Processing item parallelism level. - */ - public void addProcessingItem(IProcessingItem procItem, int parallelismHint) { - this.processingItems.add(procItem); - } - - /** - * Gets processing items in the topology (including entrance processing items) - * - * @return the set of processing items - */ - public Set<IProcessingItem> getProcessingItems() { - return this.processingItems; - } - - /** - * Add stream to topology. - * - * @param stream - */ - public void addStream(Stream stream) { - this.streams.add(stream); - } - - /** - * Gets streams in the topology - * - * @return the set of streams - */ - public Set<Stream> getStreams() { - return this.streams; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java deleted file mode 100644 index 0482972..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ComponentFactory.java +++ /dev/null @@ -1,78 +0,0 @@ -package com.yahoo.labs.samoa.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.core.Processor; - -/** - * ComponentFactory interface. Provides platform specific components. - */ -public interface ComponentFactory { - - /** - * Creates a platform specific processing item with the specified processor. - * - * @param processor - * contains the logic for this processing item. - * @return ProcessingItem - */ - public ProcessingItem createPi(Processor processor); - - /** - * Creates a platform specific processing item with the specified processor. Additionally sets the parallelism level. - * - * @param processor - * contains the logic for this processing item. - * @param parallelism - * defines the amount of instances of this processing item will be created. - * @return ProcessingItem - */ - public ProcessingItem createPi(Processor processor, int parallelism); - - /** - * Creates a platform specific processing item with the specified processor that is the entrance point in the - * topology. This processing item can either generate a stream of data or connect to an external stream of data. - * - * @param entranceProcessor - * contains the logic for this processing item. - * @return EntranceProcessingItem - */ - public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor); - - /** - * Creates a platform specific stream. - * - * @param sourcePi - * source processing item which will provide the events for this stream. - * @return Stream - */ - public Stream createStream(IProcessingItem sourcePi); - - /** - * Creates a platform specific topology. - * - * @param topoName - * Topology name. - * @return Topology - */ - public Topology createTopology(String topoName); -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java deleted file mode 100644 index 04cde38..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/EntranceProcessingItem.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.yahoo.labs.samoa.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.EntranceProcessor; - -/** - * Entrance processing item interface. - */ -public interface EntranceProcessingItem extends IProcessingItem { - - @Override - /** - * Gets the processing item processor. - * - * @return the embedded EntranceProcessor. - */ - public EntranceProcessor getProcessor(); - - /** - * Set the single output stream for this EntranceProcessingItem. - * - * @param stream - * the stream - * @return the current instance of the EntranceProcessingItem for fluent interface. - */ - public EntranceProcessingItem setOutputStream(Stream stream); -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java deleted file mode 100644 index b93612d..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/IProcessingItem.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.yahoo.labs.samoa.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.Processor; - -/** - * ProcessingItem interface specific for entrance processing items. - * - * @author severien - * - */ -public interface IProcessingItem { - - /** - * Gets the processing item processor. - * - * @return Processor - */ - public Processor getProcessor(); - - /** - * Sets processing item name. - * - * @param name - */ - // public void setName(String name); - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java deleted file mode 100644 index 253ba30..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ISubmitter.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.yahoo.labs.samoa.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.tasks.Task; - -/** - * Submitter interface for programatically deploying platform specific topologies. - * - * @author severien - * - */ -public interface ISubmitter { - - /** - * Deploy a specific task to a platform. - * - * @param task - */ - public void deployTask(Task task); - - /** - * Sets if the task should run locally or distributed. - * - * @param bool - */ - public void setLocal(boolean bool); -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java deleted file mode 100644 index faeb0d3..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/LocalEntranceProcessingItem.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.yahoo.labs.samoa.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.EntranceProcessor; - -/** - * Implementation of EntranceProcessingItem for local engines (Simple, Multithreads) - * - * @author Anh Thu Vu - * - */ -public class LocalEntranceProcessingItem extends AbstractEntranceProcessingItem { - public LocalEntranceProcessingItem(EntranceProcessor processor) { - super(processor); - } - - /** - * If there are available events, first event in the queue will be sent out on the output stream. - * - * @return true if there is (at least) one available event and it was sent out false otherwise - */ - public boolean injectNextEvent() { - if (this.getProcessor().hasNext()) { - ContentEvent event = this.getProcessor().nextEvent(); - this.getOutputStream().put(event); - return true; - } - return false; - } - - /** - * Start sending events by calling {@link #injectNextEvent()}. If there are no available events, and that the stream - * is not entirely consumed, it will wait by calling {@link #waitForNewEvents()} before attempting to send again. </p> - * When the stream is entirely consumed, the last event is tagged accordingly and the processor gets the finished - * status. - * - */ - public void startSendingEvents() { - if (this.getOutputStream() == null) - throw new IllegalStateException("Try sending events from EntrancePI while outputStream is not set."); - - while (!this.getProcessor().isFinished()) { - if (!this.injectNextEvent()) { - try { - waitForNewEvents(); - } catch (Exception e) { - e.printStackTrace(); - break; - } - } - } - } - - /** - * Method to wait for an amount of time when there are no available events. Implementation of EntranceProcessingItem - * should override this method to implement non-blocking wait or to adjust the amount of time. - */ - protected void waitForNewEvents() throws Exception { - Thread.sleep(100); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java deleted file mode 100644 index c1903b7..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/ProcessingItem.java +++ /dev/null @@ -1,68 +0,0 @@ -package com.yahoo.labs.samoa.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -/** - * Processing item interface. - * - * @author severien - * - */ -public interface ProcessingItem extends IProcessingItem { - - /** - * Connects this processing item in a round robin fashion. The events will be distributed evenly between the - * instantiated processing items. - * - * @param inputStream - * Stream to connect this processing item. - * @return ProcessingItem - */ - public ProcessingItem connectInputShuffleStream(Stream inputStream); - - /** - * Connects this processing item taking the event key into account. Events will be routed to the processing item - * according to the modulus of its key and the paralellism level. Ex.: key = 5 and paralellism = 2, 5 mod 2 = 1. - * Processing item responsible for 1 will receive this event. - * - * @param inputStream - * Stream to connect this processing item. - * @return ProcessingItem - */ - public ProcessingItem connectInputKeyStream(Stream inputStream); - - /** - * Connects this processing item to the stream in a broadcast fashion. All processing items of this type will receive - * copy of the original event. - * - * @param inputStream - * Stream to connect this processing item. - * @return ProcessingItem - */ - public ProcessingItem connectInputAllStream(Stream inputStream); - - /** - * Gets processing item parallelism level. - * - * @return int - */ - public int getParallelism(); -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java deleted file mode 100644 index 49d4e7f..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Stream.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.yahoo.labs.samoa.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.ContentEvent; - -/** - * Stream interface. - * - * @author severien - * - */ -public interface Stream { - - /** - * Puts events into a platform specific data stream. - * - * @param event - */ - public void put(ContentEvent event); - - /** - * Sets the stream id which is represented by a name. - * - * @param stream - */ - // public void setStreamId(String stream); - - /** - * Gets stream id. - * - * @return id - */ - public String getStreamId(); - - /** - * Set batch size - * - * @param batchSize - * the suggested size for batching messages on this stream - */ - public void setBatchSize(int batchsize); -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java deleted file mode 100644 index 7084377..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/Topology.java +++ /dev/null @@ -1,82 +0,0 @@ -package com.yahoo.labs.samoa.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -public interface Topology { - /* - * Name - */ - /** - * Get the topology's name - * - * @return the name of the topology - */ - public String getTopologyName(); - - /** - * Set the topology's name - * - * @param topologyName - * the name of the topology - */ - public void setTopologyName(String topologyName); - - /* - * Entrance Processing Items - */ - /** - * Add an EntranceProcessingItem to this topology - * - * @param epi - * the EntranceProcessingItem to be added - */ - void addEntranceProcessingItem(EntranceProcessingItem epi); - - /* - * Processing Items - */ - /** - * Add a ProcessingItem to this topology with default parallelism level (i.e. 1) - * - * @param procItem - * the ProcessingItem to be added - */ - void addProcessingItem(IProcessingItem procItem); - - /** - * Add a ProcessingItem to this topology with an associated parallelism level - * - * @param procItem - * the ProcessingItem to be added - * @param parallelismHint - * the parallelism level - */ - void addProcessingItem(IProcessingItem procItem, int parallelismHint); - - /* - * Streams - */ - /** - * - * @param stream - */ - void addStream(Stream stream); -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java deleted file mode 100644 index dd747cf..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java +++ /dev/null @@ -1,227 +0,0 @@ -package com.yahoo.labs.samoa.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -import java.util.HashMap; -import java.util.Map; - -import com.google.common.base.Preconditions; -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.core.Processor; - -/** - * Builder class that creates topology components and assemble them together. - * - */ -public class TopologyBuilder { - - // TODO: - // Possible options: - // 1. we may convert this as interface and platform dependent builder will - // inherit this method - // 2. refactor by combining TopologyBuilder, ComponentFactory and Topology - // -ve -> fat class where it has capabilities to instantiate specific - // component and connecting them - // +ve -> easy abstraction for SAMOA developer - // "you just implement your builder logic here!" - private ComponentFactory componentFactory; - private Topology topology; - private Map<Processor, IProcessingItem> mapProcessorToProcessingItem; - - // TODO: refactor, temporary constructor used by Storm code - public TopologyBuilder() { - // TODO: initialize _componentFactory using dynamic binding - // for now, use StormComponentFactory - // should the factory be Singleton (?) - // ans: at the moment, no, i.e. each builder will has its associated - // factory! - // and the factory will be instantiated using dynamic binding - // this.componentFactory = new StormComponentFactory(); - } - - // TODO: refactor, temporary constructor used by S4 code - public TopologyBuilder(ComponentFactory theFactory) { - this.componentFactory = theFactory; - } - - /** - * Initiates topology with a specific name. - * - * @param topologyName - */ - public void initTopology(String topologyName) { - this.initTopology(topologyName, 0); - } - - /** - * Initiates topology with a specific name and a delay between consecutive instances. - * - * @param topologyName - * @param delay - * delay between injections of two instances from source (in milliseconds) - */ - public void initTopology(String topologyName, int delay) { - if (this.topology != null) { - // TODO: possible refactor this code later - System.out.println("Topology has been initialized before!"); - return; - } - this.topology = componentFactory.createTopology(topologyName); - } - - /** - * Returns the platform specific topology. - * - * @return - */ - public Topology build() { - return topology; - } - - public ProcessingItem addProcessor(Processor processor, int parallelism) { - ProcessingItem pi = createPi(processor, parallelism); - if (this.mapProcessorToProcessingItem == null) - this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>(); - this.mapProcessorToProcessingItem.put(processor, pi); - return pi; - } - - public ProcessingItem addProcessor(Processor processor) { - return addProcessor(processor, 1); - } - - public ProcessingItem connectInputShuffleStream(Stream inputStream, Processor processor) { - ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); - Preconditions.checkNotNull(pi, "Trying to connect to null PI"); - return pi.connectInputShuffleStream(inputStream); - } - - public ProcessingItem connectInputKeyStream(Stream inputStream, Processor processor) { - ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); - Preconditions.checkNotNull(pi, "Trying to connect to null PI"); - return pi.connectInputKeyStream(inputStream); - } - - public ProcessingItem connectInputAllStream(Stream inputStream, Processor processor) { - ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); - Preconditions.checkNotNull(pi, "Trying to connect to null PI"); - return pi.connectInputAllStream(inputStream); - } - - public Stream createInputShuffleStream(Processor processor, Processor dest) { - Stream inputStream = this.createStream(dest); - ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); - Preconditions.checkNotNull(pi, "Trying to connect to null PI"); - pi.connectInputShuffleStream(inputStream); - return inputStream; - } - - public Stream createInputKeyStream(Processor processor, Processor dest) { - Stream inputStream = this.createStream(dest); - ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); - Preconditions.checkNotNull(pi, "Trying to connect to null PI"); - pi.connectInputKeyStream(inputStream); - return inputStream; - } - - public Stream createInputAllStream(Processor processor, Processor dest) { - Stream inputStream = this.createStream(dest); - ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); - Preconditions.checkNotNull(pi, "Trying to connect to null PI"); - pi.connectInputAllStream(inputStream); - return inputStream; - } - - public Stream createStream(Processor processor) { - IProcessingItem pi = mapProcessorToProcessingItem.get(processor); - Stream ret = null; - Preconditions.checkNotNull(pi, "Trying to create stream from null PI"); - ret = this.createStream(pi); - if (pi instanceof EntranceProcessingItem) - ((EntranceProcessingItem) pi).setOutputStream(ret); - return ret; - } - - public EntranceProcessingItem addEntranceProcessor(EntranceProcessor entranceProcessor) { - EntranceProcessingItem pi = createEntrancePi(entranceProcessor); - if (this.mapProcessorToProcessingItem == null) - this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>(); - mapProcessorToProcessingItem.put(entranceProcessor, pi); - return pi; - } - - public ProcessingItem getProcessingItem(Processor processor) { - ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); - Preconditions.checkNotNull(pi, "Trying to retrieve null PI"); - return pi; - } - - /** - * Creates a processing item with a specific processor and paralellism level of 1. - * - * @param processor - * @return ProcessingItem - */ - @SuppressWarnings("unused") - private ProcessingItem createPi(Processor processor) { - return createPi(processor, 1); - } - - /** - * Creates a processing item with a specific processor and paralellism level. - * - * @param processor - * @param parallelism - * @return ProcessingItem - */ - private ProcessingItem createPi(Processor processor, int parallelism) { - ProcessingItem pi = this.componentFactory.createPi(processor, parallelism); - this.topology.addProcessingItem(pi, parallelism); - return pi; - } - - /** - * Creates a platform specific entrance processing item. - * - * @param processor - * @return - */ - private EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { - EntranceProcessingItem epi = this.componentFactory.createEntrancePi(processor); - this.topology.addEntranceProcessingItem(epi); - if (this.mapProcessorToProcessingItem == null) - this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>(); - this.mapProcessorToProcessingItem.put(processor, epi); - return epi; - } - - /** - * Creates a platform specific stream. - * - * @param sourcePi - * source processing item. - * @return - */ - private Stream createStream(IProcessingItem sourcePi) { - Stream stream = this.componentFactory.createStream(sourcePi); - this.topology.addStream(stream); - return stream; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java deleted file mode 100644 index 61dc55c..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.yahoo.labs.samoa.utils; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -/** - * Represents the 3 schemes to partition the streams - * - * @author Anh Thu Vu - * - */ -public enum PartitioningScheme { - SHUFFLE, GROUP_BY_KEY, BROADCAST -} -// TODO: use this enum in S4 -// Storm doesn't seem to need this http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java deleted file mode 100644 index b5e3e0e..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.yahoo.labs.samoa.utils; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.topology.IProcessingItem; - -/** - * Represents one destination for streams. It has the info of: the ProcessingItem, parallelismHint, and partitioning - * scheme. Usage: - When ProcessingItem connects to a stream, it will pass a StreamDestination to the stream. - Stream - * manages a set of StreamDestination. - Used in single-threaded and multi-threaded local mode. - * - * @author Anh Thu Vu - * - */ -public class StreamDestination { - private IProcessingItem pi; - private int parallelism; - private PartitioningScheme type; - - /* - * Constructor - */ - public StreamDestination(IProcessingItem pi, int parallelismHint, PartitioningScheme type) { - this.pi = pi; - this.parallelism = parallelismHint; - this.type = type; - } - - /* - * Getters - */ - public IProcessingItem getProcessingItem() { - return this.pi; - } - - public int getParallelism() { - return this.parallelism; - } - - public PartitioningScheme getPartitioningScheme() { - return this.type; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java deleted file mode 100644 index 7c9212e..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java +++ /dev/null @@ -1,183 +0,0 @@ -package com.yahoo.labs.samoa.utils; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.jar.Attributes; -import java.util.jar.JarOutputStream; -import java.util.jar.Manifest; -import java.util.zip.ZipEntry; - -/** - * Utils class for building and deploying applications programmatically. - * - * @author severien - * - */ -public class Utils { - - public static void buildSamoaPackage() { - try { - String output = "/tmp/samoa/samoa.jar";// System.getProperty("user.home") + "/samoa.jar"; - Manifest manifest = createManifest(); - - BufferedOutputStream bo; - - bo = new BufferedOutputStream(new FileOutputStream(output)); - JarOutputStream jo = new JarOutputStream(bo, manifest); - - String baseDir = System.getProperty("user.dir"); - System.out.println(baseDir); - - File samoaJar = new File(baseDir + "/target/samoa-0.0.1-SNAPSHOT.jar"); - addEntry(jo, samoaJar, baseDir + "/target/", "/app/"); - addLibraries(jo); - - jo.close(); - bo.close(); - } catch (IOException e) { - e.printStackTrace(); - } - - } - - // TODO should get the modules file from the parameters - public static void buildModulesPackage(List<String> modulesNames) { - System.out.println(System.getProperty("user.dir")); - try { - String baseDir = System.getProperty("user.dir"); - List<File> filesArray = new ArrayList<>(); - for (String module : modulesNames) { - module = "/" + module.replace(".", "/") + ".class"; - filesArray.add(new File(baseDir + module)); - } - String output = System.getProperty("user.home") + "/modules.jar"; - - Manifest manifest = new Manifest(); - manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, - "1.0"); - manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_URL, - "http://samoa.yahoo.com"); - manifest.getMainAttributes().put( - Attributes.Name.IMPLEMENTATION_VERSION, "0.1"); - manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR, - "Yahoo"); - manifest.getMainAttributes().put( - Attributes.Name.IMPLEMENTATION_VENDOR_ID, "SAMOA"); - - BufferedOutputStream bo; - - bo = new BufferedOutputStream(new FileOutputStream(output)); - JarOutputStream jo = new JarOutputStream(bo, manifest); - - File[] files = filesArray.toArray(new File[filesArray.size()]); - addEntries(jo, files, baseDir, ""); - - jo.close(); - bo.close(); - } catch (IOException e) { - e.printStackTrace(); - } - - } - - private static void addLibraries(JarOutputStream jo) { - try { - String baseDir = System.getProperty("user.dir"); - String libDir = baseDir + "/target/lib"; - File inputFile = new File(libDir); - - File[] files = inputFile.listFiles(); - for (File file : files) { - addEntry(jo, file, baseDir, "lib"); - } - jo.close(); - - } catch (IOException e) { - e.printStackTrace(); - } - } - - private static void addEntries(JarOutputStream jo, File[] files, String baseDir, String rootDir) { - for (File file : files) { - - if (!file.isDirectory()) { - addEntry(jo, file, baseDir, rootDir); - } else { - File dir = new File(file.getAbsolutePath()); - addEntries(jo, dir.listFiles(), baseDir, rootDir); - } - } - } - - private static void addEntry(JarOutputStream jo, File file, String baseDir, String rootDir) { - try { - BufferedInputStream bi = new BufferedInputStream(new FileInputStream(file)); - - String path = file.getAbsolutePath().replaceFirst(baseDir, rootDir); - jo.putNextEntry(new ZipEntry(path)); - - byte[] buf = new byte[1024]; - int anz; - while ((anz = bi.read(buf)) != -1) { - jo.write(buf, 0, anz); - } - bi.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - public static Manifest createManifest() { - Manifest manifest = new Manifest(); - manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0"); - manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_URL, "http://samoa.yahoo.com"); - manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VERSION, "0.1"); - manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR, "Yahoo"); - manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR_ID, "SAMOA"); - Attributes s4Attributes = new Attributes(); - s4Attributes.putValue("S4-App-Class", "path.to.Class"); - Attributes.Name name = new Attributes.Name("S4-App-Class"); - Attributes.Name S4Version = new Attributes.Name("S4-Version"); - manifest.getMainAttributes().put(name, "samoa.topology.impl.DoTaskApp"); - manifest.getMainAttributes().put(S4Version, "0.6.0-incubating"); - return manifest; - } - - public static Object getInstance(String className) { - Class<?> cls; - Object obj = null; - try { - cls = Class.forName(className); - obj = cls.newInstance(); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { - e.printStackTrace(); - } - return obj; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/core/ContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/core/ContentEvent.java b/samoa-api/src/main/java/org/apache/samoa/core/ContentEvent.java new file mode 100644 index 0000000..b523f48 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/core/ContentEvent.java @@ -0,0 +1,44 @@ +package org.apache.samoa.core; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * The Interface ContentEvent. + */ +public interface ContentEvent extends java.io.Serializable { + + /** + * Gets the content event key. + * + * @return the key + */ + public String getKey(); + + /** + * Sets the content event key. + * + * @param key + * string + */ + public void setKey(String key); + + public boolean isLastEvent(); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/core/DoubleVector.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/core/DoubleVector.java b/samoa-api/src/main/java/org/apache/samoa/core/DoubleVector.java new file mode 100644 index 0000000..b079cfd --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/core/DoubleVector.java @@ -0,0 +1,119 @@ +package org.apache.samoa.core; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.Arrays; + +import com.google.common.primitives.Doubles; + +public class DoubleVector implements java.io.Serializable { + + /** + * + */ + private static final long serialVersionUID = 8243012708860261398L; + + private double[] doubleArray; + + public DoubleVector() { + this.doubleArray = new double[0]; + } + + public DoubleVector(double[] toCopy) { + this.doubleArray = new double[toCopy.length]; + System.arraycopy(toCopy, 0, this.doubleArray, 0, toCopy.length); + } + + public DoubleVector(DoubleVector toCopy) { + this(toCopy.getArrayRef()); + } + + public double[] getArrayRef() { + return this.doubleArray; + } + + public double[] getArrayCopy() { + return Doubles.concat(this.doubleArray); + } + + public int numNonZeroEntries() { + int count = 0; + for (double element : this.doubleArray) { + if (Double.compare(element, 0.0) != 0) { + count++; + } + } + return count; + } + + public void setValue(int index, double value) { + if (index >= doubleArray.length) { + this.doubleArray = Doubles.ensureCapacity(this.doubleArray, index + 1, 0); + } + this.doubleArray[index] = value; + } + + public void addToValue(int index, double value) { + if (index >= doubleArray.length) { + this.doubleArray = Doubles.ensureCapacity(this.doubleArray, index + 1, 0); + } + this.doubleArray[index] += value; + } + + public double sumOfValues() { + double sum = 0.0; + for (double element : this.doubleArray) { + sum += element; + } + return sum; + } + + public void getSingleLineDescription(StringBuilder out) { + out.append("{"); + out.append(Doubles.join("|", this.doubleArray)); + out.append("}"); + } + + @Override + public String toString() { + return "DoubleVector [doubleArray=" + Arrays.toString(doubleArray) + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + Arrays.hashCode(doubleArray); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (!(obj instanceof DoubleVector)) + return false; + DoubleVector other = (DoubleVector) obj; + return Arrays.equals(doubleArray, other.doubleArray); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/core/EntranceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/core/EntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/core/EntranceProcessor.java new file mode 100644 index 0000000..d92e19b --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/core/EntranceProcessor.java @@ -0,0 +1,62 @@ +package org.apache.samoa.core; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.Serializable; + +import com.github.javacliparser.Configurable; + +/** + * An EntranceProcessor is a specific kind of processor dedicated to providing events to inject in the topology. It can + * be connected to a single output stream. + */ +public interface EntranceProcessor extends Serializable, Configurable, Processor { + + /** + * Initializes the Processor. This method is called once after the topology is set up and before any call to the + * {@link nextTuple} method. + * + * @param the + * identifier of the processor. + */ + public void onCreate(int id); + + /** + * Checks whether the source stream is finished/exhausted. + */ + public boolean isFinished(); + + /** + * Checks whether a new event is ready to be processed. + * + * @return true if the EntranceProcessor is ready to provide the next event, false otherwise. + */ + public boolean hasNext(); + + /** + * Provides the next tuple to be processed by the topology. This method is the entry point for external events into + * the topology. + * + * @return the next event to be processed. + */ + public ContentEvent nextEvent(); + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/core/Globals.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/core/Globals.java b/samoa-api/src/main/java/org/apache/samoa/core/Globals.java new file mode 100644 index 0000000..6402e73 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/core/Globals.java @@ -0,0 +1,59 @@ +package org.apache.samoa.core; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * License + */ + +import com.github.javacliparser.StringUtils; + +/** + * Class for storing global information about current version of SAMOA. + * + * @author Albert Bifet + * @version $Revision: 7 $ + */ +public class Globals { + + public static final String workbenchTitle = "SAMOA: Scalable Advanced Massive Online Analysis Platform "; + + public static final String versionString = "0.0.1"; + + public static final String copyrightNotice = "Copyright Yahoo! Inc 2013"; + + public static final String webAddress = "http://github.com/yahoo/samoa"; + + public static String getWorkbenchInfoString() { + StringBuilder result = new StringBuilder(); + result.append(workbenchTitle); + StringUtils.appendNewline(result); + result.append("Version: "); + result.append(versionString); + StringUtils.appendNewline(result); + result.append("Copyright: "); + result.append(copyrightNotice); + StringUtils.appendNewline(result); + result.append("Web: "); + result.append(webAddress); + return result.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/core/Processor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/core/Processor.java b/samoa-api/src/main/java/org/apache/samoa/core/Processor.java new file mode 100644 index 0000000..abed308 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/core/Processor.java @@ -0,0 +1,63 @@ +package org.apache.samoa.core; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.Serializable; + +import com.github.javacliparser.Configurable; + +/** + * The Interface Processor. + */ +public interface Processor extends Serializable, Configurable { + + /** + * Entry point for the {@link Processor} code. This method is called once for every event received. + * + * @param event + * the event to be processed. + * @return true if successful, false otherwise. + */ + boolean process(ContentEvent event); + + /** + * Initializes the Processor. This method is called once after the topology is set up and before any call to the + * {@link process} method. + * + * @param id + * the identifier of the processor. + */ + void onCreate(int id); + + /** + * Creates a copy of a processor. This method is used to instantiate multiple instances of the same {@link Processsor} + * . + * + * @param processor + * the processor to be copied. + * + * @return a new instance of the {@link Processor}. + * */ + Processor newProcessor(Processor processor); // FIXME there should be no need + // for the processor as a + // parameter + // TODO can we substitute this with Cloneable? +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/core/SerializableInstance.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/core/SerializableInstance.java b/samoa-api/src/main/java/org/apache/samoa/core/SerializableInstance.java new file mode 100644 index 0000000..92ef464 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/core/SerializableInstance.java @@ -0,0 +1,68 @@ +package org.apache.samoa.core; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.instances.DenseInstance; +import org.apache.samoa.instances.Instance; + +/** + * License + */ + +//import weka.core.DenseInstance; +//import weka.core.Instance; + +/** + * The Class SerializableInstance. This class is needed for serialization of kryo + */ +public class SerializableInstance extends DenseInstance { + + /** The Constant serialVersionUID. */ + private static final long serialVersionUID = -3659459626274566468L; + + /** + * Instantiates a new serializable instance. + */ + public SerializableInstance() { + super(0); + } + + /** + * Instantiates a new serializable instance. + * + * @param arg0 + * the arg0 + */ + public SerializableInstance(int arg0) { + super(arg0); + } + + /** + * Instantiates a new serializable instance. + * + * @param inst + * the inst + */ + public SerializableInstance(Instance inst) { + super(inst); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/evaluation/BasicClassificationPerformanceEvaluator.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/evaluation/BasicClassificationPerformanceEvaluator.java b/samoa-api/src/main/java/org/apache/samoa/evaluation/BasicClassificationPerformanceEvaluator.java new file mode 100644 index 0000000..24abe3e --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/evaluation/BasicClassificationPerformanceEvaluator.java @@ -0,0 +1,157 @@ +package org.apache.samoa.evaluation; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.instances.Instance; +import org.apache.samoa.instances.Utils; +import org.apache.samoa.moa.AbstractMOAObject; +import org.apache.samoa.moa.core.Measurement; + +/** + * Classification evaluator that performs basic incremental evaluation. + * + * @author Richard Kirkby ([email protected]) + * @author Albert Bifet (abifet at cs dot waikato dot ac dot nz) + * @version $Revision: 7 $ + */ +public class BasicClassificationPerformanceEvaluator extends AbstractMOAObject implements + ClassificationPerformanceEvaluator { + + private static final long serialVersionUID = 1L; + + protected double weightObserved; + + protected double weightCorrect; + + protected double[] columnKappa; + + protected double[] rowKappa; + + protected int numClasses; + + private double weightCorrectNoChangeClassifier; + + private int lastSeenClass; + + @Override + public void reset() { + reset(this.numClasses); + } + + public void reset(int numClasses) { + this.numClasses = numClasses; + this.rowKappa = new double[numClasses]; + this.columnKappa = new double[numClasses]; + for (int i = 0; i < this.numClasses; i++) { + this.rowKappa[i] = 0.0; + this.columnKappa[i] = 0.0; + } + this.weightObserved = 0.0; + this.weightCorrect = 0.0; + this.weightCorrectNoChangeClassifier = 0.0; + this.lastSeenClass = 0; + } + + @Override + public void addResult(Instance inst, double[] classVotes) { + double weight = inst.weight(); + int trueClass = (int) inst.classValue(); + if (weight > 0.0) { + if (this.weightObserved == 0) { + reset(inst.numClasses()); + } + this.weightObserved += weight; + int predictedClass = Utils.maxIndex(classVotes); + if (predictedClass == trueClass) { + this.weightCorrect += weight; + } + if (rowKappa.length > 0) { + this.rowKappa[predictedClass] += weight; + } + if (columnKappa.length > 0) { + this.columnKappa[trueClass] += weight; + } + } + if (this.lastSeenClass == trueClass) { + this.weightCorrectNoChangeClassifier += weight; + } + this.lastSeenClass = trueClass; + } + + @Override + public Measurement[] getPerformanceMeasurements() { + return new Measurement[] { + new Measurement("classified instances", + getTotalWeightObserved()), + new Measurement("classifications correct (percent)", + getFractionCorrectlyClassified() * 100.0), + new Measurement("Kappa Statistic (percent)", + getKappaStatistic() * 100.0), + new Measurement("Kappa Temporal Statistic (percent)", + getKappaTemporalStatistic() * 100.0) + }; + + } + + public double getTotalWeightObserved() { + return this.weightObserved; + } + + public double getFractionCorrectlyClassified() { + return this.weightObserved > 0.0 ? this.weightCorrect + / this.weightObserved : 0.0; + } + + public double getFractionIncorrectlyClassified() { + return 1.0 - getFractionCorrectlyClassified(); + } + + public double getKappaStatistic() { + if (this.weightObserved > 0.0) { + double p0 = getFractionCorrectlyClassified(); + double pc = 0.0; + for (int i = 0; i < this.numClasses; i++) { + pc += (this.rowKappa[i] / this.weightObserved) + * (this.columnKappa[i] / this.weightObserved); + } + return (p0 - pc) / (1.0 - pc); + } else { + return 0; + } + } + + public double getKappaTemporalStatistic() { + if (this.weightObserved > 0.0) { + double p0 = this.weightCorrect / this.weightObserved; + double pc = this.weightCorrectNoChangeClassifier / this.weightObserved; + + return (p0 - pc) / (1.0 - pc); + } else { + return 0; + } + } + + @Override + public void getDescription(StringBuilder sb, int indent) { + Measurement.getMeasurementsDescription(getPerformanceMeasurements(), + sb, indent); + } +}
