http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java deleted file mode 100644 index 109e927..0000000 --- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * To change this template, choose Tools | Templates - * and open the template in the editor. - */ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.util.LinkedList; -import java.util.List; - -import org.apache.commons.lang3.builder.HashCodeBuilder; -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.topology.AbstractStream; -import com.yahoo.labs.samoa.topology.IProcessingItem; -import com.yahoo.labs.samoa.utils.StreamDestination; - -/** - * - * @author abifet - */ -class SimpleStream extends AbstractStream { - private List<StreamDestination> destinations; - private int maxCounter; - private int eventCounter; - - SimpleStream(IProcessingItem sourcePi) { - super(sourcePi); - this.destinations = new LinkedList<>(); - this.eventCounter = 0; - this.maxCounter = 1; - } - - private int getNextCounter() { - if (maxCounter > 0 && eventCounter >= maxCounter) - eventCounter = 0; - this.eventCounter++; - return this.eventCounter; - } - - @Override - public void put(ContentEvent event) { - this.put(event, this.getNextCounter()); - } - - private void put(ContentEvent event, int counter) { - SimpleProcessingItem pi; - int parallelism; - for (StreamDestination destination : destinations) { - pi = (SimpleProcessingItem) destination.getProcessingItem(); - parallelism = destination.getParallelism(); - switch (destination.getPartitioningScheme()) { - case SHUFFLE: - pi.processEvent(event, counter % parallelism); - break; - case GROUP_BY_KEY: - HashCodeBuilder hb = new HashCodeBuilder(); - hb.append(event.getKey()); - int key = hb.build() % parallelism; - pi.processEvent(event, key); - break; - case BROADCAST: - for (int p = 0; p < parallelism; p++) { - pi.processEvent(event, p); - } - break; - } - } - } - - public void addDestination(StreamDestination destination) { - this.destinations.add(destination); - if (maxCounter <= 0) - maxCounter = 1; - maxCounter *= destination.getParallelism(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java deleted file mode 100644 index 5ffa09e..0000000 --- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * To change this template, choose Tools | Templates - * and open the template in the editor. - */ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.topology.AbstractTopology; - -public class SimpleTopology extends AbstractTopology { - SimpleTopology(String name) { - super(name); - } - - public void run() { - if (this.getEntranceProcessingItems() == null) - throw new IllegalStateException("You need to set entrance PI before running the topology."); - if (this.getEntranceProcessingItems().size() != 1) - throw new IllegalStateException("SimpleTopology supports 1 entrance PI only. Number of entrance PIs is " - + this.getEntranceProcessingItems().size()); - - SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) this.getEntranceProcessingItems() - .toArray()[0]; - entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple mode - entrancePi.startSendingEvents(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/org/apache/samoa/LocalDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/main/java/org/apache/samoa/LocalDoTask.java b/samoa-local/src/main/java/org/apache/samoa/LocalDoTask.java new file mode 100644 index 0000000..0a8c3d0 --- /dev/null +++ b/samoa-local/src/main/java/org/apache/samoa/LocalDoTask.java @@ -0,0 +1,90 @@ +package org.apache.samoa; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.tasks.Task; +import org.apache.samoa.topology.impl.SimpleComponentFactory; +import org.apache.samoa.topology.impl.SimpleEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.ClassOption; +import com.github.javacliparser.FlagOption; +import com.github.javacliparser.IntOption; +import com.github.javacliparser.Option; + +/** + * The Class DoTask. + */ +public class LocalDoTask { + + // TODO: clean up this class for helping ML Developer in SAMOA + // TODO: clean up code from storm-impl + + // It seems that the 3 extra options are not used. + // Probably should remove them + private static final String SUPPRESS_STATUS_OUT_MSG = "Suppress the task status output. Normally it is sent to stderr."; + private static final String SUPPRESS_RESULT_OUT_MSG = "Suppress the task result output. Normally it is sent to stdout."; + private static final String STATUS_UPDATE_FREQ_MSG = "Wait time in milliseconds between status updates."; + private static final Logger logger = LoggerFactory.getLogger(LocalDoTask.class); + + /** + * The main method. + * + * @param args + * the arguments + */ + public static void main(String[] args) { + + // ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); + + // args = tmpArgs.toArray(new String[0]); + + FlagOption suppressStatusOutOpt = new FlagOption("suppressStatusOut", 'S', SUPPRESS_STATUS_OUT_MSG); + + FlagOption suppressResultOutOpt = new FlagOption("suppressResultOut", 'R', SUPPRESS_RESULT_OUT_MSG); + + IntOption statusUpdateFreqOpt = new IntOption("statusUpdateFrequency", 'F', STATUS_UPDATE_FREQ_MSG, 1000, 0, + Integer.MAX_VALUE); + + Option[] extraOptions = new Option[] { suppressStatusOutOpt, suppressResultOutOpt, statusUpdateFreqOpt }; + + StringBuilder cliString = new StringBuilder(); + for (String arg : args) { + cliString.append(" ").append(arg); + } + logger.debug("Command line string = {}", cliString.toString()); + System.out.println("Command line string = " + cliString.toString()); + + Task task; + try { + task = ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions); + logger.info("Successfully instantiating {}", task.getClass().getCanonicalName()); + } catch (Exception e) { + logger.error("Fail to initialize the task", e); + System.out.println("Fail to initialize the task" + e); + return; + } + task.setFactory(new SimpleComponentFactory()); + task.init(); + SimpleEngine.submitTopology(task.getTopology()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleComponentFactory.java b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleComponentFactory.java new file mode 100644 index 0000000..c18b72d --- /dev/null +++ b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleComponentFactory.java @@ -0,0 +1,53 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; + +public class SimpleComponentFactory implements ComponentFactory { + + public ProcessingItem createPi(Processor processor, int paralellism) { + return new SimpleProcessingItem(processor, paralellism); + } + + public ProcessingItem createPi(Processor processor) { + return this.createPi(processor, 1); + } + + public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { + return new SimpleEntranceProcessingItem(processor); + } + + public Stream createStream(IProcessingItem sourcePi) { + return new SimpleStream(sourcePi); + } + + public Topology createTopology(String topoName) { + return new SimpleTopology(topoName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleEngine.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleEngine.java b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleEngine.java new file mode 100644 index 0000000..06d03d3 --- /dev/null +++ b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleEngine.java @@ -0,0 +1,37 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.topology.Topology; + +public class SimpleEngine { + + public static void submitTopology(Topology topology) { + SimpleTopology simpleTopology = (SimpleTopology) topology; + simpleTopology.run(); + // runs until completion + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleEntranceProcessingItem.java b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleEntranceProcessingItem.java new file mode 100644 index 0000000..729ad31 --- /dev/null +++ b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleEntranceProcessingItem.java @@ -0,0 +1,33 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.topology.LocalEntranceProcessingItem; + +class SimpleEntranceProcessingItem extends LocalEntranceProcessingItem { + public SimpleEntranceProcessingItem(EntranceProcessor processor) { + super(processor); + } + + // The default waiting time when there is no available events is 100ms + // Override waitForNewEvents() to change it +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleProcessingItem.java b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleProcessingItem.java new file mode 100644 index 0000000..1dd1562 --- /dev/null +++ b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleProcessingItem.java @@ -0,0 +1,87 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.AbstractProcessingItem; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.utils.PartitioningScheme; +import org.apache.samoa.utils.StreamDestination; + +/** + * + * @author abifet + */ +class SimpleProcessingItem extends AbstractProcessingItem { + private IProcessingItem[] arrayProcessingItem; + + SimpleProcessingItem(Processor processor) { + super(processor); + } + + SimpleProcessingItem(Processor processor, int parallelism) { + super(processor); + this.setParallelism(parallelism); + } + + public IProcessingItem getProcessingItem(int i) { + return arrayProcessingItem[i]; + } + + @Override + protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { + StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme); + ((SimpleStream) inputStream).addDestination(destination); + return this; + } + + public SimpleProcessingItem copy() { + Processor processor = this.getProcessor(); + return new SimpleProcessingItem(processor.newProcessor(processor)); + } + + public void processEvent(ContentEvent event, int counter) { + + int parallelism = this.getParallelism(); + // System.out.println("Process event "+event+" (isLast="+event.isLastEvent()+") with counter="+counter+" while parallelism="+parallelism); + if (this.arrayProcessingItem == null && parallelism > 0) { + // Init processing elements, the first time they are needed + this.arrayProcessingItem = new IProcessingItem[parallelism]; + for (int j = 0; j < parallelism; j++) { + arrayProcessingItem[j] = this.copy(); + arrayProcessingItem[j].getProcessor().onCreate(j); + } + } + if (this.arrayProcessingItem != null) { + IProcessingItem pi = this.getProcessingItem(counter); + Processor p = pi.getProcessor(); + // System.out.println("PI="+pi+", p="+p); + this.getProcessingItem(counter).getProcessor().process(event); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleStream.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleStream.java b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleStream.java new file mode 100644 index 0000000..3137c60 --- /dev/null +++ b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleStream.java @@ -0,0 +1,95 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.topology.AbstractStream; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.utils.StreamDestination; + +/** + * + * @author abifet + */ +class SimpleStream extends AbstractStream { + private List<StreamDestination> destinations; + private int maxCounter; + private int eventCounter; + + SimpleStream(IProcessingItem sourcePi) { + super(sourcePi); + this.destinations = new LinkedList<>(); + this.eventCounter = 0; + this.maxCounter = 1; + } + + private int getNextCounter() { + if (maxCounter > 0 && eventCounter >= maxCounter) + eventCounter = 0; + this.eventCounter++; + return this.eventCounter; + } + + @Override + public void put(ContentEvent event) { + this.put(event, this.getNextCounter()); + } + + private void put(ContentEvent event, int counter) { + SimpleProcessingItem pi; + int parallelism; + for (StreamDestination destination : destinations) { + pi = (SimpleProcessingItem) destination.getProcessingItem(); + parallelism = destination.getParallelism(); + switch (destination.getPartitioningScheme()) { + case SHUFFLE: + pi.processEvent(event, counter % parallelism); + break; + case GROUP_BY_KEY: + HashCodeBuilder hb = new HashCodeBuilder(); + hb.append(event.getKey()); + int key = hb.build() % parallelism; + pi.processEvent(event, key); + break; + case BROADCAST: + for (int p = 0; p < parallelism; p++) { + pi.processEvent(event, p); + } + break; + } + } + } + + public void addDestination(StreamDestination destination) { + this.destinations.add(destination); + if (maxCounter <= 0) + maxCounter = 1; + maxCounter *= destination.getParallelism(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleTopology.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleTopology.java b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleTopology.java new file mode 100644 index 0000000..660ebc0 --- /dev/null +++ b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleTopology.java @@ -0,0 +1,46 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.topology.AbstractTopology; + +public class SimpleTopology extends AbstractTopology { + SimpleTopology(String name) { + super(name); + } + + public void run() { + if (this.getEntranceProcessingItems() == null) + throw new IllegalStateException("You need to set entrance PI before running the topology."); + if (this.getEntranceProcessingItems().size() != 1) + throw new IllegalStateException("SimpleTopology supports 1 entrance PI only. Number of entrance PIs is " + + this.getEntranceProcessingItems().size()); + + SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) this.getEntranceProcessingItems() + .toArray()[0]; + entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple mode + entrancePi.startSendingEvents(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java deleted file mode 100644 index 2f7c7e1..0000000 --- a/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java +++ /dev/null @@ -1,86 +0,0 @@ -package com.yahoo.labs.samoa; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.junit.Test; - -public class AlgosTest { - - @Test - public void testVHTLocal() throws Exception { - - TestParams vhtConfig = new TestParams.Builder() - .inputInstances(200_000) - .samplingSize(20_000) - .evaluationInstances(200_000) - .classifiedInstances(200_000) - .classificationsCorrect(75f) - .kappaStat(0f) - .kappaTempStat(0f) - .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE) - .resultFilePollTimeout(10) - .prePollWait(10) - .taskClassName(LocalDoTask.class.getName()) - .build(); - TestUtils.test(vhtConfig); - - } - - @Test - public void testBaggingLocal() throws Exception { - TestParams baggingConfig = new TestParams.Builder() - .inputInstances(200_000) - .samplingSize(20_000) - .evaluationInstances(180_000) - .classifiedInstances(210_000) - .classificationsCorrect(60f) - .kappaStat(0f) - .kappaTempStat(0f) - .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE) - .prePollWait(10) - .resultFilePollTimeout(10) - .taskClassName(LocalDoTask.class.getName()) - .build(); - TestUtils.test(baggingConfig); - - } - - @Test - public void testNaiveBayesLocal() throws Exception { - - TestParams vhtConfig = new TestParams.Builder() - .inputInstances(200_000) - .samplingSize(20_000) - .evaluationInstances(200_000) - .classifiedInstances(200_000) - .classificationsCorrect(65f) - .kappaStat(0f) - .kappaTempStat(0f) - .cliStringTemplate(TestParams.Templates.PREQEVAL_NAIVEBAYES_HYPERPLANE) - .resultFilePollTimeout(10) - .prePollWait(10) - .taskClassName(LocalDoTask.class.getName()) - .build(); - TestUtils.test(vhtConfig); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java deleted file mode 100644 index c43ef72..0000000 --- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java +++ /dev/null @@ -1,99 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import static org.junit.Assert.*; -import mockit.Mocked; -import mockit.Tested; - -import org.junit.Before; -import org.junit.Test; - -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.topology.EntranceProcessingItem; -import com.yahoo.labs.samoa.topology.ProcessingItem; -import com.yahoo.labs.samoa.topology.Stream; -import com.yahoo.labs.samoa.topology.Topology; - -/** - * @author Anh Thu Vu - * - */ -public class SimpleComponentFactoryTest { - - @Tested - private SimpleComponentFactory factory; - @Mocked - private Processor processor, processorReplica; - @Mocked - private EntranceProcessor entranceProcessor; - - private final int parallelism = 3; - private final String topoName = "TestTopology"; - - @Before - public void setUp() throws Exception { - factory = new SimpleComponentFactory(); - } - - @Test - public void testCreatePiNoParallelism() { - ProcessingItem pi = factory.createPi(processor); - assertNotNull("ProcessingItem created is null.", pi); - assertEquals("ProcessingItem created is not a SimpleProcessingItem.", SimpleProcessingItem.class, pi.getClass()); - assertEquals("Parallelism of PI is not 1", 1, pi.getParallelism(), 0); - } - - @Test - public void testCreatePiWithParallelism() { - ProcessingItem pi = factory.createPi(processor, parallelism); - assertNotNull("ProcessingItem created is null.", pi); - assertEquals("ProcessingItem created is not a SimpleProcessingItem.", SimpleProcessingItem.class, pi.getClass()); - assertEquals("Parallelism of PI is not ", parallelism, pi.getParallelism(), 0); - } - - @Test - public void testCreateStream() { - ProcessingItem pi = factory.createPi(processor); - - Stream stream = factory.createStream(pi); - assertNotNull("Stream created is null", stream); - assertEquals("Stream created is not a SimpleStream.", SimpleStream.class, stream.getClass()); - } - - @Test - public void testCreateTopology() { - Topology topology = factory.createTopology(topoName); - assertNotNull("Topology created is null.", topology); - assertEquals("Topology created is not a SimpleTopology.", SimpleTopology.class, topology.getClass()); - } - - @Test - public void testCreateEntrancePi() { - EntranceProcessingItem entrancePi = factory.createEntrancePi(entranceProcessor); - assertNotNull("EntranceProcessingItem created is null.", entrancePi); - assertEquals("EntranceProcessingItem created is not a SimpleEntranceProcessingItem.", - SimpleEntranceProcessingItem.class, entrancePi.getClass()); - assertSame("EntranceProcessor is not set correctly.", entranceProcessor, entrancePi.getProcessor()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java deleted file mode 100644 index da4be1e..0000000 --- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java +++ /dev/null @@ -1,60 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import mockit.Mocked; -import mockit.NonStrictExpectations; -import mockit.Tested; -import mockit.Verifications; - -import org.junit.Test; - -/** - * @author Anh Thu Vu - * - */ -public class SimpleEngineTest { - - @Tested - private SimpleEngine unused; - @Mocked - private SimpleTopology topology; - @Mocked - private Runtime mockedRuntime; - - @Test - public void testSubmitTopology() { - new NonStrictExpectations() { - { - Runtime.getRuntime(); - result = mockedRuntime; - mockedRuntime.exit(0); - } - }; - SimpleEngine.submitTopology(topology); - new Verifications() { - { - topology.run(); - } - }; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java deleted file mode 100644 index 8c1ccaf..0000000 --- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java +++ /dev/null @@ -1,172 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import static org.junit.Assert.*; -import mockit.Mocked; -import mockit.StrictExpectations; -import mockit.Tested; -import mockit.Verifications; - -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.topology.Stream; - -/** - * @author Anh Thu Vu - * - */ -public class SimpleEntranceProcessingItemTest { - - @Tested - private SimpleEntranceProcessingItem entrancePi; - - @Mocked - private EntranceProcessor entranceProcessor; - @Mocked - private Stream outputStream, anotherStream; - @Mocked - private ContentEvent event; - - @Mocked - private Thread unused; - - /** - * @throws java.lang.Exception - */ - @Before - public void setUp() throws Exception { - entrancePi = new SimpleEntranceProcessingItem(entranceProcessor); - } - - @Test - public void testContructor() { - assertSame("EntranceProcessor is not set correctly.", entranceProcessor, entrancePi.getProcessor()); - } - - @Test - public void testSetOutputStream() { - entrancePi.setOutputStream(outputStream); - assertSame("OutputStream is not set correctly.", outputStream, entrancePi.getOutputStream()); - } - - @Test - public void testSetOutputStreamRepeate() { - entrancePi.setOutputStream(outputStream); - entrancePi.setOutputStream(outputStream); - assertSame("OutputStream is not set correctly.", outputStream, entrancePi.getOutputStream()); - } - - @Test(expected = IllegalStateException.class) - public void testSetOutputStreamError() { - entrancePi.setOutputStream(outputStream); - entrancePi.setOutputStream(anotherStream); - } - - @Test - public void testInjectNextEventSuccess() { - entrancePi.setOutputStream(outputStream); - new StrictExpectations() { - { - entranceProcessor.hasNext(); - result = true; - - entranceProcessor.nextEvent(); - result = event; - } - }; - entrancePi.injectNextEvent(); - new Verifications() { - { - outputStream.put(event); - } - }; - } - - @Test - public void testStartSendingEvents() { - entrancePi.setOutputStream(outputStream); - new StrictExpectations() { - { - for (int i = 0; i < 1; i++) { - entranceProcessor.isFinished(); - result = false; - entranceProcessor.hasNext(); - result = false; - } - - for (int i = 0; i < 5; i++) { - entranceProcessor.isFinished(); - result = false; - entranceProcessor.hasNext(); - result = true; - entranceProcessor.nextEvent(); - result = event; - outputStream.put(event); - } - - for (int i = 0; i < 2; i++) { - entranceProcessor.isFinished(); - result = false; - entranceProcessor.hasNext(); - result = false; - } - - for (int i = 0; i < 5; i++) { - entranceProcessor.isFinished(); - result = false; - entranceProcessor.hasNext(); - result = true; - entranceProcessor.nextEvent(); - result = event; - outputStream.put(event); - } - - entranceProcessor.isFinished(); - result = true; - times = 1; - entranceProcessor.hasNext(); - times = 0; - } - }; - entrancePi.startSendingEvents(); - new Verifications() { - { - try { - Thread.sleep(anyInt); - times = 3; - } catch (InterruptedException e) { - - } - } - }; - } - - @Test(expected = IllegalStateException.class) - public void testStartSendingEventsError() { - entrancePi.startSendingEvents(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java deleted file mode 100644 index cd076fe..0000000 --- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java +++ /dev/null @@ -1,124 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import static org.junit.Assert.*; - -import java.util.List; -import java.util.concurrent.ExecutorService; - -import mockit.Expectations; -import mockit.Mocked; -import mockit.NonStrictExpectations; -import mockit.Tested; - -import org.junit.Before; -import org.junit.Test; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.utils.PartitioningScheme; -import com.yahoo.labs.samoa.utils.StreamDestination; - -/** - * @author Anh Thu Vu - * - */ -public class SimpleProcessingItemTest { - - @Tested - private SimpleProcessingItem pi; - - @Mocked - private Processor processor; - @Mocked - private SimpleStream stream; - @Mocked - private StreamDestination destination; - @Mocked - private ContentEvent event; - - private final int parallelism = 4; - private final int counter = 2; - - @Before - public void setUp() throws Exception { - pi = new SimpleProcessingItem(processor, parallelism); - } - - @Test - public void testConstructor() { - assertSame("Processor was not set correctly.", processor, pi.getProcessor()); - assertEquals("Parallelism was not set correctly.", parallelism, pi.getParallelism(), 0); - } - - @Test - public void testConnectInputShuffleStream() { - new Expectations() { - { - destination = new StreamDestination(pi, parallelism, PartitioningScheme.SHUFFLE); - stream.addDestination(destination); - } - }; - pi.connectInputShuffleStream(stream); - } - - @Test - public void testConnectInputKeyStream() { - new Expectations() { - { - destination = new StreamDestination(pi, parallelism, PartitioningScheme.GROUP_BY_KEY); - stream.addDestination(destination); - } - }; - pi.connectInputKeyStream(stream); - } - - @Test - public void testConnectInputAllStream() { - new Expectations() { - { - destination = new StreamDestination(pi, parallelism, PartitioningScheme.BROADCAST); - stream.addDestination(destination); - } - }; - pi.connectInputAllStream(stream); - } - - @Test - public void testProcessEvent() { - new Expectations() { - { - for (int i = 0; i < parallelism; i++) { - processor.newProcessor(processor); - result = processor; - - processor.onCreate(anyInt); - } - - processor.process(event); - } - }; - pi.processEvent(event, counter); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java deleted file mode 100644 index 25cb5eb..0000000 --- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java +++ /dev/null @@ -1,121 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.util.Arrays; -import java.util.Collection; - -import mockit.Expectations; -import mockit.Mocked; -import mockit.NonStrictExpectations; -import mockit.Tested; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.utils.PartitioningScheme; -import com.yahoo.labs.samoa.utils.StreamDestination; - -/** - * @author Anh Thu Vu - * - */ -@RunWith(Parameterized.class) -public class SimpleStreamTest { - - @Tested - private SimpleStream stream; - - @Mocked - private SimpleProcessingItem sourcePi, destPi; - @Mocked - private ContentEvent event; - @Mocked - private StreamDestination destination; - - private final String eventKey = "eventkey"; - private final int parallelism; - private final PartitioningScheme scheme; - - @Parameters - public static Collection<Object[]> generateParameters() { - return Arrays.asList(new Object[][] { - { 2, PartitioningScheme.SHUFFLE }, - { 3, PartitioningScheme.GROUP_BY_KEY }, - { 4, PartitioningScheme.BROADCAST } - }); - } - - public SimpleStreamTest(int parallelism, PartitioningScheme scheme) { - this.parallelism = parallelism; - this.scheme = scheme; - } - - @Before - public void setUp() throws Exception { - stream = new SimpleStream(sourcePi); - stream.addDestination(destination); - } - - @Test - public void testPut() { - new NonStrictExpectations() { - { - event.getKey(); - result = eventKey; - destination.getProcessingItem(); - result = destPi; - destination.getPartitioningScheme(); - result = scheme; - destination.getParallelism(); - result = parallelism; - - } - }; - switch (this.scheme) { - case SHUFFLE: - case GROUP_BY_KEY: - new Expectations() { - { - // TODO: restrict the range of counter value - destPi.processEvent(event, anyInt); - times = 1; - } - }; - break; - case BROADCAST: - new Expectations() { - { - // TODO: restrict the range of counter value - destPi.processEvent(event, anyInt); - times = parallelism; - } - }; - break; - } - stream.put(event); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java deleted file mode 100644 index 9aaaebd..0000000 --- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java +++ /dev/null @@ -1,94 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -import static org.junit.Assert.*; - -import java.util.Set; - -import mockit.NonStrictExpectations; -import mockit.Expectations; -import mockit.Mocked; -import mockit.Tested; - -import org.junit.Before; -import org.junit.Test; -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.topology.EntranceProcessingItem; - -/** - * @author Anh Thu Vu - * - */ -public class SimpleTopologyTest { - - @Tested - private SimpleTopology topology; - - @Mocked - private SimpleEntranceProcessingItem entrancePi; - @Mocked - private EntranceProcessor entranceProcessor; - - @Before - public void setUp() throws Exception { - topology = new SimpleTopology("TestTopology"); - } - - @Test - public void testAddEntrancePi() { - topology.addEntranceProcessingItem(entrancePi); - - Set<EntranceProcessingItem> entrancePIs = topology.getEntranceProcessingItems(); - assertNotNull("Set of entrance PIs is null.", entrancePIs); - assertEquals("Number of entrance PI in SimpleTopology must be 1", 1, entrancePIs.size()); - assertSame("Entrance PI was not set correctly.", entrancePi, entrancePIs.toArray()[0]); - // TODO: verify that entrance PI is in the set of ProcessingItems - // Need to access topology's set of PIs (getProcessingItems() method) - } - - @Test - public void testRun() { - topology.addEntranceProcessingItem(entrancePi); - - new NonStrictExpectations() { - { - entrancePi.getProcessor(); - result = entranceProcessor; - - } - }; - - new Expectations() { - { - entranceProcessor.onCreate(anyInt); - entrancePi.startSendingEvents(); - } - }; - topology.run(); - } - - @Test(expected = IllegalStateException.class) - public void testRunWithoutEntrancePI() { - topology.run(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/org/apache/samoa/AlgosTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/org/apache/samoa/AlgosTest.java b/samoa-local/src/test/java/org/apache/samoa/AlgosTest.java new file mode 100644 index 0000000..f35b92a --- /dev/null +++ b/samoa-local/src/test/java/org/apache/samoa/AlgosTest.java @@ -0,0 +1,87 @@ +package org.apache.samoa; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.LocalDoTask; +import org.junit.Test; + +public class AlgosTest { + + @Test + public void testVHTLocal() throws Exception { + + TestParams vhtConfig = new TestParams.Builder() + .inputInstances(200_000) + .samplingSize(20_000) + .evaluationInstances(200_000) + .classifiedInstances(200_000) + .classificationsCorrect(75f) + .kappaStat(0f) + .kappaTempStat(0f) + .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE) + .resultFilePollTimeout(10) + .prePollWait(10) + .taskClassName(LocalDoTask.class.getName()) + .build(); + TestUtils.test(vhtConfig); + + } + + @Test + public void testBaggingLocal() throws Exception { + TestParams baggingConfig = new TestParams.Builder() + .inputInstances(200_000) + .samplingSize(20_000) + .evaluationInstances(180_000) + .classifiedInstances(210_000) + .classificationsCorrect(60f) + .kappaStat(0f) + .kappaTempStat(0f) + .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE) + .prePollWait(10) + .resultFilePollTimeout(10) + .taskClassName(LocalDoTask.class.getName()) + .build(); + TestUtils.test(baggingConfig); + + } + + @Test + public void testNaiveBayesLocal() throws Exception { + + TestParams vhtConfig = new TestParams.Builder() + .inputInstances(200_000) + .samplingSize(20_000) + .evaluationInstances(200_000) + .classifiedInstances(200_000) + .classificationsCorrect(65f) + .kappaStat(0f) + .kappaTempStat(0f) + .cliStringTemplate(TestParams.Templates.PREQEVAL_NAIVEBAYES_HYPERPLANE) + .resultFilePollTimeout(10) + .prePollWait(10) + .taskClassName(LocalDoTask.class.getName()) + .build(); + TestUtils.test(vhtConfig); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleComponentFactoryTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleComponentFactoryTest.java b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleComponentFactoryTest.java new file mode 100644 index 0000000..3085d4c --- /dev/null +++ b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleComponentFactoryTest.java @@ -0,0 +1,103 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import static org.junit.Assert.*; +import mockit.Mocked; +import mockit.Tested; + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; +import org.apache.samoa.topology.impl.SimpleComponentFactory; +import org.apache.samoa.topology.impl.SimpleEntranceProcessingItem; +import org.apache.samoa.topology.impl.SimpleProcessingItem; +import org.apache.samoa.topology.impl.SimpleStream; +import org.apache.samoa.topology.impl.SimpleTopology; +import org.junit.Before; +import org.junit.Test; + +/** + * @author Anh Thu Vu + * + */ +public class SimpleComponentFactoryTest { + + @Tested + private SimpleComponentFactory factory; + @Mocked + private Processor processor, processorReplica; + @Mocked + private EntranceProcessor entranceProcessor; + + private final int parallelism = 3; + private final String topoName = "TestTopology"; + + @Before + public void setUp() throws Exception { + factory = new SimpleComponentFactory(); + } + + @Test + public void testCreatePiNoParallelism() { + ProcessingItem pi = factory.createPi(processor); + assertNotNull("ProcessingItem created is null.", pi); + assertEquals("ProcessingItem created is not a SimpleProcessingItem.", SimpleProcessingItem.class, pi.getClass()); + assertEquals("Parallelism of PI is not 1", 1, pi.getParallelism(), 0); + } + + @Test + public void testCreatePiWithParallelism() { + ProcessingItem pi = factory.createPi(processor, parallelism); + assertNotNull("ProcessingItem created is null.", pi); + assertEquals("ProcessingItem created is not a SimpleProcessingItem.", SimpleProcessingItem.class, pi.getClass()); + assertEquals("Parallelism of PI is not ", parallelism, pi.getParallelism(), 0); + } + + @Test + public void testCreateStream() { + ProcessingItem pi = factory.createPi(processor); + + Stream stream = factory.createStream(pi); + assertNotNull("Stream created is null", stream); + assertEquals("Stream created is not a SimpleStream.", SimpleStream.class, stream.getClass()); + } + + @Test + public void testCreateTopology() { + Topology topology = factory.createTopology(topoName); + assertNotNull("Topology created is null.", topology); + assertEquals("Topology created is not a SimpleTopology.", SimpleTopology.class, topology.getClass()); + } + + @Test + public void testCreateEntrancePi() { + EntranceProcessingItem entrancePi = factory.createEntrancePi(entranceProcessor); + assertNotNull("EntranceProcessingItem created is null.", entrancePi); + assertEquals("EntranceProcessingItem created is not a SimpleEntranceProcessingItem.", + SimpleEntranceProcessingItem.class, entrancePi.getClass()); + assertSame("EntranceProcessor is not set correctly.", entranceProcessor, entrancePi.getProcessor()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleEngineTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleEngineTest.java b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleEngineTest.java new file mode 100644 index 0000000..59d5afe --- /dev/null +++ b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleEngineTest.java @@ -0,0 +1,62 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import mockit.Mocked; +import mockit.NonStrictExpectations; +import mockit.Tested; +import mockit.Verifications; + +import org.apache.samoa.topology.impl.SimpleEngine; +import org.apache.samoa.topology.impl.SimpleTopology; +import org.junit.Test; + +/** + * @author Anh Thu Vu + * + */ +public class SimpleEngineTest { + + @Tested + private SimpleEngine unused; + @Mocked + private SimpleTopology topology; + @Mocked + private Runtime mockedRuntime; + + @Test + public void testSubmitTopology() { + new NonStrictExpectations() { + { + Runtime.getRuntime(); + result = mockedRuntime; + mockedRuntime.exit(0); + } + }; + SimpleEngine.submitTopology(topology); + new Verifications() { + { + topology.run(); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleEntranceProcessingItemTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleEntranceProcessingItemTest.java b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleEntranceProcessingItemTest.java new file mode 100644 index 0000000..41d1f46 --- /dev/null +++ b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleEntranceProcessingItemTest.java @@ -0,0 +1,172 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import static org.junit.Assert.*; +import mockit.Mocked; +import mockit.StrictExpectations; +import mockit.Tested; +import mockit.Verifications; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.impl.SimpleEntranceProcessingItem; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +/** + * @author Anh Thu Vu + * + */ +public class SimpleEntranceProcessingItemTest { + + @Tested + private SimpleEntranceProcessingItem entrancePi; + + @Mocked + private EntranceProcessor entranceProcessor; + @Mocked + private Stream outputStream, anotherStream; + @Mocked + private ContentEvent event; + + @Mocked + private Thread unused; + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + entrancePi = new SimpleEntranceProcessingItem(entranceProcessor); + } + + @Test + public void testContructor() { + assertSame("EntranceProcessor is not set correctly.", entranceProcessor, entrancePi.getProcessor()); + } + + @Test + public void testSetOutputStream() { + entrancePi.setOutputStream(outputStream); + assertSame("OutputStream is not set correctly.", outputStream, entrancePi.getOutputStream()); + } + + @Test + public void testSetOutputStreamRepeate() { + entrancePi.setOutputStream(outputStream); + entrancePi.setOutputStream(outputStream); + assertSame("OutputStream is not set correctly.", outputStream, entrancePi.getOutputStream()); + } + + @Test(expected = IllegalStateException.class) + public void testSetOutputStreamError() { + entrancePi.setOutputStream(outputStream); + entrancePi.setOutputStream(anotherStream); + } + + @Test + public void testInjectNextEventSuccess() { + entrancePi.setOutputStream(outputStream); + new StrictExpectations() { + { + entranceProcessor.hasNext(); + result = true; + + entranceProcessor.nextEvent(); + result = event; + } + }; + entrancePi.injectNextEvent(); + new Verifications() { + { + outputStream.put(event); + } + }; + } + + @Test + public void testStartSendingEvents() { + entrancePi.setOutputStream(outputStream); + new StrictExpectations() { + { + for (int i = 0; i < 1; i++) { + entranceProcessor.isFinished(); + result = false; + entranceProcessor.hasNext(); + result = false; + } + + for (int i = 0; i < 5; i++) { + entranceProcessor.isFinished(); + result = false; + entranceProcessor.hasNext(); + result = true; + entranceProcessor.nextEvent(); + result = event; + outputStream.put(event); + } + + for (int i = 0; i < 2; i++) { + entranceProcessor.isFinished(); + result = false; + entranceProcessor.hasNext(); + result = false; + } + + for (int i = 0; i < 5; i++) { + entranceProcessor.isFinished(); + result = false; + entranceProcessor.hasNext(); + result = true; + entranceProcessor.nextEvent(); + result = event; + outputStream.put(event); + } + + entranceProcessor.isFinished(); + result = true; + times = 1; + entranceProcessor.hasNext(); + times = 0; + } + }; + entrancePi.startSendingEvents(); + new Verifications() { + { + try { + Thread.sleep(anyInt); + times = 3; + } catch (InterruptedException e) { + + } + } + }; + } + + @Test(expected = IllegalStateException.class) + public void testStartSendingEventsError() { + entrancePi.startSendingEvents(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleProcessingItemTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleProcessingItemTest.java b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleProcessingItemTest.java new file mode 100644 index 0000000..42602ec --- /dev/null +++ b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleProcessingItemTest.java @@ -0,0 +1,125 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.ExecutorService; + +import mockit.Expectations; +import mockit.Mocked; +import mockit.NonStrictExpectations; +import mockit.Tested; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.impl.SimpleProcessingItem; +import org.apache.samoa.topology.impl.SimpleStream; +import org.apache.samoa.utils.PartitioningScheme; +import org.apache.samoa.utils.StreamDestination; +import org.junit.Before; +import org.junit.Test; + +/** + * @author Anh Thu Vu + * + */ +public class SimpleProcessingItemTest { + + @Tested + private SimpleProcessingItem pi; + + @Mocked + private Processor processor; + @Mocked + private SimpleStream stream; + @Mocked + private StreamDestination destination; + @Mocked + private ContentEvent event; + + private final int parallelism = 4; + private final int counter = 2; + + @Before + public void setUp() throws Exception { + pi = new SimpleProcessingItem(processor, parallelism); + } + + @Test + public void testConstructor() { + assertSame("Processor was not set correctly.", processor, pi.getProcessor()); + assertEquals("Parallelism was not set correctly.", parallelism, pi.getParallelism(), 0); + } + + @Test + public void testConnectInputShuffleStream() { + new Expectations() { + { + destination = new StreamDestination(pi, parallelism, PartitioningScheme.SHUFFLE); + stream.addDestination(destination); + } + }; + pi.connectInputShuffleStream(stream); + } + + @Test + public void testConnectInputKeyStream() { + new Expectations() { + { + destination = new StreamDestination(pi, parallelism, PartitioningScheme.GROUP_BY_KEY); + stream.addDestination(destination); + } + }; + pi.connectInputKeyStream(stream); + } + + @Test + public void testConnectInputAllStream() { + new Expectations() { + { + destination = new StreamDestination(pi, parallelism, PartitioningScheme.BROADCAST); + stream.addDestination(destination); + } + }; + pi.connectInputAllStream(stream); + } + + @Test + public void testProcessEvent() { + new Expectations() { + { + for (int i = 0; i < parallelism; i++) { + processor.newProcessor(processor); + result = processor; + + processor.onCreate(anyInt); + } + + processor.process(event); + } + }; + pi.processEvent(event, counter); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleStreamTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleStreamTest.java b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleStreamTest.java new file mode 100644 index 0000000..51e25bb --- /dev/null +++ b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleStreamTest.java @@ -0,0 +1,122 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.Arrays; +import java.util.Collection; + +import mockit.Expectations; +import mockit.Mocked; +import mockit.NonStrictExpectations; +import mockit.Tested; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.topology.impl.SimpleProcessingItem; +import org.apache.samoa.topology.impl.SimpleStream; +import org.apache.samoa.utils.PartitioningScheme; +import org.apache.samoa.utils.StreamDestination; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +/** + * @author Anh Thu Vu + * + */ +@RunWith(Parameterized.class) +public class SimpleStreamTest { + + @Tested + private SimpleStream stream; + + @Mocked + private SimpleProcessingItem sourcePi, destPi; + @Mocked + private ContentEvent event; + @Mocked + private StreamDestination destination; + + private final String eventKey = "eventkey"; + private final int parallelism; + private final PartitioningScheme scheme; + + @Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][] { + { 2, PartitioningScheme.SHUFFLE }, + { 3, PartitioningScheme.GROUP_BY_KEY }, + { 4, PartitioningScheme.BROADCAST } + }); + } + + public SimpleStreamTest(int parallelism, PartitioningScheme scheme) { + this.parallelism = parallelism; + this.scheme = scheme; + } + + @Before + public void setUp() throws Exception { + stream = new SimpleStream(sourcePi); + stream.addDestination(destination); + } + + @Test + public void testPut() { + new NonStrictExpectations() { + { + event.getKey(); + result = eventKey; + destination.getProcessingItem(); + result = destPi; + destination.getPartitioningScheme(); + result = scheme; + destination.getParallelism(); + result = parallelism; + + } + }; + switch (this.scheme) { + case SHUFFLE: + case GROUP_BY_KEY: + new Expectations() { + { + // TODO: restrict the range of counter value + destPi.processEvent(event, anyInt); + times = 1; + } + }; + break; + case BROADCAST: + new Expectations() { + { + // TODO: restrict the range of counter value + destPi.processEvent(event, anyInt); + times = parallelism; + } + }; + break; + } + stream.put(event); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleTopologyTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleTopologyTest.java b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleTopologyTest.java new file mode 100644 index 0000000..6d8d728 --- /dev/null +++ b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleTopologyTest.java @@ -0,0 +1,97 @@ +package org.apache.samoa.topology.impl; + +import static org.junit.Assert.*; + +import java.util.Set; + +import mockit.NonStrictExpectations; +import mockit.Expectations; +import mockit.Mocked; +import mockit.Tested; + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.impl.SimpleEntranceProcessingItem; +import org.apache.samoa.topology.impl.SimpleTopology; +import org.junit.Before; +import org.junit.Test; +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + + +/** + * @author Anh Thu Vu + * + */ +public class SimpleTopologyTest { + + @Tested + private SimpleTopology topology; + + @Mocked + private SimpleEntranceProcessingItem entrancePi; + @Mocked + private EntranceProcessor entranceProcessor; + + @Before + public void setUp() throws Exception { + topology = new SimpleTopology("TestTopology"); + } + + @Test + public void testAddEntrancePi() { + topology.addEntranceProcessingItem(entrancePi); + + Set<EntranceProcessingItem> entrancePIs = topology.getEntranceProcessingItems(); + assertNotNull("Set of entrance PIs is null.", entrancePIs); + assertEquals("Number of entrance PI in SimpleTopology must be 1", 1, entrancePIs.size()); + assertSame("Entrance PI was not set correctly.", entrancePi, entrancePIs.toArray()[0]); + // TODO: verify that entrance PI is in the set of ProcessingItems + // Need to access topology's set of PIs (getProcessingItems() method) + } + + @Test + public void testRun() { + topology.addEntranceProcessingItem(entrancePi); + + new NonStrictExpectations() { + { + entrancePi.getProcessor(); + result = entranceProcessor; + + } + }; + + new Expectations() { + { + entranceProcessor.onCreate(anyInt); + entrancePi.startSendingEvents(); + } + }; + topology.run(); + } + + @Test(expected = IllegalStateException.class) + public void testRunWithoutEntrancePI() { + topology.run(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-s4/pom.xml b/samoa-s4/pom.xml index 8135be9..0f9b804 100644 --- a/samoa-s4/pom.xml +++ b/samoa-s4/pom.xml @@ -31,14 +31,14 @@ <artifactId>samoa-s4</artifactId> <parent> - <groupId>com.yahoo.labs.samoa</groupId> + <groupId>org.apache.samoa</groupId> <artifactId>samoa</artifactId> <version>0.3.0-SNAPSHOT</version> </parent> <dependencies> <dependency> - <groupId>com.yahoo.labs.samoa</groupId> + <groupId>org.apache.samoa</groupId> <artifactId>samoa-api</artifactId> <version>${project.version}</version> </dependency> @@ -117,7 +117,7 @@ <Implementation-Version>${project.version}</Implementation-Version> <Implementation-Vendor>Yahoo Labs</Implementation-Vendor> <Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id> - <S4-App-Class>com.yahoo.labs.samoa.topology.impl.S4DoTask</S4-App-Class> + <S4-App-Class>org.apache.samoa.topology.impl.S4DoTask</S4-App-Class> <S4-Version>${s4.version}</S4-Version> </manifestEntries> </archive> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/samoa-s4-adapter/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-s4/samoa-s4-adapter/pom.xml b/samoa-s4/samoa-s4-adapter/pom.xml index 27c8d51..5a66a1e 100644 --- a/samoa-s4/samoa-s4-adapter/pom.xml +++ b/samoa-s4/samoa-s4-adapter/pom.xml @@ -37,22 +37,16 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <!-- PARENT MODULE SAMOA-S4 <parent> <groupId>com.yahoo.labs.bcn.samoa</groupId> <artifactId>samoa-s4</artifactId> <version>0.1</version> - </parent> --> - - <!-- SAMOA-S4-ADAPTER MODUEL --> <artifactId>samoa-s4-adapter</artifactId> - <groupId>com.yahoo.labs.bcn.samoa</groupId> + <groupId>org.apache.samoa</groupId> <version>0.1</version> <name>samoa-s4-adapter</name> <description>Adapter module to connect to external stream and also to provide entrance processing items for SAMOA</description> <dependencies> - <!-- dependency> <artifactId>samoa-api</artifactId> <groupId>com.yahoo.labs.bcn.samoa</groupId> <version>0.1</version> - </dependency> --> <dependency> <artifactId>samoa-s4</artifactId> - <groupId>com.yahoo.labs.bcn.samoa</groupId> + <groupId>org.apache.samoa</groupId> <version>0.1</version> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java deleted file mode 100644 index 24602a4..0000000 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java +++ /dev/null @@ -1,98 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.topology.ComponentFactory; -import com.yahoo.labs.samoa.topology.EntranceProcessingItem; -import com.yahoo.labs.samoa.topology.IProcessingItem; -import com.yahoo.labs.samoa.topology.ProcessingItem; -import com.yahoo.labs.samoa.topology.Stream; -import com.yahoo.labs.samoa.topology.Topology; - -/** - * S4 Platform Component Factory - * - * @author severien - * - */ -public class S4ComponentFactory implements ComponentFactory { - - public static final Logger logger = LoggerFactory.getLogger(S4ComponentFactory.class); - protected S4DoTask app; - - @Override - public ProcessingItem createPi(Processor processor, int paralellism) { - S4ProcessingItem processingItem = new S4ProcessingItem(app); - // TODO refactor how to set the paralellism level - processingItem.setParalellismLevel(paralellism); - processingItem.setProcessor(processor); - - return processingItem; - } - - @Override - public ProcessingItem createPi(Processor processor) { - return this.createPi(processor, 1); - } - - @Override - public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { - // TODO Create source Entry processing item that connects to an external - // stream - S4EntranceProcessingItem entrancePi = new S4EntranceProcessingItem(entranceProcessor, app); - entrancePi.setParallelism(1); // FIXME should not be set to 1 statically - return entrancePi; - } - - @Override - public Stream createStream(IProcessingItem sourcePi) { - S4Stream aStream = new S4Stream(app); - return aStream; - } - - @Override - public Topology createTopology(String topoName) { - return new S4Topology(topoName); - } - - /** - * Initialization method. - * - * @param evalTask - */ - public void init(String evalTask) { - // Task is initiated in the DoTaskApp - } - - /** - * Sets S4 application. - * - * @param app - */ - public void setApp(S4DoTask app) { - this.app = app; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java deleted file mode 100644 index cc4b18d..0000000 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java +++ /dev/null @@ -1,268 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -/** - * License - */ - -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.s4.base.Event; -import org.apache.s4.base.KeyFinder; -import org.apache.s4.core.App; -import org.apache.s4.core.ProcessingElement; -import org.apache.s4.core.Stream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.github.javacliparser.Option; -import com.github.javacliparser.ClassOption; -import com.yahoo.labs.samoa.core.Globals; -import com.yahoo.labs.samoa.tasks.Task; -import com.yahoo.labs.samoa.topology.ComponentFactory; - -import com.google.inject.Inject; -import com.google.inject.name.Named; - -/* - * S4 App that runs samoa Tasks - * - * */ - -/** - * The Class DoTaskApp. - */ -final public class S4DoTask extends App { - - private final Logger logger = LoggerFactory.getLogger(S4DoTask.class); - Task task; - - @Inject - @Named("evalTask") - public String evalTask; - - public S4DoTask() { - super(); - } - - /** The engine. */ - protected ComponentFactory componentFactory; - - /** - * Gets the factory. - * - * @return the factory - */ - public ComponentFactory getFactory() { - return componentFactory; - } - - /** - * Sets the factory. - * - * @param factory - * the new factory - */ - public void setFactory(ComponentFactory factory) { - this.componentFactory = factory; - } - - /* - * Build the application - * - * @see org.apache.s4.core.App#onInit() - */ - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#onInit() - */ - @Override - protected void onInit() { - logger.info("DoTaskApp onInit"); - // ConsoleReporters prints S4 metrics - // MetricsRegistry mr = new MetricsRegistry(); - // - // CsvReporter.enable(new File(System.getProperty("user.home") - // + "/monitor/"), 10, TimeUnit.SECONDS); - // ConsoleReporter.enable(10, TimeUnit.SECONDS); - try { - System.err.println(); - System.err.println(Globals.getWorkbenchInfoString()); - System.err.println(); - - } catch (Exception ex) { - ex.printStackTrace(); - } - S4ComponentFactory factory = new S4ComponentFactory(); - factory.setApp(this); - - // logger.debug("LC {}", lc); - - // task = TaskProvider.getTask(evalTask); - - // EXAMPLE OPTIONS - // -l Clustream -g Clustream -i 100000 -s (RandomRBFGeneratorEvents -K - // 5 -N 0.0) - // String[] args = new String[] {evalTask,"-l", "Clustream","-g", - // "Clustream", "-i", "100000", "-s", "(RamdomRBFGeneratorsEvents", - // "-K", "5", "-N", "0.0)"}; - // String[] args = new String[] { evalTask, "-l", "clustream.Clustream", - // "-g", "clustream.Clustream", "-i", "100000", "-s", - // "(RandomRBFGeneratorEvents", "-K", "5", "-N", "0.0)" }; - logger.debug("PARAMETERS {}", evalTask); - // params = params.replace(":", " "); - List<String> parameters = new ArrayList<String>(); - // parameters.add(evalTask); - try { - parameters.addAll(Arrays.asList(URLDecoder.decode(evalTask, "UTF-8").split(" "))); - } catch (UnsupportedEncodingException ex) { - ex.printStackTrace(); - } - String[] args = parameters.toArray(new String[0]); - Option[] extraOptions = new Option[] {}; - // build a single string by concatenating cli options - StringBuilder cliString = new StringBuilder(); - for (int i = 0; i < args.length; i++) { - cliString.append(" ").append(args[i]); - } - - // parse options - try { - task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions); - task.setFactory(factory); - task.init(); - } catch (Exception e) { - e.printStackTrace(); - } - - } - - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#onStart() - */ - @Override - protected void onStart() { - logger.info("Starting DoTaskApp... App Partition [{}]", this.getPartitionId()); - // <<<<<<< HEAD Task doesn't have start in latest storm-impl - // TODO change the way the app starts - // if (this.getPartitionId() == 0) - S4Topology s4topology = (S4Topology) getTask().getTopology(); - S4EntranceProcessingItem epi = (S4EntranceProcessingItem) s4topology.getEntranceProcessingItem(); - while (epi.injectNextEvent()) - // inject events from the EntrancePI - ; - } - - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#onClose() - */ - @Override - protected void onClose() { - System.out.println("Closing DoTaskApp..."); - - } - - /** - * Gets the task. - * - * @return the task - */ - public Task getTask() { - return task; - } - - // These methods are protected in App and can not be accessed from outside. - // They are - // called from parallel classifiers and evaluations. Is there a better way - // to do that? - - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#createPE(java.lang.Class) - */ - @Override - public <T extends ProcessingElement> T createPE(Class<T> type) { - return super.createPE(type); - } - - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#createStream(java.lang.String, - * org.apache.s4.base.KeyFinder, org.apache.s4.core.ProcessingElement[]) - */ - @Override - public <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder, - ProcessingElement... processingElements) { - return super.createStream(name, finder, processingElements); - } - - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#createStream(java.lang.String, - * org.apache.s4.core.ProcessingElement[]) - */ - @Override - public <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) { - return super.createStream(name, processingElements); - } - - // @com.beust.jcommander.Parameters(separators = "=") - // class Parameters { - // - // @Parameter(names={"-lc","-local"}, description="Local clustering method") - // private String localClustering; - // - // @Parameter(names={"-gc","-global"}, - // description="Global clustering method") - // private String globalClustering; - // - // } - // - // class ParametersConverter {// implements IStringConverter<String[]> { - // - // - // public String[] convertToArgs(String value) { - // - // String[] params = value.split(","); - // String[] args = new String[params.length*2]; - // for(int i=0; i<params.length ; i++) { - // args[i] = params[i].split("=")[0]; - // args[i+1] = params[i].split("=")[1]; - // i++; - // } - // return args; - // } - // - // } - -}
