http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/test/java/org/apache/samoa/topology/impl/StormProcessingItemTest.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/test/java/org/apache/samoa/topology/impl/StormProcessingItemTest.java b/samoa-storm/src/test/java/org/apache/samoa/topology/impl/StormProcessingItemTest.java new file mode 100644 index 0000000..4673903 --- /dev/null +++ b/samoa-storm/src/test/java/org/apache/samoa/topology/impl/StormProcessingItemTest.java @@ -0,0 +1,83 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import static org.junit.Assert.assertEquals; +import mockit.Expectations; +import mockit.MockUp; +import mockit.Mocked; +import mockit.Tested; +import mockit.Verifications; + +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.impl.StormProcessingItem; +import org.apache.samoa.topology.impl.StormTopology; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import backtype.storm.topology.BoltDeclarer; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.TopologyBuilder; + +public class StormProcessingItemTest { + private static final int PARRALLELISM_HINT_2 = 2; + private static final int PARRALLELISM_HINT_4 = 4; + private static final String ID = "id"; + @Tested + private StormProcessingItem pi; + @Mocked + private Processor processor; + @Mocked + private StormTopology topology; + @Mocked + private TopologyBuilder stormBuilder = new TopologyBuilder(); + + @Before + public void setUp() { + pi = new StormProcessingItem(processor, ID, PARRALLELISM_HINT_2); + } + + @Test + public void testAddToTopology() { + new Expectations() { + { + topology.getStormBuilder(); + result = stormBuilder; + + stormBuilder.setBolt(ID, (IRichBolt) any, anyInt); + result = new MockUp<BoltDeclarer>() { + }.getMockInstance(); + } + }; + + pi.addToTopology(topology, PARRALLELISM_HINT_4); // this parallelism hint is ignored + + new Verifications() { + { + assertEquals(pi.getProcessor(), processor); + // TODO add methods to explore a topology and verify them + assertEquals(pi.getParallelism(), PARRALLELISM_HINT_2); + assertEquals(pi.getId(), ID); + } + }; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-test/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-test/pom.xml b/samoa-test/pom.xml index 901c10e..8d33917 100644 --- a/samoa-test/pom.xml +++ b/samoa-test/pom.xml @@ -23,7 +23,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>samoa</artifactId> - <groupId>com.yahoo.labs.samoa</groupId> + <groupId>org.apache.samoa</groupId> <version>0.3.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> @@ -41,7 +41,7 @@ <version>2.4</version> </dependency> <dependency> - <groupId>com.yahoo.labs.samoa</groupId> + <groupId>org.apache.samoa</groupId> <artifactId>samoa-api</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java ---------------------------------------------------------------------- diff --git a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java deleted file mode 100644 index 1cc0ba1..0000000 --- a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java +++ /dev/null @@ -1,234 +0,0 @@ -package com.yahoo.labs.samoa; - -public class TestParams { - - /** - * templates that take the following parameters: - * <ul> - * <li>the output file location as an argument (-d), - * <li>the maximum number of instances for testing/training (-i) - * <li>the sampling size (-f) - * <li>the delay in ms between input instances (-w) , default is zero - * </ul> - * as well as the maximum number of instances for testing/training (-i) and the sampling size (-f) - */ - public static class Templates { - - public final static String PREQEVAL_VHT_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d " - + "-l (com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree -p 4) " + - "-s (com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 10 -u 10)"; - - public final static String PREQEVAL_NAIVEBAYES_HYPERPLANE = "PrequentialEvaluation -d %s -i %d -f %d -w %d " - + "-l (classifiers.SingleClassifier -l com.yahoo.labs.samoa.learners.classifiers.NaiveBayes) " + - "-s (com.yahoo.labs.samoa.moa.streams.generators.HyperplaneGenerator -c 2)"; - - // setting the number of nominal attributes to zero significantly reduces - // the processing time, - // so that it's acceptable in a test case - public final static String PREQEVAL_BAGGING_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d " - + "-l (com.yahoo.labs.samoa.learners.classifiers.ensemble.Bagging) " + - "-s (com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 0 -u 10)"; - - } - - public static final String EVALUATION_INSTANCES = "evaluation instances"; - public static final String CLASSIFIED_INSTANCES = "classified instances"; - public static final String CLASSIFICATIONS_CORRECT = "classifications correct (percent)"; - public static final String KAPPA_STAT = "Kappa Statistic (percent)"; - public static final String KAPPA_TEMP_STAT = "Kappa Temporal Statistic (percent)"; - - private long inputInstances; - private long samplingSize; - private long evaluationInstances; - private long classifiedInstances; - private float classificationsCorrect; - private float kappaStat; - private float kappaTempStat; - private String cliStringTemplate; - private int pollTimeoutSeconds; - private final int prePollWait; - private int inputDelayMicroSec; - private String taskClassName; - - private TestParams(String taskClassName, - long inputInstances, - long samplingSize, - long evaluationInstances, - long classifiedInstances, - float classificationsCorrect, - float kappaStat, - float kappaTempStat, - String cliStringTemplate, - int pollTimeoutSeconds, - int prePollWait, - int inputDelayMicroSec) { - this.taskClassName = taskClassName; - this.inputInstances = inputInstances; - this.samplingSize = samplingSize; - this.evaluationInstances = evaluationInstances; - this.classifiedInstances = classifiedInstances; - this.classificationsCorrect = classificationsCorrect; - this.kappaStat = kappaStat; - this.kappaTempStat = kappaTempStat; - this.cliStringTemplate = cliStringTemplate; - this.pollTimeoutSeconds = pollTimeoutSeconds; - this.prePollWait = prePollWait; - this.inputDelayMicroSec = inputDelayMicroSec; - } - - public String getTaskClassName() { - return taskClassName; - } - - public long getInputInstances() { - return inputInstances; - } - - public long getSamplingSize() { - return samplingSize; - } - - public int getPollTimeoutSeconds() { - return pollTimeoutSeconds; - } - - public int getPrePollWaitSeconds() { - return prePollWait; - } - - public String getCliStringTemplate() { - return cliStringTemplate; - } - - public long getEvaluationInstances() { - return evaluationInstances; - } - - public long getClassifiedInstances() { - return classifiedInstances; - } - - public float getClassificationsCorrect() { - return classificationsCorrect; - } - - public float getKappaStat() { - return kappaStat; - } - - public float getKappaTempStat() { - return kappaTempStat; - } - - public int getInputDelayMicroSec() { - return inputDelayMicroSec; - } - - @Override - public String toString() { - return "TestParams{\n" + - "inputInstances=" + inputInstances + "\n" + - "samplingSize=" + samplingSize + "\n" + - "evaluationInstances=" + evaluationInstances + "\n" + - "classifiedInstances=" + classifiedInstances + "\n" + - "classificationsCorrect=" + classificationsCorrect + "\n" + - "kappaStat=" + kappaStat + "\n" + - "kappaTempStat=" + kappaTempStat + "\n" + - "cliStringTemplate='" + cliStringTemplate + '\'' + "\n" + - "pollTimeoutSeconds=" + pollTimeoutSeconds + "\n" + - "prePollWait=" + prePollWait + "\n" + - "taskClassName='" + taskClassName + '\'' + "\n" + - "inputDelayMicroSec=" + inputDelayMicroSec + "\n" + - '}'; - } - - public static class Builder { - private long inputInstances; - private long samplingSize; - private long evaluationInstances; - private long classifiedInstances; - private float classificationsCorrect; - private float kappaStat = 0f; - private float kappaTempStat = 0f; - private String cliStringTemplate; - private int pollTimeoutSeconds = 10; - private int prePollWaitSeconds = 10; - private String taskClassName; - private int inputDelayMicroSec = 0; - - public Builder taskClassName(String taskClassName) { - this.taskClassName = taskClassName; - return this; - } - - public Builder inputInstances(long inputInstances) { - this.inputInstances = inputInstances; - return this; - } - - public Builder samplingSize(long samplingSize) { - this.samplingSize = samplingSize; - return this; - } - - public Builder evaluationInstances(long evaluationInstances) { - this.evaluationInstances = evaluationInstances; - return this; - } - - public Builder classifiedInstances(long classifiedInstances) { - this.classifiedInstances = classifiedInstances; - return this; - } - - public Builder classificationsCorrect(float classificationsCorrect) { - this.classificationsCorrect = classificationsCorrect; - return this; - } - - public Builder kappaStat(float kappaStat) { - this.kappaStat = kappaStat; - return this; - } - - public Builder kappaTempStat(float kappaTempStat) { - this.kappaTempStat = kappaTempStat; - return this; - } - - public Builder cliStringTemplate(String cliStringTemplate) { - this.cliStringTemplate = cliStringTemplate; - return this; - } - - public Builder resultFilePollTimeout(int pollTimeoutSeconds) { - this.pollTimeoutSeconds = pollTimeoutSeconds; - return this; - } - - public Builder inputDelayMicroSec(int inputDelayMicroSec) { - this.inputDelayMicroSec = inputDelayMicroSec; - return this; - } - - public Builder prePollWait(int prePollWaitSeconds) { - this.prePollWaitSeconds = prePollWaitSeconds; - return this; - } - - public TestParams build() { - return new TestParams(taskClassName, - inputInstances, - samplingSize, - evaluationInstances, - classifiedInstances, - classificationsCorrect, - kappaStat, - kappaTempStat, - cliStringTemplate, - pollTimeoutSeconds, - prePollWaitSeconds, - inputDelayMicroSec); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java ---------------------------------------------------------------------- diff --git a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java deleted file mode 100644 index ae226c7..0000000 --- a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java +++ /dev/null @@ -1,152 +0,0 @@ -package com.yahoo.labs.samoa;/* -* #%L -* SAMOA -* %% -* Copyright (C) 2013 - 2014 Yahoo! Inc. -* %% -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* #L% -*/ - -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVRecord; -import org.apache.commons.io.input.Tailer; -import org.apache.commons.io.input.TailerListenerAdapter; -import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.io.Reader; -import java.lang.reflect.InvocationTargetException; -import java.util.Iterator; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertTrue; - -public class TestUtils { - - private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class.getName()); - - public static void test(final TestParams testParams) throws IOException, ClassNotFoundException, - NoSuchMethodException, InvocationTargetException, IllegalAccessException, InterruptedException { - - final File tempFile = File.createTempFile("test", "test"); - - LOG.info("Starting test, output file is {}, test config is \n{}", tempFile.getAbsolutePath(), testParams.toString()); - - Executors.newSingleThreadExecutor().submit(new Callable<Void>() { - - @Override - public Void call() throws Exception { - try { - Class.forName(testParams.getTaskClassName()) - .getMethod("main", String[].class) - .invoke(null, (Object) String.format( - testParams.getCliStringTemplate(), - tempFile.getAbsolutePath(), - testParams.getInputInstances(), - testParams.getSamplingSize(), - testParams.getInputDelayMicroSec() - ).split("[ ]")); - } catch (Exception e) { - LOG.error("Cannot execute test {} {}", e.getMessage(), e.getCause().getMessage()); - } - return null; - } - }); - - Thread.sleep(TimeUnit.SECONDS.toMillis(testParams.getPrePollWaitSeconds())); - - CountDownLatch signalComplete = new CountDownLatch(1); - - final Tailer tailer = Tailer.create(tempFile, new TestResultsTailerAdapter(signalComplete), 1000); - new Thread(new Runnable() { - @Override - public void run() { - tailer.run(); - } - }).start(); - - signalComplete.await(); - tailer.stop(); - - assertResults(tempFile, testParams); - } - - public static void assertResults(File outputFile, com.yahoo.labs.samoa.TestParams testParams) throws IOException { - - LOG.info("Checking results file " + outputFile.getAbsolutePath()); - // 1. parse result file with csv parser - Reader in = new FileReader(outputFile); - Iterable<CSVRecord> records = CSVFormat.EXCEL.withSkipHeaderRecord(false) - .withIgnoreEmptyLines(true).withDelimiter(',').withCommentMarker('#').parse(in); - CSVRecord last = null; - Iterator<CSVRecord> iterator = records.iterator(); - CSVRecord header = iterator.next(); - Assert.assertEquals("Invalid number of columns", 5, header.size()); - - Assert - .assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.EVALUATION_INSTANCES, header.get(0).trim()); - Assert - .assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.CLASSIFIED_INSTANCES, header.get(1).trim()); - Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.CLASSIFICATIONS_CORRECT, header.get(2) - .trim()); - Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.KAPPA_STAT, header.get(3).trim()); - Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.KAPPA_TEMP_STAT, header.get(4).trim()); - - // 2. check last line result - while (iterator.hasNext()) { - last = iterator.next(); - } - - assertTrue(String.format("Unmet threshold expected %d got %f", - testParams.getEvaluationInstances(), Float.parseFloat(last.get(0))), - testParams.getEvaluationInstances() <= Float.parseFloat(last.get(0))); - assertTrue(String.format("Unmet threshold expected %d got %f", testParams.getClassifiedInstances(), - Float.parseFloat(last.get(1))), - testParams.getClassifiedInstances() <= Float.parseFloat(last.get(1))); - assertTrue(String.format("Unmet threshold expected %f got %f", - testParams.getClassificationsCorrect(), Float.parseFloat(last.get(2))), - testParams.getClassificationsCorrect() <= Float.parseFloat(last.get(2))); - assertTrue(String.format("Unmet threshold expected %f got %f", - testParams.getKappaStat(), Float.parseFloat(last.get(3))), - testParams.getKappaStat() <= Float.parseFloat(last.get(3))); - assertTrue(String.format("Unmet threshold expected %f got %f", - testParams.getKappaTempStat(), Float.parseFloat(last.get(4))), - testParams.getKappaTempStat() <= Float.parseFloat(last.get(4))); - - } - - private static class TestResultsTailerAdapter extends TailerListenerAdapter { - - private final CountDownLatch signalComplete; - - public TestResultsTailerAdapter(CountDownLatch signalComplete) { - this.signalComplete = signalComplete; - } - - @Override - public void handle(String line) { - if ("# COMPLETED".equals(line.trim())) { - signalComplete.countDown(); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-test/src/test/java/org/apache/samoa/TestParams.java ---------------------------------------------------------------------- diff --git a/samoa-test/src/test/java/org/apache/samoa/TestParams.java b/samoa-test/src/test/java/org/apache/samoa/TestParams.java new file mode 100644 index 0000000..e217922 --- /dev/null +++ b/samoa-test/src/test/java/org/apache/samoa/TestParams.java @@ -0,0 +1,234 @@ +package org.apache.samoa; + +public class TestParams { + + /** + * templates that take the following parameters: + * <ul> + * <li>the output file location as an argument (-d), + * <li>the maximum number of instances for testing/training (-i) + * <li>the sampling size (-f) + * <li>the delay in ms between input instances (-w) , default is zero + * </ul> + * as well as the maximum number of instances for testing/training (-i) and the sampling size (-f) + */ + public static class Templates { + + public final static String PREQEVAL_VHT_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d " + + "-l (org.apache.samoa.learners.classifiers.trees.VerticalHoeffdingTree -p 4) " + + "-s (org.apache.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 10 -u 10)"; + + public final static String PREQEVAL_NAIVEBAYES_HYPERPLANE = "PrequentialEvaluation -d %s -i %d -f %d -w %d " + + "-l (classifiers.SingleClassifier -l org.apache.samoa.learners.classifiers.NaiveBayes) " + + "-s (org.apache.samoa.moa.streams.generators.HyperplaneGenerator -c 2)"; + + // setting the number of nominal attributes to zero significantly reduces + // the processing time, + // so that it's acceptable in a test case + public final static String PREQEVAL_BAGGING_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d " + + "-l (org.apache.samoa.learners.classifiers.ensemble.Bagging) " + + "-s (org.apache.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 0 -u 10)"; + + } + + public static final String EVALUATION_INSTANCES = "evaluation instances"; + public static final String CLASSIFIED_INSTANCES = "classified instances"; + public static final String CLASSIFICATIONS_CORRECT = "classifications correct (percent)"; + public static final String KAPPA_STAT = "Kappa Statistic (percent)"; + public static final String KAPPA_TEMP_STAT = "Kappa Temporal Statistic (percent)"; + + private long inputInstances; + private long samplingSize; + private long evaluationInstances; + private long classifiedInstances; + private float classificationsCorrect; + private float kappaStat; + private float kappaTempStat; + private String cliStringTemplate; + private int pollTimeoutSeconds; + private final int prePollWait; + private int inputDelayMicroSec; + private String taskClassName; + + private TestParams(String taskClassName, + long inputInstances, + long samplingSize, + long evaluationInstances, + long classifiedInstances, + float classificationsCorrect, + float kappaStat, + float kappaTempStat, + String cliStringTemplate, + int pollTimeoutSeconds, + int prePollWait, + int inputDelayMicroSec) { + this.taskClassName = taskClassName; + this.inputInstances = inputInstances; + this.samplingSize = samplingSize; + this.evaluationInstances = evaluationInstances; + this.classifiedInstances = classifiedInstances; + this.classificationsCorrect = classificationsCorrect; + this.kappaStat = kappaStat; + this.kappaTempStat = kappaTempStat; + this.cliStringTemplate = cliStringTemplate; + this.pollTimeoutSeconds = pollTimeoutSeconds; + this.prePollWait = prePollWait; + this.inputDelayMicroSec = inputDelayMicroSec; + } + + public String getTaskClassName() { + return taskClassName; + } + + public long getInputInstances() { + return inputInstances; + } + + public long getSamplingSize() { + return samplingSize; + } + + public int getPollTimeoutSeconds() { + return pollTimeoutSeconds; + } + + public int getPrePollWaitSeconds() { + return prePollWait; + } + + public String getCliStringTemplate() { + return cliStringTemplate; + } + + public long getEvaluationInstances() { + return evaluationInstances; + } + + public long getClassifiedInstances() { + return classifiedInstances; + } + + public float getClassificationsCorrect() { + return classificationsCorrect; + } + + public float getKappaStat() { + return kappaStat; + } + + public float getKappaTempStat() { + return kappaTempStat; + } + + public int getInputDelayMicroSec() { + return inputDelayMicroSec; + } + + @Override + public String toString() { + return "TestParams{\n" + + "inputInstances=" + inputInstances + "\n" + + "samplingSize=" + samplingSize + "\n" + + "evaluationInstances=" + evaluationInstances + "\n" + + "classifiedInstances=" + classifiedInstances + "\n" + + "classificationsCorrect=" + classificationsCorrect + "\n" + + "kappaStat=" + kappaStat + "\n" + + "kappaTempStat=" + kappaTempStat + "\n" + + "cliStringTemplate='" + cliStringTemplate + '\'' + "\n" + + "pollTimeoutSeconds=" + pollTimeoutSeconds + "\n" + + "prePollWait=" + prePollWait + "\n" + + "taskClassName='" + taskClassName + '\'' + "\n" + + "inputDelayMicroSec=" + inputDelayMicroSec + "\n" + + '}'; + } + + public static class Builder { + private long inputInstances; + private long samplingSize; + private long evaluationInstances; + private long classifiedInstances; + private float classificationsCorrect; + private float kappaStat = 0f; + private float kappaTempStat = 0f; + private String cliStringTemplate; + private int pollTimeoutSeconds = 10; + private int prePollWaitSeconds = 10; + private String taskClassName; + private int inputDelayMicroSec = 0; + + public Builder taskClassName(String taskClassName) { + this.taskClassName = taskClassName; + return this; + } + + public Builder inputInstances(long inputInstances) { + this.inputInstances = inputInstances; + return this; + } + + public Builder samplingSize(long samplingSize) { + this.samplingSize = samplingSize; + return this; + } + + public Builder evaluationInstances(long evaluationInstances) { + this.evaluationInstances = evaluationInstances; + return this; + } + + public Builder classifiedInstances(long classifiedInstances) { + this.classifiedInstances = classifiedInstances; + return this; + } + + public Builder classificationsCorrect(float classificationsCorrect) { + this.classificationsCorrect = classificationsCorrect; + return this; + } + + public Builder kappaStat(float kappaStat) { + this.kappaStat = kappaStat; + return this; + } + + public Builder kappaTempStat(float kappaTempStat) { + this.kappaTempStat = kappaTempStat; + return this; + } + + public Builder cliStringTemplate(String cliStringTemplate) { + this.cliStringTemplate = cliStringTemplate; + return this; + } + + public Builder resultFilePollTimeout(int pollTimeoutSeconds) { + this.pollTimeoutSeconds = pollTimeoutSeconds; + return this; + } + + public Builder inputDelayMicroSec(int inputDelayMicroSec) { + this.inputDelayMicroSec = inputDelayMicroSec; + return this; + } + + public Builder prePollWait(int prePollWaitSeconds) { + this.prePollWaitSeconds = prePollWaitSeconds; + return this; + } + + public TestParams build() { + return new TestParams(taskClassName, + inputInstances, + samplingSize, + evaluationInstances, + classifiedInstances, + classificationsCorrect, + kappaStat, + kappaTempStat, + cliStringTemplate, + pollTimeoutSeconds, + prePollWaitSeconds, + inputDelayMicroSec); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-test/src/test/java/org/apache/samoa/TestUtils.java ---------------------------------------------------------------------- diff --git a/samoa-test/src/test/java/org/apache/samoa/TestUtils.java b/samoa-test/src/test/java/org/apache/samoa/TestUtils.java new file mode 100644 index 0000000..320b8a8 --- /dev/null +++ b/samoa-test/src/test/java/org/apache/samoa/TestUtils.java @@ -0,0 +1,152 @@ +package org.apache.samoa;/* +* #%L +* SAMOA +* %% +* Copyright (C) 2013 - 2014 Yahoo! Inc. +* %% +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* #L% +*/ + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.io.input.Tailer; +import org.apache.commons.io.input.TailerListenerAdapter; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.Reader; +import java.lang.reflect.InvocationTargetException; +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; + +public class TestUtils { + + private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class.getName()); + + public static void test(final TestParams testParams) throws IOException, ClassNotFoundException, + NoSuchMethodException, InvocationTargetException, IllegalAccessException, InterruptedException { + + final File tempFile = File.createTempFile("test", "test"); + + LOG.info("Starting test, output file is {}, test config is \n{}", tempFile.getAbsolutePath(), testParams.toString()); + + Executors.newSingleThreadExecutor().submit(new Callable<Void>() { + + @Override + public Void call() throws Exception { + try { + Class.forName(testParams.getTaskClassName()) + .getMethod("main", String[].class) + .invoke(null, (Object) String.format( + testParams.getCliStringTemplate(), + tempFile.getAbsolutePath(), + testParams.getInputInstances(), + testParams.getSamplingSize(), + testParams.getInputDelayMicroSec() + ).split("[ ]")); + } catch (Exception e) { + LOG.error("Cannot execute test {} {}", e.getMessage(), e.getCause().getMessage()); + } + return null; + } + }); + + Thread.sleep(TimeUnit.SECONDS.toMillis(testParams.getPrePollWaitSeconds())); + + CountDownLatch signalComplete = new CountDownLatch(1); + + final Tailer tailer = Tailer.create(tempFile, new TestResultsTailerAdapter(signalComplete), 1000); + new Thread(new Runnable() { + @Override + public void run() { + tailer.run(); + } + }).start(); + + signalComplete.await(); + tailer.stop(); + + assertResults(tempFile, testParams); + } + + public static void assertResults(File outputFile, org.apache.samoa.TestParams testParams) throws IOException { + + LOG.info("Checking results file " + outputFile.getAbsolutePath()); + // 1. parse result file with csv parser + Reader in = new FileReader(outputFile); + Iterable<CSVRecord> records = CSVFormat.EXCEL.withSkipHeaderRecord(false) + .withIgnoreEmptyLines(true).withDelimiter(',').withCommentMarker('#').parse(in); + CSVRecord last = null; + Iterator<CSVRecord> iterator = records.iterator(); + CSVRecord header = iterator.next(); + Assert.assertEquals("Invalid number of columns", 5, header.size()); + + Assert + .assertEquals("Unexpected column", org.apache.samoa.TestParams.EVALUATION_INSTANCES, header.get(0).trim()); + Assert + .assertEquals("Unexpected column", org.apache.samoa.TestParams.CLASSIFIED_INSTANCES, header.get(1).trim()); + Assert.assertEquals("Unexpected column", org.apache.samoa.TestParams.CLASSIFICATIONS_CORRECT, header.get(2) + .trim()); + Assert.assertEquals("Unexpected column", org.apache.samoa.TestParams.KAPPA_STAT, header.get(3).trim()); + Assert.assertEquals("Unexpected column", org.apache.samoa.TestParams.KAPPA_TEMP_STAT, header.get(4).trim()); + + // 2. check last line result + while (iterator.hasNext()) { + last = iterator.next(); + } + + assertTrue(String.format("Unmet threshold expected %d got %f", + testParams.getEvaluationInstances(), Float.parseFloat(last.get(0))), + testParams.getEvaluationInstances() <= Float.parseFloat(last.get(0))); + assertTrue(String.format("Unmet threshold expected %d got %f", testParams.getClassifiedInstances(), + Float.parseFloat(last.get(1))), + testParams.getClassifiedInstances() <= Float.parseFloat(last.get(1))); + assertTrue(String.format("Unmet threshold expected %f got %f", + testParams.getClassificationsCorrect(), Float.parseFloat(last.get(2))), + testParams.getClassificationsCorrect() <= Float.parseFloat(last.get(2))); + assertTrue(String.format("Unmet threshold expected %f got %f", + testParams.getKappaStat(), Float.parseFloat(last.get(3))), + testParams.getKappaStat() <= Float.parseFloat(last.get(3))); + assertTrue(String.format("Unmet threshold expected %f got %f", + testParams.getKappaTempStat(), Float.parseFloat(last.get(4))), + testParams.getKappaTempStat() <= Float.parseFloat(last.get(4))); + + } + + private static class TestResultsTailerAdapter extends TailerListenerAdapter { + + private final CountDownLatch signalComplete; + + public TestResultsTailerAdapter(CountDownLatch signalComplete) { + this.signalComplete = signalComplete; + } + + @Override + public void handle(String line) { + if ("# COMPLETED".equals(line.trim())) { + signalComplete.countDown(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-threads/pom.xml b/samoa-threads/pom.xml index a7d9ab5..2634ef0 100644 --- a/samoa-threads/pom.xml +++ b/samoa-threads/pom.xml @@ -31,14 +31,14 @@ <artifactId>samoa-threads</artifactId> <parent> - <groupId>com.yahoo.labs.samoa</groupId> + <groupId>org.apache.samoa</groupId> <artifactId>samoa</artifactId> <version>0.3.0-SNAPSHOT</version> </parent> <dependencies> <dependency> - <groupId>com.yahoo.labs.samoa</groupId> + <groupId>org.apache.samoa</groupId> <artifactId>samoa-api</artifactId> <version>${project.version}</version> <exclusions> @@ -53,7 +53,7 @@ </exclusions> </dependency> <dependency> - <groupId>com.yahoo.labs.samoa</groupId> + <groupId>org.apache.samoa</groupId> <artifactId>samoa-test</artifactId> <type>test-jar</type> <classifier>test-jar-with-dependencies</classifier> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java deleted file mode 100644 index 2ac9ec1..0000000 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.yahoo.labs.samoa; - -import java.util.ArrayList; -import java.util.Arrays; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.javacliparser.ClassOption; -import com.yahoo.labs.samoa.tasks.Task; -import com.yahoo.labs.samoa.topology.impl.ThreadsComponentFactory; -import com.yahoo.labs.samoa.topology.impl.ThreadsEngine; - -/** - * @author Anh Thu Vu - * - */ -public class LocalThreadsDoTask { - private static final Logger logger = LoggerFactory.getLogger(LocalThreadsDoTask.class); - - /** - * The main method. - * - * @param args - * the arguments - */ - public static void main(String[] args) { - - ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); - - // Get number of threads for multithreading mode - int numThreads = 1; - for (int i = 0; i < tmpArgs.size() - 1; i++) { - if (tmpArgs.get(i).equals("-t")) { - try { - numThreads = Integer.parseInt(tmpArgs.get(i + 1)); - tmpArgs.remove(i + 1); - tmpArgs.remove(i); - } catch (NumberFormatException e) { - System.err.println("Invalid number of threads."); - System.err.println(e.getStackTrace()); - } - } - } - logger.info("Number of threads:{}", numThreads); - - args = tmpArgs.toArray(new String[0]); - - StringBuilder cliString = new StringBuilder(); - for (int i = 0; i < args.length; i++) { - cliString.append(" ").append(args[i]); - } - logger.debug("Command line string = {}", cliString.toString()); - System.out.println("Command line string = " + cliString.toString()); - - Task task = null; - try { - task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, null); - logger.info("Sucessfully instantiating {}", task.getClass().getCanonicalName()); - } catch (Exception e) { - logger.error("Fail to initialize the task", e); - System.out.println("Fail to initialize the task" + e); - return; - } - task.setFactory(new ThreadsComponentFactory()); - task.init(); - - ThreadsEngine.submitTopology(task.getTopology(), numThreads); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java deleted file mode 100644 index 91f213b..0000000 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java +++ /dev/null @@ -1,65 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.topology.ComponentFactory; -import com.yahoo.labs.samoa.topology.EntranceProcessingItem; -import com.yahoo.labs.samoa.topology.IProcessingItem; -import com.yahoo.labs.samoa.topology.ProcessingItem; -import com.yahoo.labs.samoa.topology.Stream; -import com.yahoo.labs.samoa.topology.Topology; - -/** - * ComponentFactory for multithreaded engine - * - * @author Anh Thu Vu - * - */ -public class ThreadsComponentFactory implements ComponentFactory { - - @Override - public ProcessingItem createPi(Processor processor) { - return this.createPi(processor, 1); - } - - @Override - public ProcessingItem createPi(Processor processor, int paralellism) { - return new ThreadsProcessingItem(processor, paralellism); - } - - @Override - public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { - return new ThreadsEntranceProcessingItem(entranceProcessor); - } - - @Override - public Stream createStream(IProcessingItem sourcePi) { - return new ThreadsStream(sourcePi); - } - - @Override - public Topology createTopology(String topoName) { - return new ThreadsTopology(topoName); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java deleted file mode 100644 index c266c09..0000000 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java +++ /dev/null @@ -1,101 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.topology.Topology; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -/** - * Multithreaded engine. - * - * @author Anh Thu Vu - * - */ -public class ThreadsEngine { - - private static final List<ExecutorService> threadPool = new ArrayList<ExecutorService>(); - - /* - * Create and manage threads - */ - public static void setNumberOfThreads(int numThreads) { - if (numThreads < 1) - throw new IllegalStateException("Number of threads must be a positive integer."); - - if (threadPool.size() > numThreads) - throw new IllegalStateException("You cannot set a numThreads smaller than the current size of the threads pool."); - - if (threadPool.size() < numThreads) { - for (int i = threadPool.size(); i < numThreads; i++) { - threadPool.add(Executors.newSingleThreadExecutor()); - } - } - } - - public static int getNumberOfThreads() { - return threadPool.size(); - } - - public static ExecutorService getThreadWithIndex(int index) { - if (threadPool.size() <= 0) - throw new IllegalStateException("Try to get ExecutorService from an empty pool."); - index %= threadPool.size(); - return threadPool.get(index); - } - - /* - * Submit topology and start - */ - private static void submitTopology(Topology topology) { - ThreadsTopology tTopology = (ThreadsTopology) topology; - tTopology.run(); - } - - public static void submitTopology(Topology topology, int numThreads) { - ThreadsEngine.setNumberOfThreads(numThreads); - ThreadsEngine.submitTopology(topology); - } - - /* - * Stop - */ - public static void clearThreadPool() { - for (ExecutorService pool : threadPool) { - pool.shutdown(); - } - - for (ExecutorService pool : threadPool) { - try { - pool.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - threadPool.clear(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java deleted file mode 100644 index 470c164..0000000 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.topology.LocalEntranceProcessingItem; - -/** - * EntranceProcessingItem for multithreaded engine. - * - * @author Anh Thu Vu - * - */ -public class ThreadsEntranceProcessingItem extends LocalEntranceProcessingItem { - - public ThreadsEntranceProcessingItem(EntranceProcessor processor) { - super(processor); - } - - // The default waiting time when there is no available events is 100ms - // Override waitForNewEvents() to change it - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java deleted file mode 100644 index 5c4c9e9..0000000 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.ContentEvent; - -/** - * Runnable class where each object corresponds to a ContentEvent and an assigned PI. When a PI receives a ContentEvent, - * it will create a ThreadsEventRunnable with the received ContentEvent and an assigned workerPI. This runnable is then - * submitted to a thread queue waiting to be executed. The worker PI will process the received event when the runnable - * object is executed/run. - * - * @author Anh Thu Vu - * - */ -public class ThreadsEventRunnable implements Runnable { - - private ThreadsProcessingItemInstance workerPi; - private ContentEvent event; - - public ThreadsEventRunnable(ThreadsProcessingItemInstance workerPi, ContentEvent event) { - this.workerPi = workerPi; - this.event = event; - } - - public ThreadsProcessingItemInstance getWorkerProcessingItem() { - return this.workerPi; - } - - public ContentEvent getContentEvent() { - return this.event; - } - - @Override - public void run() { - try { - workerPi.processEvent(event); - } catch (Exception e) { - e.printStackTrace(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java deleted file mode 100644 index 1b83a05..0000000 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java +++ /dev/null @@ -1,103 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.util.ArrayList; -import java.util.List; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.topology.AbstractProcessingItem; -import com.yahoo.labs.samoa.topology.ProcessingItem; -import com.yahoo.labs.samoa.topology.Stream; -import com.yahoo.labs.samoa.utils.PartitioningScheme; -import com.yahoo.labs.samoa.utils.StreamDestination; - -/** - * ProcessingItem for multithreaded engine. - * - * @author Anh Thu Vu - * - */ -public class ThreadsProcessingItem extends AbstractProcessingItem { - // Replicas of the ProcessingItem. - // When ProcessingItem receives an event, it assigns one - // of these replicas to process the event. - private List<ThreadsProcessingItemInstance> piInstances; - - // Each replica of ProcessingItem is assigned to one of the - // available threads in a round-robin fashion, i.e.: each - // replica is associated with the index of a thread. - // Each ProcessingItem has a random offset variable so that - // the allocation of PI replicas to threads are spread evenly - // among all threads. - private int offset; - - /* - * Constructor - */ - public ThreadsProcessingItem(Processor processor, int parallelismHint) { - super(processor, parallelismHint); - this.offset = (int) (Math.random() * ThreadsEngine.getNumberOfThreads()); - } - - public List<ThreadsProcessingItemInstance> getProcessingItemInstances() { - return this.piInstances; - } - - /* - * Connects to streams - */ - @Override - protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { - StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme); - ((ThreadsStream) inputStream).addDestination(destination); - return this; - } - - /* - * Process the received event. - */ - public void processEvent(ContentEvent event, int counter) { - if (this.piInstances == null || this.piInstances.size() < this.getParallelism()) - throw new IllegalStateException( - "ThreadsWorkerProcessingItem(s) need to be setup before process any event (i.e. in ThreadsTopology.start())."); - - ThreadsProcessingItemInstance piInstance = this.piInstances.get(counter); - ThreadsEventRunnable runnable = new ThreadsEventRunnable(piInstance, event); - ThreadsEngine.getThreadWithIndex(piInstance.getThreadIndex()).submit(runnable); - } - - /* - * Setup the replicas of this PI. This should be called after the topology is - * set up (all Processors and PIs are setup and connected to the respective - * streams) and before events are sent. - */ - public void setupInstances() { - this.piInstances = new ArrayList<ThreadsProcessingItemInstance>(this.getParallelism()); - for (int i = 0; i < this.getParallelism(); i++) { - Processor newProcessor = this.getProcessor().newProcessor(this.getProcessor()); - newProcessor.onCreate(i + 1); - this.piInstances.add(new ThreadsProcessingItemInstance(newProcessor, this.offset + i)); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java deleted file mode 100644 index 930973b..0000000 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.Processor; - -/** - * Lightweight replicas of ThreadProcessingItem. ThreadsProcessingItem manages a list of these objects and assigns each - * incoming message to be processed by one of them. - * - * @author Anh Thu Vu - * - */ -public class ThreadsProcessingItemInstance { - - private Processor processor; - private int threadIndex; - - public ThreadsProcessingItemInstance(Processor processor, int threadIndex) { - this.processor = processor; - this.threadIndex = threadIndex; - } - - public int getThreadIndex() { - return this.threadIndex; - } - - public Processor getProcessor() { - return this.processor; - } - - public void processEvent(ContentEvent event) { - this.processor.process(event); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java deleted file mode 100644 index 2c02df7..0000000 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java +++ /dev/null @@ -1,109 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.util.LinkedList; -import java.util.List; - -import org.apache.commons.lang3.builder.HashCodeBuilder; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.topology.IProcessingItem; -import com.yahoo.labs.samoa.topology.AbstractStream; -import com.yahoo.labs.samoa.utils.StreamDestination; - -/** - * Stream for multithreaded engine. - * - * @author Anh Thu Vu - * - */ -public class ThreadsStream extends AbstractStream { - - private List<StreamDestination> destinations; - private int counter = 0; - private int maxCounter = 1; - - public ThreadsStream(IProcessingItem sourcePi) { - destinations = new LinkedList<StreamDestination>(); - } - - public void addDestination(StreamDestination destination) { - destinations.add(destination); - maxCounter *= destination.getParallelism(); - } - - public List<StreamDestination> getDestinations() { - return this.destinations; - } - - private int getNextCounter() { - if (maxCounter > 0 && counter >= maxCounter) - counter = 0; - this.counter++; - return this.counter; - } - - @Override - public synchronized void put(ContentEvent event) { - this.put(event, this.getNextCounter()); - } - - private void put(ContentEvent event, int counter) { - ThreadsProcessingItem pi; - int parallelism; - for (StreamDestination destination : destinations) { - pi = (ThreadsProcessingItem) destination.getProcessingItem(); - parallelism = destination.getParallelism(); - switch (destination.getPartitioningScheme()) { - case SHUFFLE: - pi.processEvent(event, counter % parallelism); - break; - case GROUP_BY_KEY: - pi.processEvent(event, getPIIndexForKey(event.getKey(), parallelism)); - break; - case BROADCAST: - for (int p = 0; p < parallelism; p++) { - pi.processEvent(event, p); - } - break; - } - } - } - - private static int getPIIndexForKey(String key, int parallelism) { - // If key is null, return a default index: 0 - if (key == null) - return 0; - - // HashCodeBuilder object does not have reset() method - // So all objects that get appended will be included in the - // computation of the hashcode. - // To avoid initialize a HashCodeBuilder for each event, - // here I use the static method with reflection on the event's key - int index = HashCodeBuilder.reflectionHashCode(key, true) % parallelism; - if (index < 0) { - index += parallelism; - } - return index; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java deleted file mode 100644 index 4ce5e2b..0000000 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java +++ /dev/null @@ -1,65 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.topology.AbstractTopology; -import com.yahoo.labs.samoa.topology.IProcessingItem; - -/** - * Topology for multithreaded engine. - * - * @author Anh Thu Vu - * - */ -public class ThreadsTopology extends AbstractTopology { - ThreadsTopology(String name) { - super(name); - } - - public void run() { - if (this.getEntranceProcessingItems() == null) - throw new IllegalStateException("You need to set entrance PI before running the topology."); - if (this.getEntranceProcessingItems().size() != 1) - throw new IllegalStateException("ThreadsTopology supports 1 entrance PI only. Number of entrance PIs is " - + this.getEntranceProcessingItems().size()); - - this.setupProcessingItemInstances(); - ThreadsEntranceProcessingItem entrancePi = (ThreadsEntranceProcessingItem) this.getEntranceProcessingItems() - .toArray()[0]; - if (entrancePi == null) - throw new IllegalStateException("You need to set entrance PI before running the topology."); - entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple mode - entrancePi.startSendingEvents(); - } - - /* - * Tell all the ThreadsProcessingItems to create & init their replicas - * (ThreadsProcessingItemInstance) - */ - private void setupProcessingItemInstances() { - for (IProcessingItem pi : this.getProcessingItems()) { - if (pi instanceof ThreadsProcessingItem) { - ThreadsProcessingItem tpi = (ThreadsProcessingItem) pi; - tpi.setupInstances(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/LocalThreadsDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/org/apache/samoa/LocalThreadsDoTask.java b/samoa-threads/src/main/java/org/apache/samoa/LocalThreadsDoTask.java new file mode 100644 index 0000000..3b5ca06 --- /dev/null +++ b/samoa-threads/src/main/java/org/apache/samoa/LocalThreadsDoTask.java @@ -0,0 +1,70 @@ +package org.apache.samoa; + +import java.util.ArrayList; +import java.util.Arrays; + +import org.apache.samoa.tasks.Task; +import org.apache.samoa.topology.impl.ThreadsComponentFactory; +import org.apache.samoa.topology.impl.ThreadsEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.ClassOption; + +/** + * @author Anh Thu Vu + * + */ +public class LocalThreadsDoTask { + private static final Logger logger = LoggerFactory.getLogger(LocalThreadsDoTask.class); + + /** + * The main method. + * + * @param args + * the arguments + */ + public static void main(String[] args) { + + ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); + + // Get number of threads for multithreading mode + int numThreads = 1; + for (int i = 0; i < tmpArgs.size() - 1; i++) { + if (tmpArgs.get(i).equals("-t")) { + try { + numThreads = Integer.parseInt(tmpArgs.get(i + 1)); + tmpArgs.remove(i + 1); + tmpArgs.remove(i); + } catch (NumberFormatException e) { + System.err.println("Invalid number of threads."); + System.err.println(e.getStackTrace()); + } + } + } + logger.info("Number of threads:{}", numThreads); + + args = tmpArgs.toArray(new String[0]); + + StringBuilder cliString = new StringBuilder(); + for (int i = 0; i < args.length; i++) { + cliString.append(" ").append(args[i]); + } + logger.debug("Command line string = {}", cliString.toString()); + System.out.println("Command line string = " + cliString.toString()); + + Task task = null; + try { + task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, null); + logger.info("Sucessfully instantiating {}", task.getClass().getCanonicalName()); + } catch (Exception e) { + logger.error("Fail to initialize the task", e); + System.out.println("Fail to initialize the task" + e); + return; + } + task.setFactory(new ThreadsComponentFactory()); + task.init(); + + ThreadsEngine.submitTopology(task.getTopology(), numThreads); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsComponentFactory.java b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsComponentFactory.java new file mode 100644 index 0000000..e27487b --- /dev/null +++ b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsComponentFactory.java @@ -0,0 +1,65 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; + +/** + * ComponentFactory for multithreaded engine + * + * @author Anh Thu Vu + * + */ +public class ThreadsComponentFactory implements ComponentFactory { + + @Override + public ProcessingItem createPi(Processor processor) { + return this.createPi(processor, 1); + } + + @Override + public ProcessingItem createPi(Processor processor, int paralellism) { + return new ThreadsProcessingItem(processor, paralellism); + } + + @Override + public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { + return new ThreadsEntranceProcessingItem(entranceProcessor); + } + + @Override + public Stream createStream(IProcessingItem sourcePi) { + return new ThreadsStream(sourcePi); + } + + @Override + public Topology createTopology(String topoName) { + return new ThreadsTopology(topoName); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEngine.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEngine.java b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEngine.java new file mode 100644 index 0000000..5deb962 --- /dev/null +++ b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEngine.java @@ -0,0 +1,101 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.samoa.topology.Topology; + +/** + * Multithreaded engine. + * + * @author Anh Thu Vu + * + */ +public class ThreadsEngine { + + private static final List<ExecutorService> threadPool = new ArrayList<ExecutorService>(); + + /* + * Create and manage threads + */ + public static void setNumberOfThreads(int numThreads) { + if (numThreads < 1) + throw new IllegalStateException("Number of threads must be a positive integer."); + + if (threadPool.size() > numThreads) + throw new IllegalStateException("You cannot set a numThreads smaller than the current size of the threads pool."); + + if (threadPool.size() < numThreads) { + for (int i = threadPool.size(); i < numThreads; i++) { + threadPool.add(Executors.newSingleThreadExecutor()); + } + } + } + + public static int getNumberOfThreads() { + return threadPool.size(); + } + + public static ExecutorService getThreadWithIndex(int index) { + if (threadPool.size() <= 0) + throw new IllegalStateException("Try to get ExecutorService from an empty pool."); + index %= threadPool.size(); + return threadPool.get(index); + } + + /* + * Submit topology and start + */ + private static void submitTopology(Topology topology) { + ThreadsTopology tTopology = (ThreadsTopology) topology; + tTopology.run(); + } + + public static void submitTopology(Topology topology, int numThreads) { + ThreadsEngine.setNumberOfThreads(numThreads); + ThreadsEngine.submitTopology(topology); + } + + /* + * Stop + */ + public static void clearThreadPool() { + for (ExecutorService pool : threadPool) { + pool.shutdown(); + } + + for (ExecutorService pool : threadPool) { + try { + pool.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + threadPool.clear(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEntranceProcessingItem.java b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEntranceProcessingItem.java new file mode 100644 index 0000000..6c09a60 --- /dev/null +++ b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEntranceProcessingItem.java @@ -0,0 +1,41 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.topology.LocalEntranceProcessingItem; + +/** + * EntranceProcessingItem for multithreaded engine. + * + * @author Anh Thu Vu + * + */ +public class ThreadsEntranceProcessingItem extends LocalEntranceProcessingItem { + + public ThreadsEntranceProcessingItem(EntranceProcessor processor) { + super(processor); + } + + // The default waiting time when there is no available events is 100ms + // Override waitForNewEvents() to change it + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEventRunnable.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEventRunnable.java b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEventRunnable.java new file mode 100644 index 0000000..d812db8 --- /dev/null +++ b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEventRunnable.java @@ -0,0 +1,61 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.ContentEvent; + +/** + * Runnable class where each object corresponds to a ContentEvent and an assigned PI. When a PI receives a ContentEvent, + * it will create a ThreadsEventRunnable with the received ContentEvent and an assigned workerPI. This runnable is then + * submitted to a thread queue waiting to be executed. The worker PI will process the received event when the runnable + * object is executed/run. + * + * @author Anh Thu Vu + * + */ +public class ThreadsEventRunnable implements Runnable { + + private ThreadsProcessingItemInstance workerPi; + private ContentEvent event; + + public ThreadsEventRunnable(ThreadsProcessingItemInstance workerPi, ContentEvent event) { + this.workerPi = workerPi; + this.event = event; + } + + public ThreadsProcessingItemInstance getWorkerProcessingItem() { + return this.workerPi; + } + + public ContentEvent getContentEvent() { + return this.event; + } + + @Override + public void run() { + try { + workerPi.processEvent(event); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsProcessingItem.java b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsProcessingItem.java new file mode 100644 index 0000000..de24a8d --- /dev/null +++ b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsProcessingItem.java @@ -0,0 +1,103 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.ArrayList; +import java.util.List; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.AbstractProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.utils.PartitioningScheme; +import org.apache.samoa.utils.StreamDestination; + +/** + * ProcessingItem for multithreaded engine. + * + * @author Anh Thu Vu + * + */ +public class ThreadsProcessingItem extends AbstractProcessingItem { + // Replicas of the ProcessingItem. + // When ProcessingItem receives an event, it assigns one + // of these replicas to process the event. + private List<ThreadsProcessingItemInstance> piInstances; + + // Each replica of ProcessingItem is assigned to one of the + // available threads in a round-robin fashion, i.e.: each + // replica is associated with the index of a thread. + // Each ProcessingItem has a random offset variable so that + // the allocation of PI replicas to threads are spread evenly + // among all threads. + private int offset; + + /* + * Constructor + */ + public ThreadsProcessingItem(Processor processor, int parallelismHint) { + super(processor, parallelismHint); + this.offset = (int) (Math.random() * ThreadsEngine.getNumberOfThreads()); + } + + public List<ThreadsProcessingItemInstance> getProcessingItemInstances() { + return this.piInstances; + } + + /* + * Connects to streams + */ + @Override + protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { + StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme); + ((ThreadsStream) inputStream).addDestination(destination); + return this; + } + + /* + * Process the received event. + */ + public void processEvent(ContentEvent event, int counter) { + if (this.piInstances == null || this.piInstances.size() < this.getParallelism()) + throw new IllegalStateException( + "ThreadsWorkerProcessingItem(s) need to be setup before process any event (i.e. in ThreadsTopology.start())."); + + ThreadsProcessingItemInstance piInstance = this.piInstances.get(counter); + ThreadsEventRunnable runnable = new ThreadsEventRunnable(piInstance, event); + ThreadsEngine.getThreadWithIndex(piInstance.getThreadIndex()).submit(runnable); + } + + /* + * Setup the replicas of this PI. This should be called after the topology is + * set up (all Processors and PIs are setup and connected to the respective + * streams) and before events are sent. + */ + public void setupInstances() { + this.piInstances = new ArrayList<ThreadsProcessingItemInstance>(this.getParallelism()); + for (int i = 0; i < this.getParallelism(); i++) { + Processor newProcessor = this.getProcessor().newProcessor(this.getProcessor()); + newProcessor.onCreate(i + 1); + this.piInstances.add(new ThreadsProcessingItemInstance(newProcessor, this.offset + i)); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsProcessingItemInstance.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsProcessingItemInstance.java b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsProcessingItemInstance.java new file mode 100644 index 0000000..a736cd1 --- /dev/null +++ b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsProcessingItemInstance.java @@ -0,0 +1,54 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; + +/** + * Lightweight replicas of ThreadProcessingItem. ThreadsProcessingItem manages a list of these objects and assigns each + * incoming message to be processed by one of them. + * + * @author Anh Thu Vu + * + */ +public class ThreadsProcessingItemInstance { + + private Processor processor; + private int threadIndex; + + public ThreadsProcessingItemInstance(Processor processor, int threadIndex) { + this.processor = processor; + this.threadIndex = threadIndex; + } + + public int getThreadIndex() { + return this.threadIndex; + } + + public Processor getProcessor() { + return this.processor; + } + + public void processEvent(ContentEvent event) { + this.processor.process(event); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsStream.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsStream.java b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsStream.java new file mode 100644 index 0000000..d56ae2d --- /dev/null +++ b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsStream.java @@ -0,0 +1,108 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.topology.AbstractStream; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.utils.StreamDestination; + +/** + * Stream for multithreaded engine. + * + * @author Anh Thu Vu + * + */ +public class ThreadsStream extends AbstractStream { + + private List<StreamDestination> destinations; + private int counter = 0; + private int maxCounter = 1; + + public ThreadsStream(IProcessingItem sourcePi) { + destinations = new LinkedList<StreamDestination>(); + } + + public void addDestination(StreamDestination destination) { + destinations.add(destination); + maxCounter *= destination.getParallelism(); + } + + public List<StreamDestination> getDestinations() { + return this.destinations; + } + + private int getNextCounter() { + if (maxCounter > 0 && counter >= maxCounter) + counter = 0; + this.counter++; + return this.counter; + } + + @Override + public synchronized void put(ContentEvent event) { + this.put(event, this.getNextCounter()); + } + + private void put(ContentEvent event, int counter) { + ThreadsProcessingItem pi; + int parallelism; + for (StreamDestination destination : destinations) { + pi = (ThreadsProcessingItem) destination.getProcessingItem(); + parallelism = destination.getParallelism(); + switch (destination.getPartitioningScheme()) { + case SHUFFLE: + pi.processEvent(event, counter % parallelism); + break; + case GROUP_BY_KEY: + pi.processEvent(event, getPIIndexForKey(event.getKey(), parallelism)); + break; + case BROADCAST: + for (int p = 0; p < parallelism; p++) { + pi.processEvent(event, p); + } + break; + } + } + } + + private static int getPIIndexForKey(String key, int parallelism) { + // If key is null, return a default index: 0 + if (key == null) + return 0; + + // HashCodeBuilder object does not have reset() method + // So all objects that get appended will be included in the + // computation of the hashcode. + // To avoid initialize a HashCodeBuilder for each event, + // here I use the static method with reflection on the event's key + int index = HashCodeBuilder.reflectionHashCode(key, true) % parallelism; + if (index < 0) { + index += parallelism; + } + return index; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsTopology.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsTopology.java b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsTopology.java new file mode 100644 index 0000000..4a7876a --- /dev/null +++ b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsTopology.java @@ -0,0 +1,65 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.topology.AbstractTopology; +import org.apache.samoa.topology.IProcessingItem; + +/** + * Topology for multithreaded engine. + * + * @author Anh Thu Vu + * + */ +public class ThreadsTopology extends AbstractTopology { + ThreadsTopology(String name) { + super(name); + } + + public void run() { + if (this.getEntranceProcessingItems() == null) + throw new IllegalStateException("You need to set entrance PI before running the topology."); + if (this.getEntranceProcessingItems().size() != 1) + throw new IllegalStateException("ThreadsTopology supports 1 entrance PI only. Number of entrance PIs is " + + this.getEntranceProcessingItems().size()); + + this.setupProcessingItemInstances(); + ThreadsEntranceProcessingItem entrancePi = (ThreadsEntranceProcessingItem) this.getEntranceProcessingItems() + .toArray()[0]; + if (entrancePi == null) + throw new IllegalStateException("You need to set entrance PI before running the topology."); + entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple mode + entrancePi.startSendingEvents(); + } + + /* + * Tell all the ThreadsProcessingItems to create & init their replicas + * (ThreadsProcessingItemInstance) + */ + private void setupProcessingItemInstances() { + for (IProcessingItem pi : this.getProcessingItems()) { + if (pi instanceof ThreadsProcessingItem) { + ThreadsProcessingItem tpi = (ThreadsProcessingItem) pi; + tpi.setupInstances(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java deleted file mode 100644 index c2789b9..0000000 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.yahoo.labs.samoa; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 - 2014 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.junit.Test; - -public class AlgosTest { - - @Test(timeout = 60000) - public void testVHTWithThreads() throws Exception { - - TestParams vhtConfig = new TestParams.Builder() - .inputInstances(200_000) - .samplingSize(20_000) - .evaluationInstances(200_000) - .classifiedInstances(200_000) - .classificationsCorrect(55f) - .kappaStat(-0.1f) - .kappaTempStat(-0.1f) - .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE + " -t 2") - .resultFilePollTimeout(10) - .prePollWait(10) - .taskClassName(LocalThreadsDoTask.class.getName()) - .build(); - TestUtils.test(vhtConfig); - - } - - @Test(timeout = 180000) - public void testBaggingWithThreads() throws Exception { - TestParams baggingConfig = new TestParams.Builder() - .inputInstances(100_000) - .samplingSize(10_000) - .inputDelayMicroSec(100) // prevents saturating the system due to unbounded queues - .evaluationInstances(90_000) - .classifiedInstances(105_000) - .classificationsCorrect(55f) - .kappaStat(0f) - .kappaTempStat(0f) - .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE + " -t 2") - .prePollWait(10) - .resultFilePollTimeout(30) - .taskClassName(LocalThreadsDoTask.class.getName()) - .build(); - TestUtils.test(baggingConfig); - - } - -}
