http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java new file mode 100644 index 0000000..7c4769e --- /dev/null +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java @@ -0,0 +1,108 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.github.javacliparser.ClassOption; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.yahoo.labs.samoa.tasks.Task; + +/** + * Utility class for samoa-storm project. It is used by StormDoTask to process its arguments. 
+ * @author Arinto Murdopo + * + */ +public class StormSamoaUtils { + + private static final Logger logger = LoggerFactory.getLogger(StormSamoaUtils.class); + + static final String KEY_FIELD = "key"; + static final String CONTENT_EVENT_FIELD = "content_event"; + + static Properties getProperties() throws IOException{ + Properties props = new Properties(); + InputStream is; + + File f = new File("src/main/resources/samoa-storm-cluster.properties"); // FIXME it does not exist anymore + is = new FileInputStream(f); + + try { + props.load(is); + } catch (IOException e1) { + System.out.println("Fail to load property file"); + return null; + } finally{ + is.close(); + } + + return props; + } + + public static StormTopology argsToTopology(String[] args){ + StringBuilder cliString = new StringBuilder(); + for (String arg : args) { + cliString.append(" ").append(arg); + } + logger.debug("Command line string = {}", cliString.toString()); + + Task task = getTask(cliString.toString()); + + //TODO: remove setFactory method with DynamicBinding + task.setFactory(new StormComponentFactory()); + task.init(); + + return (StormTopology)task.getTopology(); + } + + public static int numWorkers(List<String> tmpArgs){ + int position = tmpArgs.size() - 1; + int numWorkers; + + try { + numWorkers = Integer.parseInt(tmpArgs.get(position)); + tmpArgs.remove(position); + } catch (NumberFormatException e) { + numWorkers = 4; + } + + return numWorkers; + } + + public static Task getTask(String cliString) { + Task task = null; + try { + logger.debug("Providing task [{}]", cliString); + task = ClassOption.cliStringToObject(cliString, Task.class, null); + } catch (Exception e) { + logger.warn("Fail in initializing the task!"); + e.printStackTrace(); + } + return task; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java new file mode 100644 index 0000000..d066e42 --- /dev/null +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java @@ -0,0 +1,65 @@ +//package com.yahoo.labs.samoa.topology.impl; +// +///* +// * #%L +// * SAMOA +// * %% +// * Copyright (C) 2013 Yahoo! Inc. +// * %% +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// * #L% +// */ +// +//import com.yahoo.labs.samoa.core.ContentEvent; +//import com.yahoo.labs.samoa.topology.impl.StormEntranceProcessingItem.StormEntranceSpout; +// +///** +// * Storm Stream that connects into Spout. 
It wraps the spout itself +// * @author Arinto Murdopo +// * +// */ +//final class StormSpoutStream extends StormStream{ +// +// /** +// * +// */ +// private static final long serialVersionUID = -7444653177614988650L; +// +// private StormEntranceSpout spout; +// +// StormSpoutStream(String stormComponentId) { +// super(stormComponentId); +// } +// +// @Override +// public void put(ContentEvent contentEvent) { +// spout.put(this, contentEvent); +// } +// +// void setSpout(StormEntranceSpout spout){ +// this.spout = spout; +// } +// +//// @Override +//// public void setStreamId(String stream) { +//// // TODO Auto-generated method stub +//// +//// } +// +// @Override +// public String getStreamId() { +// // TODO Auto-generated method stub +// return null; +// } +// +//} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java new file mode 100644 index 0000000..f67ab19 --- /dev/null +++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java @@ -0,0 +1,85 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.UUID; + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.topology.Stream; + +/** + * Abstract class to implement Storm Stream + * @author Arinto Murdopo + * + */ +abstract class StormStream implements Stream, java.io.Serializable { + + /** + * + */ + private static final long serialVersionUID = 281835563756514852L; + protected final String outputStreamId; + protected final InputStreamId inputStreamId; + + public StormStream(String stormComponentId){ + this.outputStreamId = UUID.randomUUID().toString(); + this.inputStreamId = new InputStreamId(stormComponentId, this.outputStreamId); + } + + @Override + public abstract void put(ContentEvent contentEvent); + + String getOutputId(){ + return this.outputStreamId; + } + + InputStreamId getInputId(){ + return this.inputStreamId; + } + + final static class InputStreamId implements java.io.Serializable{ + + /** + * + */ + private static final long serialVersionUID = -7457995634133691295L; + private final String componentId; + private final String streamId; + + InputStreamId(String componentId, String streamId){ + this.componentId = componentId; + this.streamId = streamId; + } + + String getComponentId(){ + return componentId; + } + + String getStreamId(){ + return streamId; + } + } + + @Override + public void setBatchSize(int batchSize) { + // Ignore batch size + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java new file mode 100644 index 0000000..7a49d8b --- /dev/null +++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java @@ -0,0 +1,52 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import backtype.storm.topology.TopologyBuilder; + +import com.yahoo.labs.samoa.topology.IProcessingItem; +import com.yahoo.labs.samoa.topology.AbstractTopology; + +/** + * Adaptation of SAMOA topology in samoa-storm + * @author Arinto Murdopo + * + */ +public class StormTopology extends AbstractTopology { + + private TopologyBuilder builder; + + public StormTopology(String topologyName){ + super(topologyName); + this.builder = new TopologyBuilder(); + } + + @Override + public void addProcessingItem(IProcessingItem procItem, int parallelismHint){ + StormTopologyNode stormNode = (StormTopologyNode) procItem; + stormNode.addToTopology(this, parallelismHint); + super.addProcessingItem(procItem, parallelismHint); + } + + public TopologyBuilder getStormBuilder(){ + return builder; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java new 
file mode 100644
index 0000000..07fccbf
--- /dev/null
+++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java
@@ -0,0 +1,34 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Interface to represent a node in samoa-storm topology.
+ * @author Arinto Murdopo
+ *
+ */
+interface StormTopologyNode {
+
+  /**
+   * Adds this node to the given topology.
+   *
+   * @param topology the topology this node is added to
+   * @param parallelismHint parallelism requested by the caller
+   *        (NOTE(review): implementations may ignore it - confirm per implementation)
+   */
+  void addToTopology(StormTopology topology, int parallelismHint);
+
+  /**
+   * @return a stream for this node (presumably one whose source is this node -
+   *         verify against the implementations)
+   */
+  StormStream createStream();
+
+  /** @return the id of this node */
+  String getId();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
----------------------------------------------------------------------
diff --git a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
new file mode 100644
index 0000000..1e1b048
--- /dev/null
+++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
@@ -0,0 +1,133 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.thrift7.TException; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.Config; +import backtype.storm.generated.AlreadyAliveException; +import backtype.storm.generated.InvalidTopologyException; +import backtype.storm.utils.NimbusClient; +import backtype.storm.utils.Utils; + +/** + * Helper class to submit SAMOA task into Storm without the need of submitting the jar file. + * The jar file must be submitted first using StormJarSubmitter class. + * @author Arinto Murdopo + * + */ +public class StormTopologySubmitter { + + public static String YJP_OPTIONS_KEY="YjpOptions"; + + private static Logger logger = LoggerFactory.getLogger(StormTopologySubmitter.class); + + public static void main(String[] args) throws IOException{ + Properties props = StormSamoaUtils.getProperties(); + + String uploadedJarLocation = props.getProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY); + if(uploadedJarLocation == null){ + logger.error("Invalid properties file. 
It must have key {}", + StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY); + return; + } + + List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); + int numWorkers = StormSamoaUtils.numWorkers(tmpArgs); + + args = tmpArgs.toArray(new String[0]); + StormTopology stormTopo = StormSamoaUtils.argsToTopology(args); + + Config conf = new Config(); + conf.putAll(Utils.readStormConfig()); + conf.putAll(Utils.readCommandLineOpts()); + conf.setDebug(false); + conf.setNumWorkers(numWorkers); + + String profilerOption = + props.getProperty(StormTopologySubmitter.YJP_OPTIONS_KEY); + if(profilerOption != null){ + String topoWorkerChildOpts = (String) conf.get(Config.TOPOLOGY_WORKER_CHILDOPTS); + StringBuilder optionBuilder = new StringBuilder(); + if(topoWorkerChildOpts != null){ + optionBuilder.append(topoWorkerChildOpts); + optionBuilder.append(' '); + } + optionBuilder.append(profilerOption); + conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, optionBuilder.toString()); + } + + Map<String, Object> myConfigMap = new HashMap<String, Object>(conf); + StringWriter out = new StringWriter(); + + try { + JSONValue.writeJSONString(myConfigMap, out); + } catch (IOException e) { + System.out.println("Error in writing JSONString"); + e.printStackTrace(); + return; + } + + Config config = new Config(); + config.putAll(Utils.readStormConfig()); + + String nimbusHost = (String) config.get(Config.NIMBUS_HOST); + + NimbusClient nc = new NimbusClient(nimbusHost); + String topologyName = stormTopo.getTopologyName(); + try { + System.out.println("Submitting topology with name: " + + topologyName); + nc.getClient().submitTopology(topologyName, uploadedJarLocation, + out.toString(), stormTopo.getStormBuilder().createTopology()); + System.out.println(topologyName + " is successfully submitted"); + + } catch (AlreadyAliveException aae) { + System.out.println("Fail to submit " + topologyName + + "\nError message: " + aae.get_msg()); + } catch (InvalidTopologyException ite) { + 
System.out.println("Invalid topology for " + topologyName); + ite.printStackTrace(); + } catch (TException te) { + System.out.println("Texception for " + topologyName); + te.printStackTrace(); + } + } + + private static String uploadedJarLocation(List<String> tmpArgs){ + int position = tmpArgs.size() - 1; + String uploadedJarLocation = tmpArgs.get(position); + tmpArgs.remove(position); + return uploadedJarLocation; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java ---------------------------------------------------------------------- diff --git a/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java b/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java new file mode 100644 index 0000000..15b80b5 --- /dev/null +++ b/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java @@ -0,0 +1,68 @@ +package com.yahoo.labs.samoa; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L%
+ */
+
+import org.junit.Test;
+
+/**
+ * Regression tests that run SAMOA tasks through {@code LocalStormDoTask}. Each test
+ * builds a {@code TestParams} instance (inputs plus expected results) and hands it to
+ * {@code TestUtils.test}.
+ */
+public class AlgosTest {
+
+
+  /**
+   * Runs the {@code PREQEVAL_VHT_RANDOMTREE} template; JUnit aborts the test after 60 s.
+   * NOTE(review): how the expected values are compared (exact match or threshold) is
+   * decided in TestUtils, which is not shown here - confirm before tuning them.
+   */
+  @Test(timeout = 60000)
+  public void testVHTWithStorm() throws Exception {
+
+    TestParams vhtConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(200_000)
+        .classifiedInstances(200_000)
+        .classificationsCorrect(55f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE)
+        .resultFilePollTimeout(30) // presumably seconds - verify against TestParams.Builder
+        .prePollWait(15) // presumably seconds - verify against TestParams.Builder
+        .taskClassName(LocalStormDoTask.class.getName())
+        .build();
+    TestUtils.test(vhtConfig);
+
+  }
+
+  /**
+   * Runs the {@code PREQEVAL_BAGGING_RANDOMTREE} template; JUnit aborts the test after 120 s.
+   */
+  @Test(timeout = 120000)
+  public void testBaggingWithStorm() throws Exception {
+    TestParams baggingConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(180_000)
+        .classifiedInstances(190_000)
+        .classificationsCorrect(60f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE)
+        .resultFilePollTimeout(40) // presumably seconds - verify against TestParams.Builder
+        .prePollWait(20) // presumably seconds - verify against TestParams.Builder
+        .taskClassName(LocalStormDoTask.class.getName())
+        .build();
+    TestUtils.test(baggingConfig);
+
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java
----------------------------------------------------------------------
diff --git a/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java b/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java
new file mode 100644
index 0000000..ec8929a
--- /dev/null
+++ b/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java
@@ -0,0 +1,78 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.junit.Assert.assertEquals;
+import mockit.Expectations;
+import mockit.MockUp;
+import mockit.Mocked;
+import mockit.Tested;
+import mockit.Verifications;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.TopologyBuilder;
+
+import com.yahoo.labs.samoa.core.Processor;
+
+/**
+ * JMockit-based unit test for {@code StormProcessingItem}: adds the item to a mocked
+ * topology and checks that processor, parallelism and id keep their constructor values.
+ */
+public class StormProcessingItemTest {
+  private static final int PARRALLELISM_HINT_2 = 2;
+  private static final int PARRALLELISM_HINT_4 = 4;
+  private static final String ID = "id";
+  @Tested private StormProcessingItem pi;
+  @Mocked private Processor processor;
+  @Mocked private StormTopology topology;
+  @Mocked private TopologyBuilder stormBuilder = new TopologyBuilder();
+
+  /** Creates the item under test with the mocked processor and a parallelism of 2. */
+  @Before
+  public void setUp() {
+    pi = new StormProcessingItem(processor, ID, PARRALLELISM_HINT_2);
+  }
+
+  /**
+   * Adds the item to the mocked topology with a different hint (4) and verifies that
+   * the item still reports the values given to its constructor.
+   */
+  @Test
+  public void testAddToTopology() {
+    // Recorded expectations: the topology hands out the mocked builder, and
+    // registering the bolt under ID yields a stub BoltDeclarer.
+    new Expectations() {
+      {
+        topology.getStormBuilder();
+        result = stormBuilder;
+
+        stormBuilder.setBolt(ID, (IRichBolt) any, anyInt);
+        result = new MockUp<BoltDeclarer>() {
+        }.getMockInstance();
+      }
+    };
+
+    pi.addToTopology(topology, PARRALLELISM_HINT_4); // this parallelism hint is ignored
+
+    new Verifications() {
+      {
+        assertEquals(pi.getProcessor(), processor);
+        // TODO add methods to explore a topology and verify them
+        assertEquals(pi.getParallelism(), PARRALLELISM_HINT_2);
+        assertEquals(pi.getId(), ID);
+      }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-test/README.md ---------------------------------------------------------------------- diff --git a/samoa-test/README.md b/samoa-test/README.md new file mode 100644 index 0000000..63719ef --- /dev/null +++ b/samoa-test/README.md @@ -0,0 +1,14 @@ +This module contains a test framework for simplifying regression testing of Samoa algorithms on various platforms. + +The test framework is generic and reusable for multiple platforms. The platform modules that make use of the test framework add a maven dependency to a test-jar artifact of the samoa-test module. This test-jar artifact includes the test framework classes and its dependencies. + +For defining tests, we reuse the code from the test framework but customize tests according to the platform capabilities. + +For each algorithm to test, we must provide: + +* the task class for the platform +* the algorithm (referring to the provided string templates in this module) +* the input parameters +* the expectations (thresholds or values) + +See existing code in samoa-local, samoa-threads and samoa-storm for some examples. 
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-test/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-test/pom.xml b/samoa-test/pom.xml new file mode 100644 index 0000000..2ee103b --- /dev/null +++ b/samoa-test/pom.xml @@ -0,0 +1,65 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>samoa</artifactId> + <groupId>com.yahoo.labs.samoa</groupId> + <version>0.3.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>samoa-test</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-csv</artifactId> + <version>1.0</version> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.4</version> + </dependency> + <dependency> + <groupId>com.yahoo.labs.samoa</groupId> + <artifactId>samoa-api</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptors> + <descriptor>src/main/assembly/test-jar-with-dependencies.xml</descriptor> + </descriptors> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + +</project> \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-test/src/main/assembly/test-jar-with-dependencies.xml ---------------------------------------------------------------------- diff --git a/samoa-test/src/main/assembly/test-jar-with-dependencies.xml b/samoa-test/src/main/assembly/test-jar-with-dependencies.xml new file mode 100644 index 0000000..51465cc --- /dev/null +++ b/samoa-test/src/main/assembly/test-jar-with-dependencies.xml @@ -0,0 +1,19 @@ +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>test-jar-with-dependencies</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <dependencySets> + <dependencySet> + <outputDirectory>/</outputDirectory> + <useProjectArtifact>true</useProjectArtifact> + <!-- we're creating the test-jar as an attachment --> + <useProjectAttachments>true</useProjectAttachments> + <useTransitiveDependencies>false</useTransitiveDependencies> + <unpack>true</unpack> + </dependencySet> + </dependencySets> +</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java ---------------------------------------------------------------------- diff --git a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java new file mode 100644 index 0000000..08ad94f --- /dev/null +++ b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java @@ -0,0 +1,235 @@ +package com.yahoo.labs.samoa; + +public class TestParams { + + /** + * templates that take the following parameters: + * <ul> + * <li>the output file location as an argument (-d), + * <li>the maximum number of 
instances for testing/training (-i) + * <li>the sampling size (-f) + * <li>the delay in ms between input instances (-w) , default is zero + * </ul> + * as well as the maximum number of instances for testing/training (-i) and the sampling size (-f) + */ + public static class Templates { + + public final static String PREQEVAL_VHT_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d " + + "-l (com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree -p 4) " + + "-s (com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 10 -u 10)"; + + public final static String PREQEVAL_NAIVEBAYES_HYPERPLANE = "PrequentialEvaluation -d %s -i %d -f %d -w %d " + + "-l (classifiers.SingleClassifier -l com.yahoo.labs.samoa.learners.classifiers.NaiveBayes) " + + "-s (com.yahoo.labs.samoa.moa.streams.generators.HyperplaneGenerator -c 2)"; + + // setting the number of nominal attributes to zero significantly reduces the processing time, + // so that it's acceptable in a test case + public final static String PREQEVAL_BAGGING_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d " + + "-l (com.yahoo.labs.samoa.learners.classifiers.ensemble.Bagging) " + + "-s (com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 0 -u 10)"; + + } + + + public static final String EVALUATION_INSTANCES = "evaluation instances"; + public static final String CLASSIFIED_INSTANCES = "classified instances"; + public static final String CLASSIFICATIONS_CORRECT = "classifications correct (percent)"; + public static final String KAPPA_STAT = "Kappa Statistic (percent)"; + public static final String KAPPA_TEMP_STAT = "Kappa Temporal Statistic (percent)"; + + + private long inputInstances; + private long samplingSize; + private long evaluationInstances; + private long classifiedInstances; + private float classificationsCorrect; + private float kappaStat; + private float kappaTempStat; + private String cliStringTemplate; + private int pollTimeoutSeconds; + 
private final int prePollWait; + private int inputDelayMicroSec; + private String taskClassName; + + private TestParams(String taskClassName, + long inputInstances, + long samplingSize, + long evaluationInstances, + long classifiedInstances, + float classificationsCorrect, + float kappaStat, + float kappaTempStat, + String cliStringTemplate, + int pollTimeoutSeconds, + int prePollWait, + int inputDelayMicroSec) { + this.taskClassName = taskClassName; + this.inputInstances = inputInstances; + this.samplingSize = samplingSize; + this.evaluationInstances = evaluationInstances; + this.classifiedInstances = classifiedInstances; + this.classificationsCorrect = classificationsCorrect; + this.kappaStat = kappaStat; + this.kappaTempStat = kappaTempStat; + this.cliStringTemplate = cliStringTemplate; + this.pollTimeoutSeconds = pollTimeoutSeconds; + this.prePollWait = prePollWait; + this.inputDelayMicroSec = inputDelayMicroSec; + } + + public String getTaskClassName() { + return taskClassName; + } + + public long getInputInstances() { + return inputInstances; + } + + public long getSamplingSize() { + return samplingSize; + } + + public int getPollTimeoutSeconds() { + return pollTimeoutSeconds; + } + + public int getPrePollWaitSeconds() { + return prePollWait; + } + + public String getCliStringTemplate() { + return cliStringTemplate; + } + + public long getEvaluationInstances() { + return evaluationInstances; + } + + public long getClassifiedInstances() { + return classifiedInstances; + } + + public float getClassificationsCorrect() { + return classificationsCorrect; + } + + public float getKappaStat() { + return kappaStat; + } + + public float getKappaTempStat() { + return kappaTempStat; + } + + public int getInputDelayMicroSec() { + return inputDelayMicroSec; + } + + @Override + public String toString() { + return "TestParams{\n" + + "inputInstances=" + inputInstances + "\n" + + "samplingSize=" + samplingSize + "\n" + + "evaluationInstances=" + evaluationInstances + "\n" + + 
"classifiedInstances=" + classifiedInstances + "\n" + + "classificationsCorrect=" + classificationsCorrect + "\n" + + "kappaStat=" + kappaStat + "\n" + + "kappaTempStat=" + kappaTempStat + "\n" + + "cliStringTemplate='" + cliStringTemplate + '\'' + "\n" + + "pollTimeoutSeconds=" + pollTimeoutSeconds + "\n" + + "prePollWait=" + prePollWait + "\n" + + "taskClassName='" + taskClassName + '\'' + "\n" + + "inputDelayMicroSec=" + inputDelayMicroSec + "\n" + + '}'; + } + + public static class Builder { + private long inputInstances; + private long samplingSize; + private long evaluationInstances; + private long classifiedInstances; + private float classificationsCorrect; + private float kappaStat =0f; + private float kappaTempStat =0f; + private String cliStringTemplate; + private int pollTimeoutSeconds = 10; + private int prePollWaitSeconds = 10; + private String taskClassName; + private int inputDelayMicroSec = 0; + + public Builder taskClassName(String taskClassName) { + this.taskClassName = taskClassName; + return this; + } + + public Builder inputInstances(long inputInstances) { + this.inputInstances = inputInstances; + return this; + } + + public Builder samplingSize(long samplingSize) { + this.samplingSize = samplingSize; + return this; + } + + public Builder evaluationInstances(long evaluationInstances) { + this.evaluationInstances = evaluationInstances; + return this; + } + + public Builder classifiedInstances(long classifiedInstances) { + this.classifiedInstances = classifiedInstances; + return this; + } + + public Builder classificationsCorrect(float classificationsCorrect) { + this.classificationsCorrect = classificationsCorrect; + return this; + } + + public Builder kappaStat(float kappaStat) { + this.kappaStat = kappaStat; + return this; + } + + public Builder kappaTempStat(float kappaTempStat) { + this.kappaTempStat = kappaTempStat; + return this; + } + + public Builder cliStringTemplate(String cliStringTemplate) { + this.cliStringTemplate = 
cliStringTemplate; + return this; + } + + public Builder resultFilePollTimeout(int pollTimeoutSeconds) { + this.pollTimeoutSeconds = pollTimeoutSeconds; + return this; + } + + public Builder inputDelayMicroSec(int inputDelayMicroSec) { + this.inputDelayMicroSec = inputDelayMicroSec; + return this; + } + + public Builder prePollWait(int prePollWaitSeconds) { + this.prePollWaitSeconds = prePollWaitSeconds; + return this; + } + + public TestParams build() { + return new TestParams(taskClassName, + inputInstances, + samplingSize, + evaluationInstances, + classifiedInstances, + classificationsCorrect, + kappaStat, + kappaTempStat, + cliStringTemplate, + pollTimeoutSeconds, + prePollWaitSeconds, + inputDelayMicroSec); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java ---------------------------------------------------------------------- diff --git a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java new file mode 100644 index 0000000..d66f5df --- /dev/null +++ b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java @@ -0,0 +1,153 @@ +package com.yahoo.labs.samoa;/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.io.input.Tailer; +import org.apache.commons.io.input.TailerListenerAdapter; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.Reader; +import java.lang.reflect.InvocationTargetException; +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; + +public class TestUtils { + + private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class.getName()); + + + public static void test(final TestParams testParams) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, InterruptedException { + + final File tempFile = File.createTempFile("test", "test"); + + LOG.info("Starting test, output file is {}, test config is \n{}", tempFile.getAbsolutePath(), testParams.toString()); + + Executors.newSingleThreadExecutor().submit(new Callable<Void>() { + + @Override + public Void call() throws Exception { + try { + Class.forName(testParams.getTaskClassName()) + .getMethod("main", String[].class) + .invoke(null, (Object) String.format( + testParams.getCliStringTemplate(), + tempFile.getAbsolutePath(), + testParams.getInputInstances(), + testParams.getSamplingSize(), + testParams.getInputDelayMicroSec() + ).split("[ ]")); + } catch (Exception e) { + LOG.error("Cannot execute test {} {}", e.getMessage(), e.getCause().getMessage()); + } + return null; + } + }); + + Thread.sleep(TimeUnit.SECONDS.toMillis(testParams.getPrePollWaitSeconds())); + + CountDownLatch signalComplete = new CountDownLatch(1); + + final Tailer tailer = Tailer.create(tempFile, new 
TestResultsTailerAdapter(signalComplete), 1000); + new Thread(new Runnable() { + @Override + public void run() { + tailer.run(); + } + }).start(); + + signalComplete.await(); + tailer.stop(); + + assertResults(tempFile, testParams); + } + + public static void assertResults(File outputFile, com.yahoo.labs.samoa.TestParams testParams) throws IOException { + + LOG.info("Checking results file " + outputFile.getAbsolutePath()); + // 1. parse result file with csv parser + Reader in = new FileReader(outputFile); + Iterable<CSVRecord> records = CSVFormat.EXCEL.withSkipHeaderRecord(false) + .withIgnoreEmptyLines(true).withDelimiter(',').withCommentMarker('#').parse(in); + CSVRecord last = null; + Iterator<CSVRecord> iterator = records.iterator(); + CSVRecord header = iterator.next(); + Assert.assertEquals("Invalid number of columns", 5, header.size()); + + Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.EVALUATION_INSTANCES, header.get(0).trim()); + Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.CLASSIFIED_INSTANCES, header.get(1).trim()); + Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.CLASSIFICATIONS_CORRECT, header.get(2).trim()); + Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.KAPPA_STAT, header.get(3).trim()); + Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.KAPPA_TEMP_STAT, header.get(4).trim()); + + // 2. 
check last line result + while (iterator.hasNext()) { + last = iterator.next(); + } + + assertTrue(String.format("Unmet threshold expected %d got %f", + testParams.getEvaluationInstances(), Float.parseFloat(last.get(0))), + testParams.getEvaluationInstances() <= Float.parseFloat(last.get(0))); + assertTrue(String.format("Unmet threshold expected %d got %f", testParams.getClassifiedInstances(), + Float.parseFloat(last.get(1))), + testParams.getClassifiedInstances() <= Float.parseFloat(last.get(1))); + assertTrue(String.format("Unmet threshold expected %f got %f", + testParams.getClassificationsCorrect(), Float.parseFloat(last.get(2))), + testParams.getClassificationsCorrect() <= Float.parseFloat(last.get(2))); + assertTrue(String.format("Unmet threshold expected %f got %f", + testParams.getKappaStat(), Float.parseFloat(last.get(3))), + testParams.getKappaStat() <= Float.parseFloat(last.get(3))); + assertTrue(String.format("Unmet threshold expected %f got %f", + testParams.getKappaTempStat(), Float.parseFloat(last.get(4))), + testParams.getKappaTempStat() <= Float.parseFloat(last.get(4))); + + } + + + private static class TestResultsTailerAdapter extends TailerListenerAdapter { + + private final CountDownLatch signalComplete; + + public TestResultsTailerAdapter(CountDownLatch signalComplete) { + this.signalComplete = signalComplete; + } + + @Override + public void handle(String line) { + if ("# COMPLETED".equals(line.trim())) { + signalComplete.countDown(); + } + } + } + + + + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-threads/pom.xml b/samoa-threads/pom.xml new file mode 100644 index 0000000..c4a6fb4 --- /dev/null +++ b/samoa-threads/pom.xml @@ -0,0 +1,112 @@ +<!-- + #%L + SAMOA + %% + Copyright (C) 2013 Yahoo! Inc. 
+ %% + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + #L% + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <name>samoa-threads</name> + <description>Multithreading local engine for SAMOA</description> + + <artifactId>samoa-threads</artifactId> + <parent> + <groupId>com.yahoo.labs.samoa</groupId> + <artifactId>samoa</artifactId> + <version>0.3.0-SNAPSHOT</version> + </parent> + + <dependencies> + <dependency> + <groupId>com.yahoo.labs.samoa</groupId> + <artifactId>samoa-api</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.yahoo.labs.samoa</groupId> + <artifactId>samoa-test</artifactId> + <type>test-jar</type> + <classifier>test-jar-with-dependencies</classifier> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <version>${slf4j-simple.version}</version> + </dependency> + </dependencies> + + <build> + 
<plugins> + <!-- SAMOA assembly --> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>${maven-assembly-plugin.version}</version> + <configuration> + <finalName>SAMOA-Threads-${project.version}</finalName> + <appendAssemblyId>false</appendAssemblyId> + <attach>false</attach> + <outputDirectory>../target</outputDirectory> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + <archive> + <manifestEntries> + <Bundle-Version>${parsedVersion.osgiVersion}</Bundle-Version> + <Bundle-Description>${project.description}</Bundle-Description> + <Implementation-Version>${project.version}</Implementation-Version> + <Implementation-Vendor>Yahoo Labs</Implementation-Vendor> + <Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id> + </manifestEntries> + </archive> + </configuration> + <executions> + <execution> + <id>make-assembly</id> <!-- this is used for inheritance merges --> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>${maven-surefire-plugin.version}</version> + <configuration> + <argLine>-Xmx1G</argLine> + <redirectTestOutputToFile>false</redirectTestOutputToFile> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java new file mode 100644 index 0000000..21ccf9e --- /dev/null +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java @@ -0,0 +1,70 @@ +package com.yahoo.labs.samoa; + +import java.util.ArrayList; +import 
java.util.Arrays; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.ClassOption; +import com.yahoo.labs.samoa.tasks.Task; +import com.yahoo.labs.samoa.topology.impl.ThreadsComponentFactory; +import com.yahoo.labs.samoa.topology.impl.ThreadsEngine; + +/** + * @author Anh Thu Vu + * + */ +public class LocalThreadsDoTask { + private static final Logger logger = LoggerFactory.getLogger(LocalThreadsDoTask.class); + + /** + * The main method. + * + * @param args + * the arguments + */ + public static void main(String[] args) { + + ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); + + // Get number of threads for multithreading mode + int numThreads = 1; + for (int i=0; i<tmpArgs.size()-1; i++) { + if (tmpArgs.get(i).equals("-t")) { + try { + numThreads = Integer.parseInt(tmpArgs.get(i+1)); + tmpArgs.remove(i+1); + tmpArgs.remove(i); + } catch (NumberFormatException e) { + System.err.println("Invalid number of threads."); + System.err.println(e.getStackTrace()); + } + } + } + logger.info("Number of threads:{}", numThreads); + + args = tmpArgs.toArray(new String[0]); + + StringBuilder cliString = new StringBuilder(); + for (int i = 0; i < args.length; i++) { + cliString.append(" ").append(args[i]); + } + logger.debug("Command line string = {}", cliString.toString()); + System.out.println("Command line string = " + cliString.toString()); + + Task task = null; + try { + task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, null); + logger.info("Sucessfully instantiating {}", task.getClass().getCanonicalName()); + } catch (Exception e) { + logger.error("Fail to initialize the task", e); + System.out.println("Fail to initialize the task" + e); + return; + } + task.setFactory(new ThreadsComponentFactory()); + task.init(); + + ThreadsEngine.submitTopology(task.getTopology(), numThreads); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java new file mode 100644 index 0000000..ac68da2 --- /dev/null +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java @@ -0,0 +1,64 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import com.yahoo.labs.samoa.core.EntranceProcessor; +import com.yahoo.labs.samoa.core.Processor; +import com.yahoo.labs.samoa.topology.ComponentFactory; +import com.yahoo.labs.samoa.topology.EntranceProcessingItem; +import com.yahoo.labs.samoa.topology.IProcessingItem; +import com.yahoo.labs.samoa.topology.ProcessingItem; +import com.yahoo.labs.samoa.topology.Stream; +import com.yahoo.labs.samoa.topology.Topology; + +/** + * ComponentFactory for multithreaded engine + * @author Anh Thu Vu + * + */ +public class ThreadsComponentFactory implements ComponentFactory { + + @Override + public ProcessingItem createPi(Processor processor) { + return this.createPi(processor, 1); + } + + @Override + public ProcessingItem createPi(Processor processor, int paralellism) { + return new ThreadsProcessingItem(processor, paralellism); + } + + @Override + public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { + return new ThreadsEntranceProcessingItem(entranceProcessor); + } + + @Override + public Stream createStream(IProcessingItem sourcePi) { + return new ThreadsStream(sourcePi); + } + + @Override + public Topology createTopology(String topoName) { + return new ThreadsTopology(topoName); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java new file mode 100644 index 0000000..d442572 --- /dev/null +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java @@ -0,0 +1,100 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. 
+ * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.topology.Topology; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Multithreaded engine. + * @author Anh Thu Vu + * + */ +public class ThreadsEngine { + + private static final List<ExecutorService> threadPool = new ArrayList<ExecutorService>(); + + /* + * Create and manage threads + */ + public static void setNumberOfThreads(int numThreads) { + if (numThreads < 1) + throw new IllegalStateException("Number of threads must be a positive integer."); + + if (threadPool.size() > numThreads) + throw new IllegalStateException("You cannot set a numThreads smaller than the current size of the threads pool."); + + if (threadPool.size() < numThreads) { + for (int i=threadPool.size(); i<numThreads; i++) { + threadPool.add(Executors.newSingleThreadExecutor()); + } + } + } + + public static int getNumberOfThreads() { + return threadPool.size(); + } + + public static ExecutorService getThreadWithIndex(int index) { + if (threadPool.size() <= 0 ) + throw new IllegalStateException("Try to get ExecutorService from an empty pool."); + index %= threadPool.size(); + return threadPool.get(index); + } + + /* + * Submit topology and start + */ + private static void submitTopology(Topology topology) { + ThreadsTopology tTopology = (ThreadsTopology) 
topology; + tTopology.run(); + } + + public static void submitTopology(Topology topology, int numThreads) { + ThreadsEngine.setNumberOfThreads(numThreads); + ThreadsEngine.submitTopology(topology); + } + + /* + * Stop + */ + public static void clearThreadPool() { + for (ExecutorService pool:threadPool) { + pool.shutdown(); + } + + for (ExecutorService pool:threadPool) { + try { + pool.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + threadPool.clear(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java new file mode 100644 index 0000000..008efb6 --- /dev/null +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java @@ -0,0 +1,40 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import com.yahoo.labs.samoa.core.EntranceProcessor; +import com.yahoo.labs.samoa.topology.LocalEntranceProcessingItem; + +/** + * EntranceProcessingItem for multithreaded engine. + * @author Anh Thu Vu + * + */ +public class ThreadsEntranceProcessingItem extends LocalEntranceProcessingItem { + + public ThreadsEntranceProcessingItem(EntranceProcessor processor) { + super(processor); + } + + // The default waiting time when there is no available events is 100ms + // Override waitForNewEvents() to change it + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java new file mode 100644 index 0000000..7cb8c18 --- /dev/null +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java @@ -0,0 +1,61 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.core.ContentEvent; + +/** + * Runnable class where each object corresponds to a ContentEvent and an assigned PI. 
+ * When a PI receives a ContentEvent, it will create a ThreadsEventRunnable with the received ContentEvent + * and an assigned workerPI. This runnable is then submitted to a thread queue waiting to be executed. + * The worker PI will process the received event when the runnable object is executed/run. + * @author Anh Thu Vu + * + */ +public class ThreadsEventRunnable implements Runnable { + + private ThreadsProcessingItemInstance workerPi; + private ContentEvent event; + + public ThreadsEventRunnable(ThreadsProcessingItemInstance workerPi, ContentEvent event) { + this.workerPi = workerPi; + this.event = event; + } + + public ThreadsProcessingItemInstance getWorkerProcessingItem() { + return this.workerPi; + } + + public ContentEvent getContentEvent() { + return this.event; + } + + @Override + public void run() { + try { + workerPi.processEvent(event); + } + catch (Exception e) { + e.printStackTrace(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java new file mode 100644 index 0000000..5eb6174 --- /dev/null +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java @@ -0,0 +1,101 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.ArrayList; +import java.util.List; + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.core.Processor; +import com.yahoo.labs.samoa.topology.AbstractProcessingItem; +import com.yahoo.labs.samoa.topology.ProcessingItem; +import com.yahoo.labs.samoa.topology.Stream; +import com.yahoo.labs.samoa.utils.PartitioningScheme; +import com.yahoo.labs.samoa.utils.StreamDestination; + +/** + * ProcessingItem for multithreaded engine. + * @author Anh Thu Vu + * + */ +public class ThreadsProcessingItem extends AbstractProcessingItem { + // Replicas of the ProcessingItem. + // When ProcessingItem receives an event, it assigns one + // of these replicas to process the event. + private List<ThreadsProcessingItemInstance> piInstances; + + // Each replica of ProcessingItem is assigned to one of the + // available threads in a round-robin fashion, i.e.: each + // replica is associated with the index of a thread. + // Each ProcessingItem has a random offset variable so that + // the allocation of PI replicas to threads are spread evenly + // among all threads. 
+ private int offset; + + /* + * Constructor + */ + public ThreadsProcessingItem(Processor processor, int parallelismHint) { + super(processor, parallelismHint); + this.offset = (int) (Math.random()*ThreadsEngine.getNumberOfThreads()); + } + + public List<ThreadsProcessingItemInstance> getProcessingItemInstances() { + return this.piInstances; + } + + /* + * Connects to streams + */ + @Override + protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { + StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme); + ((ThreadsStream) inputStream).addDestination(destination); + return this; + } + + /* + * Process the received event. + */ + public void processEvent(ContentEvent event, int counter) { + if (this.piInstances == null || this.piInstances.size() < this.getParallelism()) + throw new IllegalStateException("ThreadsWorkerProcessingItem(s) need to be setup before process any event (i.e. in ThreadsTopology.start())."); + + ThreadsProcessingItemInstance piInstance = this.piInstances.get(counter); + ThreadsEventRunnable runnable = new ThreadsEventRunnable(piInstance, event); + ThreadsEngine.getThreadWithIndex(piInstance.getThreadIndex()).submit(runnable); + } + + /* + * Setup the replicas of this PI. + * This should be called after the topology is set up (all Processors and PIs are + * setup and connected to the respective streams) and before events are sent. 
+ */ + public void setupInstances() { + this.piInstances = new ArrayList<ThreadsProcessingItemInstance>(this.getParallelism()); + for (int i=0; i<this.getParallelism(); i++) { + Processor newProcessor = this.getProcessor().newProcessor(this.getProcessor()); + newProcessor.onCreate(i + 1); + this.piInstances.add(new ThreadsProcessingItemInstance(newProcessor, this.offset + i)); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java new file mode 100644 index 0000000..9a400d1 --- /dev/null +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java @@ -0,0 +1,54 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.core.Processor; + +/** + * Lightweight replicas of ThreadProcessingItem. + * ThreadsProcessingItem manages a list of these objects and + * assigns each incoming message to be processed by one of them. 
+ * @author Anh Thu Vu + * + */ +public class ThreadsProcessingItemInstance { + + private Processor processor; + private int threadIndex; + + public ThreadsProcessingItemInstance(Processor processor, int threadIndex) { + this.processor = processor; + this.threadIndex = threadIndex; + } + + public int getThreadIndex() { + return this.threadIndex; + } + + public Processor getProcessor() { + return this.processor; + } + + public void processEvent(ContentEvent event) { + this.processor.process(event); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java new file mode 100644 index 0000000..5aa86f7 --- /dev/null +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java @@ -0,0 +1,106 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.topology.IProcessingItem; +import com.yahoo.labs.samoa.topology.AbstractStream; +import com.yahoo.labs.samoa.utils.StreamDestination; + +/** + * Stream for multithreaded engine. + * @author Anh Thu Vu + * + */ +public class ThreadsStream extends AbstractStream { + + private List<StreamDestination> destinations; + private int counter = 0; + private int maxCounter = 1; + + public ThreadsStream(IProcessingItem sourcePi) { + destinations = new LinkedList<StreamDestination>(); + } + + public void addDestination(StreamDestination destination) { + destinations.add(destination); + maxCounter *= destination.getParallelism(); + } + + public List<StreamDestination> getDestinations() { + return this.destinations; + } + + private int getNextCounter() { + if (maxCounter > 0 && counter >= maxCounter) counter = 0; + this.counter++; + return this.counter; + } + + @Override + public synchronized void put(ContentEvent event) { + this.put(event, this.getNextCounter()); + } + + private void put(ContentEvent event, int counter) { + ThreadsProcessingItem pi; + int parallelism; + for (StreamDestination destination:destinations) { + pi = (ThreadsProcessingItem) destination.getProcessingItem(); + parallelism = destination.getParallelism(); + switch (destination.getPartitioningScheme()) { + case SHUFFLE: + pi.processEvent(event, counter%parallelism); + break; + case GROUP_BY_KEY: + pi.processEvent(event, getPIIndexForKey(event.getKey(), parallelism)); + break; + case BROADCAST: + for (int p = 0; p < parallelism; p++) { + pi.processEvent(event, p); + } + break; + } + } + } + + private static int getPIIndexForKey(String key, int parallelism) { + // If key is null, return a default index: 0 + if (key == null) return 0; + + // HashCodeBuilder object does not have reset() method + // So all 
objects that get appended will be included in the + // computation of the hashcode. + // To avoid initialize a HashCodeBuilder for each event, + // here I use the static method with reflection on the event's key + int index = HashCodeBuilder.reflectionHashCode(key, true) % parallelism; + if (index < 0) { + index += parallelism; + } + return index; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java new file mode 100644 index 0000000..fc9f885 --- /dev/null +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java @@ -0,0 +1,62 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.topology.AbstractTopology; +import com.yahoo.labs.samoa.topology.IProcessingItem; + +/** + * Topology for multithreaded engine. 
+ * @author Anh Thu Vu + * + */ +public class ThreadsTopology extends AbstractTopology { + ThreadsTopology(String name) { + super(name); + } + + public void run() { + if (this.getEntranceProcessingItems() == null) + throw new IllegalStateException("You need to set entrance PI before running the topology."); + if (this.getEntranceProcessingItems().size() != 1) + throw new IllegalStateException("ThreadsTopology supports 1 entrance PI only. Number of entrance PIs is "+this.getEntranceProcessingItems().size()); + + this.setupProcessingItemInstances(); + ThreadsEntranceProcessingItem entrancePi = (ThreadsEntranceProcessingItem) this.getEntranceProcessingItems().toArray()[0]; + if (entrancePi == null) + throw new IllegalStateException("You need to set entrance PI before running the topology."); + entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple mode + entrancePi.startSendingEvents(); + } + + /* + * Tell all the ThreadsProcessingItems to create & init their + * replicas (ThreadsProcessingItemInstance) + */ + private void setupProcessingItemInstances() { + for (IProcessingItem pi:this.getProcessingItems()) { + if (pi instanceof ThreadsProcessingItem) { + ThreadsProcessingItem tpi = (ThreadsProcessingItem) pi; + tpi.setupInstances(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java new file mode 100644 index 0000000..5979d46 --- /dev/null +++ b/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java @@ -0,0 +1,68 @@ +package com.yahoo.labs.samoa; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. 
+ * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.junit.Test; + +public class AlgosTest { + + @Test(timeout = 60000) + public void testVHTWithThreads() throws Exception { + + TestParams vhtConfig = new TestParams.Builder() + .inputInstances(200_000) + .samplingSize(20_000) + .evaluationInstances(200_000) + .classifiedInstances(200_000) + .classificationsCorrect(55f) + .kappaStat(-0.1f) + .kappaTempStat(-0.1f) + .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE + " -t 2") + .resultFilePollTimeout(10) + .prePollWait(10) + .taskClassName(LocalThreadsDoTask.class.getName()) + .build(); + TestUtils.test(vhtConfig); + + } + + @Test(timeout = 180000) + public void testBaggingWithThreads() throws Exception { + TestParams baggingConfig = new TestParams.Builder() + .inputInstances(100_000) + .samplingSize(10_000) + .inputDelayMicroSec(100) // prevents saturating the system due to unbounded queues + .evaluationInstances(90_000) + .classifiedInstances(105_000) + .classificationsCorrect(55f) + .kappaStat(0f) + .kappaTempStat(0f) + .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE + " -t 2") + .prePollWait(10) + .resultFilePollTimeout(30) + .taskClassName(LocalThreadsDoTask.class.getName()) + .build(); + TestUtils.test(baggingConfig); + + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java 
---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java new file mode 100644 index 0000000..eee8639 --- /dev/null +++ b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java @@ -0,0 +1,114 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import static org.junit.Assert.*; +import mockit.Mocked; +import mockit.NonStrictExpectations; +import mockit.Tested; + +import org.junit.Before; +import org.junit.Test; + +import com.yahoo.labs.samoa.core.EntranceProcessor; +import com.yahoo.labs.samoa.core.Processor; +import com.yahoo.labs.samoa.topology.EntranceProcessingItem; +import com.yahoo.labs.samoa.topology.ProcessingItem; +import com.yahoo.labs.samoa.topology.Stream; +import com.yahoo.labs.samoa.topology.Topology; + +/** + * @author Anh Thu Vu + * + */ +public class ThreadsComponentFactoryTest { + @Tested private ThreadsComponentFactory factory; + @Mocked private Processor processor, processorReplica; + @Mocked private EntranceProcessor entranceProcessor; + + private final int parallelism = 3; + private final String topoName = "TestTopology"; + + + @Before + public void setUp() throws Exception { + factory = new ThreadsComponentFactory(); + } + + @Test + public void testCreatePiNoParallelism() { + new NonStrictExpectations() { + { + processor.newProcessor(processor); + result=processorReplica; + } + }; + ProcessingItem pi = factory.createPi(processor); + assertNotNull("ProcessingItem created is null.",pi); + assertEquals("ProcessingItem created is not a ThreadsProcessingItem.",ThreadsProcessingItem.class,pi.getClass()); + assertEquals("Parallelism of PI is not 1",1,pi.getParallelism(),0); + } + + @Test + public void testCreatePiWithParallelism() { + new NonStrictExpectations() { + { + processor.newProcessor(processor); + result=processorReplica; + } + }; + ProcessingItem pi = factory.createPi(processor,parallelism); + assertNotNull("ProcessingItem created is null.",pi); + assertEquals("ProcessingItem created is not a ThreadsProcessingItem.",ThreadsProcessingItem.class,pi.getClass()); + assertEquals("Parallelism of PI is not ",parallelism,pi.getParallelism(),0); + } + + @Test + public void testCreateStream() { + new NonStrictExpectations() { + { + processor.newProcessor(processor); + 
result=processorReplica; + } + }; + ProcessingItem pi = factory.createPi(processor); + + Stream stream = factory.createStream(pi); + assertNotNull("Stream created is null",stream); + assertEquals("Stream created is not a ThreadsStream.",ThreadsStream.class,stream.getClass()); + } + + @Test + public void testCreateTopology() { + Topology topology = factory.createTopology(topoName); + assertNotNull("Topology created is null.",topology); + assertEquals("Topology created is not a ThreadsTopology.",ThreadsTopology.class,topology.getClass()); + } + + @Test + public void testCreateEntrancePi() { + EntranceProcessingItem entrancePi = factory.createEntrancePi(entranceProcessor); + assertNotNull("EntranceProcessingItem created is null.",entrancePi); + assertEquals("EntranceProcessingItem created is not a ThreadsEntranceProcessingItem.",ThreadsEntranceProcessingItem.class,entrancePi.getClass()); + assertSame("EntranceProcessor is not set correctly.",entranceProcessor, entrancePi.getProcessor()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java new file mode 100644 index 0000000..cdb8949 --- /dev/null +++ b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java @@ -0,0 +1,129 @@ +package com.yahoo.labs.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import static org.junit.Assert.*; +import mockit.Mocked; +import mockit.Verifications; + +import org.junit.After; +import org.junit.Test; + +/** + * @author Anh Thu Vu + * + */ +public class ThreadsEngineTest { + + @Mocked ThreadsTopology topology; + + private final int numThreads = 4; + private final int numThreadsSmaller = 3; + private final int numThreadsLarger = 5; + + @After + public void cleanup() { + ThreadsEngine.clearThreadPool(); + } + + @Test + public void testSetNumberOfThreadsSimple() { + ThreadsEngine.setNumberOfThreads(numThreads); + assertEquals("Number of threads is not set correctly.", numThreads, + ThreadsEngine.getNumberOfThreads(),0); + } + + @Test + public void testSetNumberOfThreadsRepeat() { + ThreadsEngine.setNumberOfThreads(numThreads); + ThreadsEngine.setNumberOfThreads(numThreads); + assertEquals("Number of threads is not set correctly.", numThreads, + ThreadsEngine.getNumberOfThreads(),0); + } + + @Test + public void testSetNumberOfThreadsIncrease() { + ThreadsEngine.setNumberOfThreads(numThreads); + ThreadsEngine.setNumberOfThreads(numThreadsLarger); + assertEquals("Number of threads is not set correctly.", numThreadsLarger, + ThreadsEngine.getNumberOfThreads(),0); + } + + @Test(expected=IllegalStateException.class) + public void testSetNumberOfThreadsDecrease() { + ThreadsEngine.setNumberOfThreads(numThreads); + ThreadsEngine.setNumberOfThreads(numThreadsSmaller); + // Exception expected + } + + @Test(expected=IllegalStateException.class) + public void testSetNumberOfThreadsNegative() { + 
ThreadsEngine.setNumberOfThreads(-1); + // Exception expected + } + + @Test(expected=IllegalStateException.class) + public void testSetNumberOfThreadsZero() { + ThreadsEngine.setNumberOfThreads(0); + // Exception expected + } + + @Test + public void testClearThreadPool() { + ThreadsEngine.setNumberOfThreads(numThreads); + ThreadsEngine.clearThreadPool(); + assertEquals("ThreadsEngine was not shutdown properly.", 0, ThreadsEngine.getNumberOfThreads()); + } + + @Test + public void testGetThreadWithIndexWithinPoolSize() { + ThreadsEngine.setNumberOfThreads(numThreads); + for (int i=0; i<numThreads; i++) { + assertNotNull("ExecutorService is not initialized correctly.", ThreadsEngine.getThreadWithIndex(i)); + } + } + + @Test + public void testGetThreadWithIndexOutOfPoolSize() { + ThreadsEngine.setNumberOfThreads(numThreads); + for (int i=0; i<numThreads+3; i++) { + assertNotNull("ExecutorService is not initialized correctly.", ThreadsEngine.getThreadWithIndex(i)); + } + } + + @Test(expected=IllegalStateException.class) + public void testGetThreadWithIndexFromEmptyPool() { + for (int i=0; i<numThreads; i++) { + ThreadsEngine.getThreadWithIndex(i); + } + } + + @Test + public void testSubmitTopology() { + ThreadsEngine.submitTopology(topology, numThreads); + new Verifications() {{ + topology.run(); times=1; + }}; + assertEquals("Number of threads is not set correctly.", numThreads, + ThreadsEngine.getNumberOfThreads(),0); + } + +}
