SAMOA-57: Remove support for S4
Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/de050f99 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/de050f99 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/de050f99 Branch: refs/heads/master Commit: de050f994d895dfb599d209ba267fcf5de132f2e Parents: b86ab83 Author: Gianmarco De Francisci Morales <[email protected]> Authored: Sun Mar 6 13:49:06 2016 +0300 Committer: Gianmarco De Francisci Morales <[email protected]> Committed: Sun Mar 6 13:49:06 2016 +0300 ---------------------------------------------------------------------- .travis.yml | 10 - pom.xml | 12 - samoa-s4/pom.xml | 136 ---------- samoa-s4/samoa-s4-adapter/pom.xml | 54 ---- .../samoa/topology/adapter/S4AdapterApp.java | 45 ---- .../adapter/S4EntranceProcessingItem.java | 74 ----- .../samoa/topology/adapter/package-info.java | 28 -- samoa-s4/src/main/assembly/samoa-s4.xml | 78 ------ .../samoa/topology/impl/S4ComponentFactory.java | 97 ------- .../apache/samoa/topology/impl/S4DoTask.java | 268 ------------------- .../topology/impl/S4EntranceProcessingItem.java | 119 -------- .../org/apache/samoa/topology/impl/S4Event.java | 91 ------- .../samoa/topology/impl/S4ProcessingItem.java | 186 ------------- .../apache/samoa/topology/impl/S4Stream.java | 184 ------------- .../apache/samoa/topology/impl/S4Submitter.java | 144 ---------- .../apache/samoa/topology/impl/S4Topology.java | 63 ----- .../samoa/topology/impl/SamoaSerializer.java | 99 ------- .../topology/impl/SamoaSerializerModule.java | 35 --- 18 files changed, 1723 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index a122ebf..6cf1ad3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,16 +8,6 @@ jdk: - oraclejdk7 install: -- git clone https://github.com/apache/incubator-s4.git -- cd incubator-s4 -- git checkout tags/0.6.0-Final -- mv ../bin/s4-build/gradlew . -- wget http://people.apache.org/~gdfm/gradle-wrapper-1.4.jar -- mv gradle-wrapper-1.4.jar ./lib/ -- mv ../bin/s4-build/gradle-wrapper-1.4.properties ./lib/ -- ./gradlew install -- ./gradlew s4-tools::installApp -- cd .. - echo "<settings><servers><server><id>apache.snapshots.https</id><username>${SOSS_USERNAME}</username><password>${SOSS_PASSWORD}</password></server></servers></settings>" > ${HOME}/.m2/settings.xml #- cat ${HOME}/.m2/settings.xml http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index af8fe98..3304937 100644 --- a/pom.xml +++ b/pom.xml @@ -76,15 +76,6 @@ </modules> </profile> <profile> - <id>s4</id> - <modules> - <module>samoa-instances</module> - <module>samoa-api</module> - <module>samoa-s4</module> - <module>samoa-test</module> - </modules> - </profile> - <profile> <id>flink</id> <modules> <module>samoa-instances</module> @@ -111,7 +102,6 @@ <module>samoa-threads</module> <module>samoa-storm</module> <module>samoa-flink</module> - <module>samoa-s4</module> <module>samoa-samza</module> <module>samoa-test</module> </modules> @@ -135,7 +125,6 @@ <kryo.version>2.21</kryo.version> <metrics-core.version>2.2.0</metrics-core.version> <miniball.version>1.0.3</miniball.version> - <s4.version>0.6.0-incubating</s4.version> <samza.version>0.7.0</samza.version> <flink.version>0.10.1</flink.version> <slf4j-log4j12.version>1.7.2</slf4j-log4j12.version> @@ -219,7 +208,6 @@ <root>samoa-instances</root> <root>samoa-local</root> <root>samoa-storm</root> - <root>samoa-s4</root> <root>samoa-flink</root> <root>samoa-samza</root> <root>samoa-test</root> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-s4/pom.xml b/samoa-s4/pom.xml deleted file mode 100644 index f3e24a3..0000000 --- a/samoa-s4/pom.xml +++ /dev/null @@ -1,136 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - #%L - SAMOA - %% - Copyright (C) 2014 - 2015 Apache Software Foundation - %% - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - #L% - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - </properties> - - <name>samoa-s4</name> - <description>S4 bindings for SAMOA</description> - - <artifactId>samoa-s4</artifactId> - <parent> - <groupId>org.apache.samoa</groupId> - <artifactId>samoa</artifactId> - <version>0.4.0-incubating-SNAPSHOT</version> - </parent> - - <dependencies> - <dependency> - <groupId>org.apache.samoa</groupId> - <artifactId>samoa-api</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>com.github.javacliparser</groupId> - <artifactId>javacliparser</artifactId> - <version>${javacliparser.version}</version> - </dependency> - - <!-- S4 dependencies need to be installed separately as they are not available via Maven yet --> - <dependency> - <groupId>org.apache.s4</groupId> - <artifactId>s4-base</artifactId> - <version>${s4.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.s4</groupId> - <artifactId>s4-comm</artifactId> - <version>${s4.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.s4</groupId> - <artifactId>s4-core</artifactId> - <version>${s4.version}</version> - <scope>provided</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <version>${maven-dependency-plugin.version}</version> - <configuration> - <outputDirectory>${project.build.directory}/lib</outputDirectory> - <overWriteReleases>false</overWriteReleases> - <overWriteSnapshots>false</overWriteSnapshots> - <overWriteIfNewer>true</overWriteIfNewer> - - <excludeGroupIds>org.apache.s4</excludeGroupIds> - <excludeTransitive>true</excludeTransitive> - </configuration> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - </execution> - </executions> - </plugin> - - <!-- SAMOA assembly --> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <version>${maven-assembly-plugin.version}</version> - <configuration> - <descriptors> - <descriptor>src/main/assembly/samoa-s4.xml</descriptor> - </descriptors> - <finalName>SAMOA-S4-${project.version}</finalName> - <attach>false</attach> - <outputDirectory>../target</outputDirectory> - <appendAssemblyId>false</appendAssemblyId> - <archive> - <manifestEntries> - <Bundle-Version>${parsedVersion.osgiVersion}</Bundle-Version> - <Bundle-Description>${project.description}</Bundle-Description> - <Implementation-Version>${project.version}</Implementation-Version> - <Implementation-Vendor>Yahoo Labs</Implementation-Vendor> - <Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id> - <S4-App-Class>org.apache.samoa.topology.impl.S4DoTask</S4-App-Class> - <S4-Version>${s4.version}</S4-Version> - </manifestEntries> - </archive> - </configuration> - <executions> - <execution> - <id>make-assembly</id> <!-- this is used for inheritance merges --> - <phase>package</phase> <!-- bind to the packaging phase --> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/samoa-s4-adapter/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-s4/samoa-s4-adapter/pom.xml b/samoa-s4/samoa-s4-adapter/pom.xml deleted file mode 100644 index 5a66a1e..0000000 --- a/samoa-s4/samoa-s4-adapter/pom.xml +++ /dev/null @@ -1,54 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - #%L - SAMOA - %% - Copyright (C) 2014 - 2015 Apache Software Foundation - %% - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - #L% - --> - -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <artifactId>samoa-s4-adapter</artifactId> - <groupId>org.apache.samoa</groupId> - <version>0.1</version> - <name>samoa-s4-adapter</name> - <description>Adapter module to connect to external stream and also to provide entrance processing items for SAMOA</description> - - <dependencies> - <dependency> - <artifactId>samoa-s4</artifactId> - <groupId>org.apache.samoa</groupId> - <version>0.1</version> - </dependency> - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4AdapterApp.java ---------------------------------------------------------------------- diff --git a/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4AdapterApp.java b/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4AdapterApp.java deleted file mode 100644 index 1634502..0000000 --- a/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4AdapterApp.java +++ /dev/null @@ -1,45 +0,0 @@ -package samoa.topology.adapter; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.s4.core.adapter.AdapterApp; - -import samoa.sandbox.SourceProcessor; -import samoa.streams.StreamSourceProcessor; - -public class S4AdapterApp extends AdapterApp { - - S4EntranceProcessingItem entrancePI; - StreamSourceProcessor sourceProcessor; - - @Override - protected void onInit() { - entrancePI = new S4EntranceProcessingItem(this); - sourceProcessor = new StreamSourceProcessor(); - entrancePI.setProcessor(sourceProcessor); - } - - @Override - protected void onStart() { - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4EntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4EntranceProcessingItem.java b/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4EntranceProcessingItem.java deleted file mode 100644 index 4c22a0b..0000000 --- a/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4EntranceProcessingItem.java +++ /dev/null @@ -1,74 +0,0 @@ -package samoa.topology.adapter; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.s4.core.App; -import org.apache.s4.core.ProcessingElement; - -import samoa.core.Processor; -import samoa.topology.EntranceProcessingItem; -import samoa.topology.impl.DoTaskApp; -import weka.core.Instance; - -public class S4EntranceProcessingItem extends ProcessingElement implements EntranceProcessingItem { - - private Processor processor; - //DoTaskApp app; - - - public S4EntranceProcessingItem(App app){ - super(app); - //this.app = (DoTaskApp) app; - this.setSingleton(true); - - } - - @Override - public Processor getProcessor() { - return this.processor; - } - - @Override - public void put(Instance inst) { - // do nothing - //may not needed - - } - - @Override - protected void onCreate() { - - // if (this.processor != null){ -// this.processor = this.processor.newProcessor(this.processor); -// this.processor.onCreate(Integer.parseInt(getId())); -// } - } - - @Override - protected void onRemove() { - //do nothing - - } - - public void setProcessor(Processor processor){ - this.processor = processor; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/package-info.java ---------------------------------------------------------------------- diff --git a/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/package-info.java b/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/package-info.java deleted file mode 100644 index 2203a32..0000000 --- a/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/package-info.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * - */ -/** - * @author severien - * - */ -package samoa.topology.adapter; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/assembly/samoa-s4.xml ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/assembly/samoa-s4.xml b/samoa-s4/src/main/assembly/samoa-s4.xml deleted file mode 100644 index 41a93f5..0000000 --- a/samoa-s4/src/main/assembly/samoa-s4.xml +++ /dev/null @@ -1,78 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - #%L - SAMOA - %% - Copyright (C) 2014 - 2015 Apache Software Foundation - %% - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - #L% - --> - -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>dist</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - - <fileSets> - <!-- SAMOA API artifacts --> - <fileSet> - <outputDirectory>lib/</outputDirectory> - <directory>../samoa-api/target/lib/</directory> - <includes> - <include>*</include> - </includes> - </fileSet> - <fileSet> - <outputDirectory>app/</outputDirectory> - <directory>../samoa-api/target/</directory> - <includes> - <include>samoa-api-*.jar</include> - </includes> - </fileSet> - - <!-- SAMOA S4 artifacts --> - <fileSet> - <outputDirectory>app/</outputDirectory> - <directory>target/</directory> - <includes> - <include>samoa-s4-*.jar</include> - </includes> - </fileSet> - <fileSet> - <outputDirectory>/</outputDirectory> - <directory>target/</directory> - <includes> - <include>lib/*</include> - </includes> - </fileSet> - </fileSets> - -</assembly> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ComponentFactory.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ComponentFactory.java deleted file mode 100644 index ebd18f9..0000000 --- a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ComponentFactory.java +++ /dev/null @@ -1,97 +0,0 @@ -package org.apache.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.samoa.core.EntranceProcessor; -import org.apache.samoa.core.Processor; -import org.apache.samoa.topology.ComponentFactory; -import org.apache.samoa.topology.EntranceProcessingItem; -import org.apache.samoa.topology.IProcessingItem; -import org.apache.samoa.topology.ProcessingItem; -import org.apache.samoa.topology.Stream; -import org.apache.samoa.topology.Topology; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * S4 Platform Component Factory - * - * @author severien - * - */ -public class S4ComponentFactory implements ComponentFactory { - - public static final Logger logger = LoggerFactory.getLogger(S4ComponentFactory.class); - protected S4DoTask app; - - @Override - public ProcessingItem createPi(Processor processor, int paralellism) { - S4ProcessingItem processingItem = new S4ProcessingItem(app); - // TODO refactor how to set the paralellism level - processingItem.setParalellismLevel(paralellism); - processingItem.setProcessor(processor); - - return processingItem; - } - - @Override - public ProcessingItem createPi(Processor processor) { - return this.createPi(processor, 1); - } - - @Override - public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { - // TODO Create source Entry processing item that connects to an external - // stream - S4EntranceProcessingItem entrancePi = new S4EntranceProcessingItem(entranceProcessor, app); - entrancePi.setParallelism(1); // FIXME should not be set to 1 statically - return entrancePi; - } - - @Override - public Stream createStream(IProcessingItem sourcePi) { - S4Stream aStream = new S4Stream(app); - return aStream; - } - - @Override - public Topology createTopology(String topoName) { - return new S4Topology(topoName); - } - - /** - * Initialization method. - * - * @param evalTask - */ - public void init(String evalTask) { - // Task is initiated in the DoTaskApp - } - - /** - * Sets S4 application. - * - * @param app - */ - public void setApp(S4DoTask app) { - this.app = app; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4DoTask.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4DoTask.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4DoTask.java deleted file mode 100644 index d52e981..0000000 --- a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4DoTask.java +++ /dev/null @@ -1,268 +0,0 @@ -package org.apache.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -/** - * License - */ - -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.s4.base.Event; -import org.apache.s4.base.KeyFinder; -import org.apache.s4.core.App; -import org.apache.s4.core.ProcessingElement; -import org.apache.s4.core.Stream; -import org.apache.samoa.core.Globals; -import org.apache.samoa.tasks.Task; -import org.apache.samoa.topology.ComponentFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.javacliparser.Option; -import com.github.javacliparser.ClassOption; -import com.google.inject.Inject; -import com.google.inject.name.Named; - -/* - * S4 App that runs samoa Tasks - * - * */ - -/** - * The Class DoTaskApp. - */ -final public class S4DoTask extends App { - - private final Logger logger = LoggerFactory.getLogger(S4DoTask.class); - Task task; - - @Inject - @Named("evalTask") - public String evalTask; - - public S4DoTask() { - super(); - } - - /** The engine. */ - protected ComponentFactory componentFactory; - - /** - * Gets the factory. - * - * @return the factory - */ - public ComponentFactory getFactory() { - return componentFactory; - } - - /** - * Sets the factory. - * - * @param factory - * the new factory - */ - public void setFactory(ComponentFactory factory) { - this.componentFactory = factory; - } - - /* - * Build the application - * - * @see org.apache.s4.core.App#onInit() - */ - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#onInit() - */ - @Override - protected void onInit() { - logger.info("DoTaskApp onInit"); - // ConsoleReporters prints S4 metrics - // MetricsRegistry mr = new MetricsRegistry(); - // - // CsvReporter.enable(new File(System.getProperty("user.home") - // + "/monitor/"), 10, TimeUnit.SECONDS); - // ConsoleReporter.enable(10, TimeUnit.SECONDS); - try { - System.err.println(); - System.err.println(Globals.getWorkbenchInfoString()); - System.err.println(); - - } catch (Exception ex) { - ex.printStackTrace(); - } - S4ComponentFactory factory = new S4ComponentFactory(); - factory.setApp(this); - - // logger.debug("LC {}", lc); - - // task = TaskProvider.getTask(evalTask); - - // EXAMPLE OPTIONS - // -l Clustream -g Clustream -i 100000 -s (RandomRBFGeneratorEvents -K - // 5 -N 0.0) - // String[] args = new String[] {evalTask,"-l", "Clustream","-g", - // "Clustream", "-i", "100000", "-s", "(RamdomRBFGeneratorsEvents", - // "-K", "5", "-N", "0.0)"}; - // String[] args = new String[] { evalTask, "-l", "clustream.Clustream", - // "-g", "clustream.Clustream", "-i", "100000", "-s", - // "(RandomRBFGeneratorEvents", "-K", "5", "-N", "0.0)" }; - logger.debug("PARAMETERS {}", evalTask); - // params = params.replace(":", " "); - List<String> parameters = new ArrayList<String>(); - // parameters.add(evalTask); - try { - parameters.addAll(Arrays.asList(URLDecoder.decode(evalTask, "UTF-8").split(" "))); - } catch (UnsupportedEncodingException ex) { - ex.printStackTrace(); - } - String[] args = parameters.toArray(new String[0]); - Option[] extraOptions = new Option[] {}; - // build a single string by concatenating cli options - StringBuilder cliString = new StringBuilder(); - for (int i = 0; i < args.length; i++) { - cliString.append(" ").append(args[i]); - } - - // parse options - try { - task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions); - task.setFactory(factory); - task.init(); - } catch (Exception e) { - e.printStackTrace(); - } - - } - - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#onStart() - */ - @Override - protected void onStart() { - logger.info("Starting DoTaskApp... App Partition [{}]", this.getPartitionId()); - // <<<<<<< HEAD Task doesn't have start in latest storm-impl - // TODO change the way the app starts - // if (this.getPartitionId() == 0) - S4Topology s4topology = (S4Topology) getTask().getTopology(); - S4EntranceProcessingItem epi = (S4EntranceProcessingItem) s4topology.getEntranceProcessingItem(); - while (epi.injectNextEvent()) - // inject events from the EntrancePI - ; - } - - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#onClose() - */ - @Override - protected void onClose() { - System.out.println("Closing DoTaskApp..."); - - } - - /** - * Gets the task. - * - * @return the task - */ - public Task getTask() { - return task; - } - - // These methods are protected in App and can not be accessed from outside. - // They are - // called from parallel classifiers and evaluations. Is there a better way - // to do that? - - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#createPE(java.lang.Class) - */ - @Override - public <T extends ProcessingElement> T createPE(Class<T> type) { - return super.createPE(type); - } - - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#createStream(java.lang.String, - * org.apache.s4.base.KeyFinder, org.apache.s4.core.ProcessingElement[]) - */ - @Override - public <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder, - ProcessingElement... processingElements) { - return super.createStream(name, finder, processingElements); - } - - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#createStream(java.lang.String, - * org.apache.s4.core.ProcessingElement[]) - */ - @Override - public <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) { - return super.createStream(name, processingElements); - } - - // @com.beust.jcommander.Parameters(separators = "=") - // class Parameters { - // - // @Parameter(names={"-lc","-local"}, description="Local clustering method") - // private String localClustering; - // - // @Parameter(names={"-gc","-global"}, - // description="Global clustering method") - // private String globalClustering; - // - // } - // - // class ParametersConverter {// implements IStringConverter<String[]> { - // - // - // public String[] convertToArgs(String value) { - // - // String[] params = value.split(","); - // String[] args = new String[params.length*2]; - // for(int i=0; i<params.length ; i++) { - // args[i] = params[i].split("=")[0]; - // args[i+1] = params[i].split("=")[1]; - // i++; - // } - // return args; - // } - // - // } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4EntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4EntranceProcessingItem.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4EntranceProcessingItem.java deleted file mode 100644 index 771cbc8..0000000 --- a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4EntranceProcessingItem.java +++ /dev/null @@ -1,119 +0,0 @@ -package org.apache.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.s4.core.App; -import org.apache.s4.core.ProcessingElement; -import org.apache.samoa.core.ContentEvent; -import org.apache.samoa.core.EntranceProcessor; -import org.apache.samoa.topology.EntranceProcessingItem; -import org.apache.samoa.topology.Stream; - -// TODO adapt this entrance processing item to connect to external streams so the application doesnt need to use an AdapterApp - -public class S4EntranceProcessingItem extends ProcessingElement implements EntranceProcessingItem { - - private EntranceProcessor entranceProcessor; - // private S4DoTask app; - private int parallelism; - protected Stream outputStream; - - /** - * Constructor of an S4 entrance processing item. - * - * @param app - * : S4 application - */ - public S4EntranceProcessingItem(EntranceProcessor entranceProcessor, App app) { - super(app); - this.entranceProcessor = entranceProcessor; - // this.app = (S4DoTask) app; - // this.setSingleton(true); - } - - public void setParallelism(int parallelism) { - this.parallelism = parallelism; - } - - public int getParallelism() { - return this.parallelism; - } - - @Override - public EntranceProcessor getProcessor() { - return this.entranceProcessor; - } - - // - // @Override - // public void put(Instance inst) { - // // do nothing - // // may not needed - // } - - @Override - protected void onCreate() { - // was commented - if (this.entranceProcessor != null) { - // TODO revisit if we need to change it to a clone() call - this.entranceProcessor = (EntranceProcessor) this.entranceProcessor.newProcessor(this.entranceProcessor); - this.entranceProcessor.onCreate(Integer.parseInt(getId())); - } - } - - @Override - protected void onRemove() { - // do nothing - } - - // - // /** - // * Sets the entrance processing item processor. - // * - // * @param processor - // */ - // public void setProcessor(Processor processor) { - // this.entranceProcessor = processor; - // } - - @Override - public void setName(String name) { - super.setName(name); - } - - @Override - public EntranceProcessingItem setOutputStream(Stream stream) { - if (this.outputStream != null) - throw new IllegalStateException("Output stream for an EntrancePI sohuld be initialized only once"); - this.outputStream = stream; - return this; - } - - public boolean injectNextEvent() { - if (entranceProcessor.hasNext()) { - ContentEvent nextEvent = this.entranceProcessor.nextEvent(); - outputStream.put(nextEvent); - return entranceProcessor.hasNext(); - } else - return false; - // return !nextEvent.isLastEvent(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Event.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Event.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Event.java deleted file mode 100644 index 154715b..0000000 --- a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Event.java +++ /dev/null @@ -1,91 +0,0 @@ -package org.apache.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -/** - * License - */ - -import net.jcip.annotations.Immutable; - -import org.apache.s4.base.Event; -import org.apache.samoa.core.ContentEvent; - -/** - * The Class InstanceEvent. - */ -@Immutable -final public class S4Event extends Event { - - private String key; - - public String getKey() { - return key; - } - - public void setKey(String key) { - this.key = key; - } - - /** The content event. */ - private ContentEvent contentEvent; - - /** - * Instantiates a new instance event. - */ - public S4Event() { - // Needed for serialization of kryo - } - - /** - * Instantiates a new instance event. - * - * @param contentEvent - * the content event - */ - public S4Event(ContentEvent contentEvent) { - if (contentEvent != null) { - this.contentEvent = contentEvent; - this.key = contentEvent.getKey(); - - } - } - - /** - * Gets the content event. - * - * @return the content event - */ - public ContentEvent getContentEvent() { - return contentEvent; - } - - /** - * Sets the content event. - * - * @param contentEvent - * the new content event - */ - public void setContentEvent(ContentEvent contentEvent) { - this.contentEvent = contentEvent; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ProcessingItem.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ProcessingItem.java deleted file mode 100644 index b9c7467..0000000 --- a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ProcessingItem.java +++ /dev/null @@ -1,186 +0,0 @@ -package org.apache.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -import org.apache.s4.base.KeyFinder; -import org.apache.s4.core.App; -import org.apache.s4.core.ProcessingElement; -import org.apache.samoa.core.Processor; -import org.apache.samoa.topology.ProcessingItem; -import org.apache.samoa.topology.Stream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * S4 Platform platform specific processing item, inherits from S4 ProcessinElemnt. - * - * @author severien - * - */ -public class S4ProcessingItem extends ProcessingElement implements - ProcessingItem { - - public static final Logger logger = LoggerFactory - .getLogger(S4ProcessingItem.class); - - private Processor processor; - private int paralellismLevel; - private S4DoTask app; - - private static final String NAME = "PROCESSING-ITEM-"; - private static int OBJ_COUNTER = 0; - - /** - * Constructor of S4 ProcessingItem. - * - * @param app - * : S4 application - */ - public S4ProcessingItem(App app) { - super(app); - super.setName(NAME + OBJ_COUNTER); - OBJ_COUNTER++; - this.app = (S4DoTask) app; - this.paralellismLevel = 1; - } - - @Override - public String getName() { - return super.getName(); - } - - /** - * Gets processing item paralellism level. - * - * @return int - */ - public int getParalellismLevel() { - return paralellismLevel; - } - - /** - * Sets processing item paralellism level. - * - * @param paralellismLevel - */ - public void setParalellismLevel(int paralellismLevel) { - this.paralellismLevel = paralellismLevel; - } - - /** - * onEvent method. - * - * @param event - */ - public void onEvent(S4Event event) { - if (processor.process(event.getContentEvent()) == true) { - close(); - } - } - - /** - * Sets S4 processing item processor. - * - * @param processor - */ - public void setProcessor(Processor processor) { - this.processor = processor; - } - - // Methods from ProcessingItem - @Override - public Processor getProcessor() { - return processor; - } - - /** - * KeyFinder sets the keys for a specific event. - * - * @return KeyFinder - */ - private KeyFinder<S4Event> getKeyFinder() { - KeyFinder<S4Event> keyFinder = new KeyFinder<S4Event>() { - @Override - public List<String> get(S4Event s4event) { - List<String> results = new ArrayList<String>(); - results.add(s4event.getKey()); - return results; - } - }; - - return keyFinder; - } - - @Override - public ProcessingItem connectInputAllStream(Stream inputStream) { - - S4Stream stream = (S4Stream) inputStream; - stream.setParallelism(this.paralellismLevel); - stream.addStream(inputStream.getStreamId(), - getKeyFinder(), this, S4Stream.BROADCAST); - return this; - } - - @Override - public ProcessingItem connectInputKeyStream(Stream inputStream) { - - S4Stream stream = (S4Stream) inputStream; - stream.setParallelism(this.paralellismLevel); - stream.addStream(inputStream.getStreamId(), - getKeyFinder(), this, S4Stream.GROUP_BY_KEY); - - return this; - } - - @Override - public ProcessingItem connectInputShuffleStream(Stream inputStream) { - S4Stream stream = (S4Stream) inputStream; - stream.setParallelism(this.paralellismLevel); - stream.addStream(inputStream.getStreamId(), - getKeyFinder(), this, S4Stream.SHUFFLE); - - return this; - } - - // Methods from ProcessingElement - @Override - protected void onCreate() { - logger.debug("PE ID {}", getId()); - if (this.processor != null) { - this.processor = this.processor.newProcessor(this.processor); - this.processor.onCreate(Integer.parseInt(getId())); - } - } - - @Override - protected void onRemove() { - // do nothing - } - - @Override - public int getParallelism() { - return this.paralellismLevel; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Stream.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Stream.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Stream.java deleted file mode 100644 index 734462e..0000000 --- a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Stream.java +++ /dev/null @@ -1,184 +0,0 @@ -package org.apache.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.s4.base.KeyFinder; -import org.apache.samoa.core.ContentEvent; -import org.apache.samoa.topology.AbstractStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * S4 Platform specific stream. - * - * @author severien - * - */ -public class S4Stream extends AbstractStream { - - public static final int SHUFFLE = 0; - public static final int GROUP_BY_KEY = 1; - public static final int BROADCAST = 2; - - private static final Logger logger = LoggerFactory.getLogger(S4Stream.class); - - private S4DoTask app; - private int processingItemParalellism; - private int shuffleCounter; - - private static final String NAME = "STREAM-"; - private static int OBJ_COUNTER = 0; - - /* The stream list */ - public List<StreamType> streams; - - public S4Stream(S4DoTask app) { - super(); - this.app = app; - this.processingItemParalellism = 1; - this.shuffleCounter = 0; - this.streams = new ArrayList<StreamType>(); - this.setStreamId(NAME + OBJ_COUNTER); - OBJ_COUNTER++; - } - - public S4Stream(S4DoTask app, S4ProcessingItem pi) { - super(); - this.app = app; - this.processingItemParalellism = 1; - this.shuffleCounter = 0; - this.streams = new ArrayList<StreamType>(); - this.setStreamId(NAME + OBJ_COUNTER); - OBJ_COUNTER++; - - } - - /** - * - * @return - */ - public int getParallelism() { - return processingItemParalellism; - } - - public void setParallelism(int parallelism) { - this.processingItemParalellism = parallelism; - } - - public void addStream(String streamID, KeyFinder<S4Event> finder, - S4ProcessingItem pi, int type) { - String streamName = streamID + "_" + pi.getName(); - org.apache.s4.core.Stream<S4Event> stream = this.app.createStream( - streamName, pi); - stream.setName(streamName); - logger.debug("Stream name S4Stream {}", streamName); - if (finder != null) - stream.setKey(finder); - this.streams.add(new StreamType(stream, type)); - - } - - @Override - public void put(ContentEvent event) { - - for (int i = 0; i < streams.size(); i++) { - - switch (streams.get(i).getType()) { - case SHUFFLE: - S4Event s4event = new S4Event(event); - s4event.setStreamId(streams.get(i).getStream().getName()); - if (getParallelism() == 1) { - s4event.setKey("0"); - } else { - s4event.setKey(Integer.toString(shuffleCounter)); - } - streams.get(i).getStream().put(s4event); - shuffleCounter++; - if (shuffleCounter >= (getParallelism())) { - shuffleCounter = 0; - } - - break; - - case GROUP_BY_KEY: - S4Event s4event1 = new S4Event(event); - s4event1.setStreamId(streams.get(i).getStream().getName()); - HashCodeBuilder hb = new HashCodeBuilder(); - hb.append(event.getKey()); - String key = Integer.toString(hb.build() % getParallelism()); - s4event1.setKey(key); - streams.get(i).getStream().put(s4event1); - break; - - case BROADCAST: - for (int p = 0; p < this.getParallelism(); p++) { - S4Event s4event2 = new S4Event(event); - s4event2.setStreamId(streams.get(i).getStream().getName()); - s4event2.setKey(Integer.toString(p)); - streams.get(i).getStream().put(s4event2); - } - break; - - default: - break; - } - - } - - } - - /** - * Subclass for definig stream connection type - * - * @author severien - * - */ - class StreamType { - org.apache.s4.core.Stream<S4Event> stream; - int type; - - public StreamType(org.apache.s4.core.Stream<S4Event> s, int t) { - this.stream = s; - this.type = t; - } - - public org.apache.s4.core.Stream<S4Event> getStream() { - return stream; - } - - public void setStream(org.apache.s4.core.Stream<S4Event> stream) { - this.stream = stream; - } - - public int getType() { - return type; - } - - public void setType(int type) { - this.type = type; - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Submitter.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Submitter.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Submitter.java deleted file mode 100644 index 22807a6..0000000 --- a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Submitter.java +++ /dev/null @@ -1,144 +0,0 @@ -package org.apache.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.io.File; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.s4.core.util.AppConfig; -import org.apache.s4.core.util.ParsingUtils; -import org.apache.s4.deploy.DeploymentUtils; -import org.apache.samoa.tasks.Task; -import org.apache.samoa.topology.ISubmitter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import com.beust.jcommander.Parameters; - -public class S4Submitter implements ISubmitter { - - private static Logger logger = LoggerFactory.getLogger(S4Submitter.class); - - @Override - public void deployTask(Task task) { - // TODO: Get application FROM HTTP server - // TODO: Initializa a http server to serve the app package - - String appURIString = null; - // File app = new File(System.getProperty("user.dir") - // + "/src/site/dist/SAMOA-S4-0.1-dist.jar"); - - // TODO: String app url http://localhost:8000/SAMOA-S4-0.1-dist.jar - try { - URL appURL = new URL("http://localhost:8000/SAMOA-S4-0.1.jar"); - appURIString = appURL.toString(); - } catch (MalformedURLException e1) { - e1.printStackTrace(); - } - - // try { - // appURIString = app.toURI().toURL().toString(); - // } catch (MalformedURLException e) { - // e.printStackTrace(); - // } - if (task == null) { - logger.error("Can't execute since evaluation task is not set!"); - return; - } else { - logger.info("Deploying SAMOA S4 task [{}] from location [{}]. ", - task.getClass().getSimpleName(), appURIString); - } - - String[] args = { "-c=testCluster2", - "-appClass=" + S4DoTask.class.getName(), - "-appName=" + "samoaApp", - "-p=evalTask=" + task.getClass().getSimpleName(), - "-zk=localhost:2181", "-s4r=" + appURIString, "-emc=" + SamoaSerializerModule.class.getName() }; - // "-emc=" + S4MOAModule.class.getName(), - // "@" + - // Resources.getResource("s4moa.properties").getFile(), - - S4Config s4config = new S4Config(); - JCommander jc = new JCommander(s4config); - jc.parse(args); - - Map<String, String> namedParameters = new HashMap<String, String>(); - for (String parameter : s4config.namedParameters) { - String[] param = parameter.split("="); - namedParameters.put(param[0], param[1]); - } - - AppConfig config = new AppConfig.Builder() - .appClassName(s4config.appClass).appName(s4config.appName) - .appURI(s4config.appURI).namedParameters(namedParameters) - .build(); - - DeploymentUtils.initAppConfig(config, s4config.clusterName, true, - s4config.zkString); - - System.out.println("Suposedly deployed on S4"); - } - - public void initHTTPServer() { - - } - - @Parameters(separators = "=") - public static class S4Config { - - @Parameter(names = { "-c", "-cluster" }, description = "Cluster name", required = true) - String clusterName = null; - - @Parameter(names = "-appClass", description = "Main App class", required = false) - String appClass = null; - - @Parameter(names = "-appName", description = "Application name", required = false) - String appName = null; - - @Parameter(names = "-s4r", description = "Application URI", required = false) - String appURI = null; - - @Parameter(names = "-zk", description = "ZooKeeper connection string", required = false) - String zkString = null; - - @Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false) - List<String> extraModules = new ArrayList<String>(); - - @Parameter(names = { "-p", "-namedStringParameters" }, description = "Comma-separated list of inline configuration " - + "parameters, taking precedence over homonymous configuration parameters from configuration files. " - + "Syntax: '-p=name1=value1,name2=value2 '", required = false, converter = ParsingUtils.InlineConfigParameterConverter.class) - List<String> namedParameters = new ArrayList<String>(); - - } - - @Override - public void setLocal(boolean bool) { - // TODO S4 works the same for local and distributed environments - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Topology.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Topology.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Topology.java deleted file mode 100644 index 413cfda..0000000 --- a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Topology.java +++ /dev/null @@ -1,63 +0,0 @@ -package org.apache.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.samoa.topology.AbstractTopology; -import org.apache.samoa.topology.EntranceProcessingItem; - -public class S4Topology extends AbstractTopology { - - // CASEY: it seems evaluationTask is not used. - // Remove it for now - - // private String _evaluationTask; - - // S4Topology(String topoName, String evalTask) { - // super(topoName); - // } - // - // S4Topology(String topoName) { - // this(topoName, null); - // } - - // @Override - // public void setEvaluationTask(String evalTask) { - // _evaluationTask = evalTask; - // } - // - // @Override - // public String getEvaluationTask() { - // return _evaluationTask; - // } - - S4Topology(String topoName) { - super(topoName); - } - - protected EntranceProcessingItem getEntranceProcessingItem() { - if (this.getEntranceProcessingItems() == null) - return null; - if (this.getEntranceProcessingItems().size() < 1) - return null; - // TODO: support multiple entrance PIs - return (EntranceProcessingItem) this.getEntranceProcessingItems().toArray()[0]; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializer.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializer.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializer.java deleted file mode 100644 index 9f9f144..0000000 --- a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializer.java +++ /dev/null @@ -1,99 +0,0 @@ -package org.apache.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.nio.ByteBuffer; - -import org.apache.s4.base.SerializerDeserializer; -import org.apache.samoa.learners.classifiers.trees.AttributeContentEvent; -import org.apache.samoa.learners.classifiers.trees.ComputeContentEvent; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -public class SamoaSerializer implements SerializerDeserializer { - - private ThreadLocal<Kryo> kryoThreadLocal; - private ThreadLocal<Output> outputThreadLocal; - - private int initialBufferSize = 2048; - private int maxBufferSize = 256 * 1024; - - public void setMaxBufferSize(int maxBufferSize) { - this.maxBufferSize = maxBufferSize; - } - - /** - * - * @param classLoader - * classloader able to handle classes to serialize/deserialize. For instance, application-level events can - * only be handled by the application classloader. - */ - @Inject - public SamoaSerializer(@Assisted final ClassLoader classLoader) { - kryoThreadLocal = new ThreadLocal<Kryo>() { - - @Override - protected Kryo initialValue() { - Kryo kryo = new Kryo(); - kryo.setClassLoader(classLoader); - kryo.register(AttributeContentEvent.class, new AttributeContentEvent.AttributeCEFullPrecSerializer()); - kryo.register(ComputeContentEvent.class, new ComputeContentEvent.ComputeCEFullPrecSerializer()); - kryo.setRegistrationRequired(false); - return kryo; - } - }; - - outputThreadLocal = new ThreadLocal<Output>() { - @Override - protected Output initialValue() { - Output output = new Output(initialBufferSize, maxBufferSize); - return output; - } - }; - - } - - @Override - public Object deserialize(ByteBuffer rawMessage) { - Input input = new Input(rawMessage.array()); - try { - return kryoThreadLocal.get().readClassAndObject(input); - } finally { - input.close(); - } - } - - @SuppressWarnings("resource") - @Override - public ByteBuffer serialize(Object message) { - Output output = outputThreadLocal.get(); - try { - kryoThreadLocal.get().writeClassAndObject(output, message); - return ByteBuffer.wrap(output.toBytes()); - } finally { - output.clear(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializerModule.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializerModule.java b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializerModule.java deleted file mode 100644 index e530a09..0000000 --- a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializerModule.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.apache.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.s4.base.SerializerDeserializer; - -import com.google.inject.AbstractModule; - -public class SamoaSerializerModule extends AbstractModule { - - @Override - protected void configure() { - bind(SerializerDeserializer.class).to(SamoaSerializer.class); - - } - -}
