http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pom.xml ---------------------------------------------------------------------- diff --git a/demos/pom.xml b/demos/pom.xml deleted file mode 100644 index 99cfcd2..0000000 --- a/demos/pom.xml +++ /dev/null @@ -1,231 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.apex</groupId> - <artifactId>malhar</artifactId> - <version>3.7.0-SNAPSHOT</version> - </parent> - - <artifactId>malhar-demos</artifactId> - <packaging>pom</packaging> - <name>Apache Apex Malhar Demos</name> - - <properties> - <apex.apppackage.groupid>${project.groupId}</apex.apppackage.groupid> - <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> - <semver.plugin.skip>true</semver.plugin.skip> - <maven.deploy.skip>true</maven.deploy.skip> - </properties> - - <profiles> - <profile> - <id>demo-plugin-activation</id> - <activation> - <file> - <exists>${basedir}/src/main</exists> - </file> - </activation> - <build> - <plugins> - <plugin> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.9</version> - <configuration> - <downloadSources>true</downloadSources> - </configuration> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>2.3.2</version> - <configuration> - <encoding>UTF-8</encoding> - <source>1.7</source> - <target>1.7</target> - <debug>true</debug> - <optimize>false</optimize> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.8</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>prepare-package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>target/deps</outputDirectory> - <includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>app-package-assembly</id> - <phase>package</phase> - 
<goals> - <goal>single</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-${project.version}-apexapp</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/assemble/appPackage.xml</descriptor> - </descriptors> - <archiverConfig> - <defaultDirectoryMode>0755</defaultDirectoryMode> - </archiverConfig> - <archive> - <manifestEntries> - <Class-Path>${apex.apppackage.classpath}</Class-Path> - <DT-Engine-Version>${apex.core.version}</DT-Engine-Version> - <DT-App-Package-Group-Id>${apex.apppackage.groupid}</DT-App-Package-Group-Id> - <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> - <DT-App-Package-Version>${project.version}</DT-App-Package-Version> - <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> - <DT-App-Package-Description>${project.description}</DT-App-Package-Description> - </manifestEntries> - </archive> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <phase>package</phase> - <configuration> - <target> - <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" - tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>attach-artifacts</id> - <phase>package</phase> - <goals> - <goal>attach-artifact</goal> - </goals> - <configuration> - <artifacts> - <artifact> - <file>target/${project.artifactId}-${project.version}.apa</file> - <type>apa</type> - </artifact> - </artifacts> - <skipAttach>false</skipAttach> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - </profile> - 
<profile> - <id>all-modules</id> - <modules> - <module>distributedistinct</module> - <module>highlevelapi</module> - <module>sql</module> - </modules> - </profile> - </profiles> - - <modules> - <module>machinedata</module> - <module>pi</module> - <module>twitter</module> - <module>yahoofinance</module> - <module>frauddetect</module> - <module>mobile</module> - <module>wordcount</module> - <module>mrmonitor</module> - <module>mroperator</module> - <module>uniquecount</module> - <module>r</module> - <module>echoserver</module> - <module>iteration</module> - </modules> - - <dependencies> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-common</artifactId> - <version>${apex.core.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.10</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-library</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-library</artifactId> - <version>${project.version}</version> - <classifier>tests</classifier> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - -</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/pom.xml ---------------------------------------------------------------------- diff --git a/demos/r/pom.xml b/demos/r/pom.xml deleted file mode 100644 index d8b73f8..0000000 --- a/demos/r/pom.xml +++ /dev/null @@ -1,83 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <artifactId>r-demo</artifactId> - <packaging>jar</packaging> - - <name>Apache Apex Malhar R Demo</name> - <description>Apex demo applications for using R.</description> - - <parent> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-demos</artifactId> - <version>3.7.0-SNAPSHOT</version> - </parent> - - <properties> - <skipTests>true</skipTests> - </properties> - - <repositories> - <repository> - <id>datatorrent-3rd-party</id> - <name>Embedded repository for dependencies not available online</name> - <url>https://www.datatorrent.com/maven/content/repositories/thirdparty</url> - <snapshots> - <updatePolicy>daily</updatePolicy> - </snapshots> - <releases> - <updatePolicy>daily</updatePolicy> - </releases> - </repository> - </repositories> - - <dependencies> - <dependency> - <groupId>org.rosuda</groupId> - <artifactId>jri</artifactId> - <version>1.0</version> - </dependency> - <dependency> - <groupId>org.rosuda</groupId> - <artifactId>rengine</artifactId> - <version>1.0</version> - </dependency> - <dependency> - <groupId>org.rosuda</groupId> - <artifactId>jriengine</artifactId> - <version>1.0</version> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-contrib</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/demos/r/src/assemble/appPackage.xml b/demos/r/src/assemble/appPackage.xml deleted file mode 100644 index 4138cf2..0000000 --- 
a/demos/r/src/assemble/appPackage.xml +++ /dev/null @@ -1,59 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>appPackage</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${basedir}/target/</directory> - <outputDirectory>/app</outputDirectory> - <includes> - <include>${project.artifactId}-${project.version}.jar</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/target/deps</directory> - <outputDirectory>/lib</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/site/conf</directory> - <outputDirectory>/conf</outputDirectory> - <includes> - <include>*.xml</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/META-INF</directory> - <outputDirectory>/META-INF</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/app</directory> - 
<outputDirectory>/app</outputDirectory> - </fileSet> - </fileSets> - -</assembly> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulKey.java ---------------------------------------------------------------------- diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulKey.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulKey.java deleted file mode 100755 index b2bfd46..0000000 --- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulKey.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.r.oldfaithful; - -/** - * @since 2.1.0 - */ -public class FaithfulKey -{ - - private static final long serialVersionUID = 201403251620L; - - private double eruptionDuration; - private int waitingTime; - - public FaithfulKey() - { - } - - public double getEruptionDuration() - { - return eruptionDuration; - } - - public void setEruptionDuration(double eruptionDuration) - { - this.eruptionDuration = eruptionDuration; - } - - public int getWaitingTime() - { - return waitingTime; - } - - public void setWaitingTime(int waitingTime) - { - this.waitingTime = waitingTime; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java ---------------------------------------------------------------------- diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java deleted file mode 100755 index cf49848..0000000 --- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.r.oldfaithful; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.annotation.InputPortFieldAnnotation; -import com.datatorrent.contrib.r.RScript; - -/** - * @since 2.1.0 - */ -public class FaithfulRScript extends RScript -{ - - private transient List<FaithfulKey> readingsList = new ArrayList<FaithfulKey>(); - private int elapsedTime; - private static final Logger LOG = LoggerFactory.getLogger(FaithfulRScript.class); - - public FaithfulRScript() - { - super(); - } - - public FaithfulRScript(String rScriptFilePath, String rFunction, String returnVariable) - { - super(rScriptFilePath, rFunction, returnVariable); - } - - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<FaithfulKey> faithfulInput = new DefaultInputPort<FaithfulKey>() - { - @Override - public void process(FaithfulKey tuple) - { - // Create a map of ("String", values) to be passed to the process - // function in the RScipt operator's process() - readingsList.add(tuple); - - } - - }; - - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<Integer> inputElapsedTime = new DefaultInputPort<Integer>() - { - @Override - public void process(Integer eT) - { - elapsedTime = eT; - } - }; - - @Override - public void setup(Context.OperatorContext context) - { - super.setup(context); - } - - @Override - public void endWindow() - { - if (readingsList.size() == 0) { - return; - } - LOG.info("Input data size: readingsList - " + readingsList.size()); - - double[] eruptionDuration = new double[readingsList.size()]; - int[] waitingTime = new int[readingsList.size()]; - - for (int i = 0; i < readingsList.size(); i++) { - eruptionDuration[i] = readingsList.get(i).getEruptionDuration(); - waitingTime[i] = 
readingsList.get(i).getWaitingTime(); - } - LOG.info("Input data size: eruptionDuration - " + eruptionDuration.length); - LOG.info("Input data size: waitingTime - " + waitingTime.length); - - HashMap<String, Object> map = new HashMap<String, Object>(); - - map.put("ELAPSEDTIME", elapsedTime); - map.put("ERUPTIONS", eruptionDuration); - map.put("WAITING", waitingTime); - - super.process(map); - readingsList.clear(); - map.clear(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java ---------------------------------------------------------------------- diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java deleted file mode 100755 index c45cd50..0000000 --- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.r.oldfaithful; - -import java.util.Random; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; - -/** - * The InputGenerator operator is used to generate input for the 'Old Faithful Geyser" application. - * This application accepts readings for the waiting time and the subsequent eruption duration - * of the 'Old Faithful' and based on this data, tries to predict the eruption duration of the next - * eruption given the elapsed time since the last eruption. - * The training data is generated for an application window and consists of multiple - * waiting times and eruption duration values. - * For every application window, it generates only one 'elapsed time' input for which the - * prediction would be made. - * - * @since 2.1.0 - */ - -public class InputGenerator implements InputOperator -{ - - @SuppressWarnings("unused") - private static final Logger LOG = LoggerFactory.getLogger(InputGenerator.class); - private int blastCount = 1000; - private Random random = new Random(); - private static int emitCount = 0; - - public final transient DefaultOutputPort<FaithfulKey> outputPort = new DefaultOutputPort<FaithfulKey>(); - - public final transient DefaultOutputPort<Integer> elapsedTime = new DefaultOutputPort<Integer>(); - - public void setBlastCount(int blastCount) - { - this.blastCount = blastCount; - } - - @Override - public void beginWindow(long windowId) - { - } - - @Override - public void endWindow() - { - } - - @Override - public void setup(Context.OperatorContext context) - { - } - - @Override - public void teardown() - { - } - - private int nextRandomId(int min, int max) - { - int id; - do { - id = (int)Math.abs(Math.round(random.nextGaussian() * max)); - } - while (id >= max); - - if (id < min) { - id = min; - } - try { - // Slowdown input generation - if (emitCount++ % 97 == 0) { - 
Thread.sleep(1); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - return id; - } - - @Override - public void emitTuples() - { - boolean elapsedTimeSent = false; - - try { - for (int i = 0; i < blastCount; ++i) { - int waitingTime = nextRandomId(3600, 36000); - - double eruptionDuration = -2.15 + 0.05 * waitingTime; - emitTuple(eruptionDuration, waitingTime); - - if (!elapsedTimeSent) { - int eT = 0; - - if (i % 100 == 0) { - eT = 54 + waitingTime; - - emitElapsedTime(eT); - elapsedTimeSent = true; - } - } - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - private void emitTuple(double eruptionDuration, int waitingTime) - { - FaithfulKey faithfulkey = new FaithfulKey(); - - faithfulkey.setEruptionDuration(eruptionDuration); - faithfulkey.setWaitingTime(waitingTime); - - this.outputPort.emit(faithfulkey); - } - - private void emitElapsedTime(int eT) - { - this.elapsedTime.emit(eT); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java ---------------------------------------------------------------------- diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java deleted file mode 100755 index 0483767..0000000 --- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.r.oldfaithful; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.io.ConsoleOutputOperator; - -/** - * The application attempts to simulate 'Old Faithful Geyser" eruption. - * This application accepts readings for the waiting time and the subsequent eruption duration - * of the 'Old Faithful' and based on this data, tries to predict the eruption duration of the next - * eruption given the elapsed time since the last eruption. - * The training data is generated for an application window and consists of multiple - * waiting times and eruption duration values. - * For every application window, it generates only one 'elapsed time' input for which the - * prediction would be made. 
- * Model in R is in file ruptionModel.R located at - * demos/r/src/main/resources/com/datatorrent/demos/oldfaithful/ directory - * - * @since 2.1.0 - */ - -@ApplicationAnnotation(name = "OldFaithfulApplication") -public class OldFaithfulApplication implements StreamingApplication -{ - private final DAG.Locality locality = null; - - /** - * Create the DAG - */ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - - InputGenerator randomInputGenerator = dag.addOperator("rand", new InputGenerator()); - FaithfulRScript rScriptOp = dag.addOperator("rScriptOp", new FaithfulRScript("com/datatorrent/demos/r/oldfaithful/eruptionModel.R", "eruptionModel", "retVal")); - ConsoleOutputOperator consoles = dag.addOperator("consoles", new ConsoleOutputOperator()); - - Map<String, FaithfulRScript.REXP_TYPE> argTypeMap = new HashMap<String, FaithfulRScript.REXP_TYPE>(); - - argTypeMap.put("ELAPSEDTIME", FaithfulRScript.REXP_TYPE.REXP_INT); - argTypeMap.put("ERUPTIONS", FaithfulRScript.REXP_TYPE.REXP_ARRAY_DOUBLE); - argTypeMap.put("WAITING", FaithfulRScript.REXP_TYPE.REXP_ARRAY_INT); - - rScriptOp.setArgTypeMap(argTypeMap); - - dag.addStream("ingen_faithfulRscript", randomInputGenerator.outputPort, rScriptOp.faithfulInput).setLocality(locality); - dag.addStream("ingen_faithfulRscript_eT", randomInputGenerator.elapsedTime, rScriptOp.inputElapsedTime).setLocality(locality); - dag.addStream("faithfulRscript_console_s", rScriptOp.strOutput, consoles.input).setLocality(locality); - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/r/src/main/resources/META-INF/properties.xml b/demos/r/src/main/resources/META-INF/properties.xml deleted file mode 100755 index ec8b070..0000000 --- a/demos/r/src/main/resources/META-INF/properties.xml +++ /dev/null @@ -1,41 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" 
standalone="no"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> -<!--properties for R demo --> - <property> - <name>dt.application.OldFaithfulApplication.class</name> - <value>com.datatorrent.demos.r.oldfaithful.OldFaithfulApplication</value> - <description>An alias for OldFaithful application</description> - </property> - - <property> - <name>dt.application.OldFaithfulApplication.operator.*.attr.MEMORY_MB</name> - <value>1024</value> - </property> - -<!-- Need this to information for loading native libraries --> - <property> - <name>dt.attr.CONTAINER_JVM_OPTIONS</name> - <value>-Djava.library.path=/usr/local/lib/R/site-library/rJava/jri/</value> - </property> - -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/main/resources/com/datatorrent/demos/r/oldfaithful/eruptionModel.R ---------------------------------------------------------------------- diff --git a/demos/r/src/main/resources/com/datatorrent/demos/r/oldfaithful/eruptionModel.R b/demos/r/src/main/resources/com/datatorrent/demos/r/oldfaithful/eruptionModel.R deleted file mode 100755 index e46fa8d..0000000 --- a/demos/r/src/main/resources/com/datatorrent/demos/r/oldfaithful/eruptionModel.R +++ /dev/null @@ -1,60 
+0,0 @@ -#!/usr/bin/Rscript -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - - -# This script apply the simple linear regression model for the data set 'faithful', -# and estimates the next eruption duration given the waiting time since the last eruption. -# - - eruptionModel <- function() { - - datavar = data.frame(ERUPTIONS, WAITING) - - #attach data variable - attach(datavar) - - #create a linear model using lm(FORMULA, DATAVAR) - #predict the fall eruption duration (ERUPT) using the waiting time since the last eruption (WAITING) - eruption.lm <- lm(ERUPTIONS ~ WAITING, datavar) - - #display linear model - eruption.lm - - # Get the values of the intercept and unemployment so as to be able to predict the enrolment - interc<-eruption.lm$coeff[["(Intercept)"]] - eruptionDuration<-eruption.lm$coeff[["WAITING"]] - - # Calculate the enrollment based on the percentage being asked for, and the model that has been rated above. 
- nextEruptionDuration<-(interc+(eruptionDuration * ELAPSEDTIME)) - -retVal<-paste("nextEruptionDuration ", nextEruptionDuration, sep=": ") -#retVal<-c("interc : ",interc, ", eruptionDuration : ", eruptionDuration,", nextEruptionDuration : ", nextEruptionDuration) - -sort( sapply(mget(ls()),object.size) ) - -detach(datavar); - -# Clear all the data from R workspace -rm(datavar); -rm(ERUPTIONS); -rm(WAITING); - -return(retVal) -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java b/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java deleted file mode 100755 index 0bb1901..0000000 --- a/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.r.oldfaithful; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; - -public class OldFaithfulApplicationTest -{ - - private static final Logger LOG = LoggerFactory.getLogger(OldFaithfulApplicationTest.class); - - @Test - public void testSomeMethod() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - OldFaithfulApplication app = new OldFaithfulApplication(); - app.populateDAG(lma.getDAG(), new Configuration(false)); - - try { - LocalMode.Controller lc = lma.getController(); - lc.setHeartbeatMonitoringEnabled(false); - lc.run(5000); - } catch (Exception e) { - LOG.error("Exception: ", e); - Assert.fail("Unexpected exception."); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/test/resources/dt-site-oldfaithful.xml ---------------------------------------------------------------------- diff --git a/demos/r/src/test/resources/dt-site-oldfaithful.xml b/demos/r/src/test/resources/dt-site-oldfaithful.xml deleted file mode 100755 index ec8b070..0000000 --- a/demos/r/src/test/resources/dt-site-oldfaithful.xml +++ /dev/null @@ -1,41 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" standalone="no"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> -<!-- Properties for R demo --> - <property> - <name>dt.application.OldFaithfulApplication.class</name> - <value>com.datatorrent.demos.r.oldfaithful.OldFaithfulApplication</value> - <description>An alias for OldFaithful application</description> - </property> - - <property> - <name>dt.application.OldFaithfulApplication.operator.*.attr.MEMORY_MB</name> - <value>1024</value> - </property> - -<!-- Needed for loading native libraries --> - <property> - <name>dt.attr.CONTAINER_JVM_OPTIONS</name> - <value>-Djava.library.path=/usr/local/lib/R/site-library/rJava/jri/</value> - </property> - -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/demos/r/src/test/resources/log4j.properties b/demos/r/src/test/resources/log4j.properties deleted file mode 100755 index cf0d19e..0000000 --- a/demos/r/src/test/resources/log4j.properties +++ /dev/null @@ -1,43 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -log4j.rootLogger=DEBUG,CONSOLE - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.CONSOLE.threshold=${test.log.console.threshold} -test.log.console.threshold=DEBUG - -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.RFA.File=/tmp/app.log - -# to enable, add SYSLOG to rootLogger -log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender -log4j.appender.SYSLOG.syslogHost=127.0.0.1 -log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout -log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n -log4j.appender.SYSLOG.Facility=LOCAL1 - -log4j.logger.org=info -#log4j.logger.org.apache.commons.beanutils=warn -log4j.logger.com.datatorrent=debug -log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/pom.xml ---------------------------------------------------------------------- diff --git a/demos/sql/pom.xml b/demos/sql/pom.xml deleted file mode 100644 index 69ffa73..0000000 --- a/demos/sql/pom.xml +++ /dev/null @@ -1,102 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. 
See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <artifactId>sql-demo</artifactId> - <packaging>jar</packaging> - - <name>Apache Apex Malhar SQL API Demo</name> - <description>Apex demo applications that use SQL APIs to construct a DAG</description> - - <parent> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-demos</artifactId> - <version>3.7.0-SNAPSHOT</version> - </parent> - - <build> - <plugins> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>attach-artifacts</id> - <phase>package</phase> - <goals> - <goal>attach-artifact</goal> - </goals> - <configuration> - <artifacts> - <artifact> - <file>target/${project.artifactId}-${project.version}.apa</file> - <type>apa</type> - </artifact> - </artifacts> - <skipAttach>false</skipAttach> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - </build> - - <dependencies> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-engine</artifactId> - <version>${apex.core.version}</version> - 
<scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-sql</artifactId> - <version>${project.version}</version> - </dependency> - - <!-- For KafkaTest --> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-kafka</artifactId> - <version>${project.parent.version}</version> - <type>test-jar</type> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> - <version>0.9.0.0</version> - <classifier>test</classifier> - <scope>test</scope> - </dependency> - - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/demos/sql/src/assemble/appPackage.xml b/demos/sql/src/assemble/appPackage.xml deleted file mode 100644 index 4138cf2..0000000 --- a/demos/sql/src/assemble/appPackage.xml +++ /dev/null @@ -1,59 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>appPackage</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${basedir}/target/</directory> - <outputDirectory>/app</outputDirectory> - <includes> - <include>${project.artifactId}-${project.version}.jar</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/target/deps</directory> - <outputDirectory>/lib</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/site/conf</directory> - <outputDirectory>/conf</outputDirectory> - <includes> - <include>*.xml</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/META-INF</directory> - <outputDirectory>/META-INF</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/app</directory> - <outputDirectory>/app</outputDirectory> - </fileSet> - </fileSets> - -</assembly> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java deleted file mode 100644 index 80b997d..0000000 --- a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.sql.sample; - -import java.util.Date; -import java.util.Map; - -import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator; -import org.apache.apex.malhar.sql.SQLExecEnvironment; -import org.apache.apex.malhar.sql.table.CSVMessageFormat; -import org.apache.apex.malhar.sql.table.FileEndpoint; -import org.apache.apex.malhar.sql.table.StreamEndpoint; -import org.apache.hadoop.conf.Configuration; - -import com.google.common.collect.ImmutableMap; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.contrib.parser.CsvParser; - - -/** - * Demo application that combines explicitly wired operators with SQL: a Kafka input operator - * feeds a CSV parser, the parser output port is registered as the SQL input table, a file - * endpoint is registered as the SQL output table, and the configured SQL statement is then - * added to the DAG. - * - * @since 3.6.0 - */ -@ApplicationAnnotation(name = "FusionStyleSQLApplication") -public class FusionStyleSQLApplication implements StreamingApplication -{ - /** - * Builds the DAG. Reads the properties sqlSchemaInputName, sqlSchemaOutputName, folderPath, - * fileName, sqlSchemaOutputDef and sql from the given configuration. - * - * @param dag the DAG to which the operators, the stream and the SQL statement are added - * @param conf configuration supplying the table names, the output location and the SQL text - */ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - SQLExecEnvironment env = SQLExecEnvironment.getEnvironment(); - // The concat function is the static method declared on PureStyleSQLApplication. - env.registerFunction("APEXCONCAT", PureStyleSQLApplication.class, "apex_concat_str"); - - // Maps each field name to its Java type; handed to the StreamEndpoint below. - Map<String, Class> fieldMapping = ImmutableMap.<String, Class>of( - 
"RowTime", Date.class, - "id", Integer.class, - "Product", String.class, - "units", Integer.class); - - // Add Kafka Input - KafkaSinglePortInputOperator kafkaInput = dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class); - kafkaInput.setInitialOffset("EARLIEST"); - - // Add CSVParser - CsvParser csvParser = dag.addOperator("CSVParser", CsvParser.class); - dag.addStream("KafkaToCSV", kafkaInput.outputPort, csvParser.in); - - // Register the CSV parser output as the input table of the SQL statement - env.registerTable(conf.get("sqlSchemaInputName"), new StreamEndpoint(csvParser.out, fieldMapping)); - - // Register a FileEndpoint as the output table of the SQL statement. - env.registerTable(conf.get("sqlSchemaOutputName"), new FileEndpoint(conf.get("folderPath"), - conf.get("fileName"), new CSVMessageFormat(conf.get("sqlSchemaOutputDef")))); - - // Add the SQL statement to the DAG - env.executeSQL(dag, conf.get("sql")); - } - - /** - * Operator that forwards every tuple received on {@code input} to {@code output} unchanged. - * The ports are left as raw types so that existing code wiring them keeps compiling. - */ - public static class PassThroughOperator extends BaseOperator - { - public final transient DefaultOutputPort output = new DefaultOutputPort(); - public final transient DefaultInputPort input = new DefaultInputPort() - { - @Override - public void process(Object o) - { - // Emit the received tuple; the previous code emitted the output port object itself. - output.emit(o); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java deleted file mode 100644 index 79295f9..0000000 --- a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.sql.sample; - -import org.apache.apex.malhar.sql.SQLExecEnvironment; -import org.apache.apex.malhar.sql.table.CSVMessageFormat; -import org.apache.apex.malhar.sql.table.FileEndpoint; -import org.apache.apex.malhar.sql.table.KafkaEndpoint; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; - -/** - * Demo application whose DAG is built entirely through the SQL environment: a Kafka endpoint is - * registered as the input table, a file endpoint as the output table (both with CSV message - * formats), a concat function is registered, and the configured SQL statement is executed. - * - * @since 3.6.0 - */ -@ApplicationAnnotation(name = "PureStyleSQLApplication") -public class PureStyleSQLApplication implements StreamingApplication -{ - /** - * Builds the DAG. Reads the properties schemaInName, schemaInDef, broker, topic, schemaOutName, - * schemaOutDef, outputFolder, destFileName and sql from the given configuration. - * - * @param dag the DAG on which the SQL statement is executed - * @param conf configuration supplying the table definitions and the SQL text - */ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - // Source definition - String schemaInName = conf.get("schemaInName"); - String schemaInDef = conf.get("schemaInDef"); - String broker = conf.get("broker"); - String sourceTopic = conf.get("topic"); - - // Destination definition - String schemaOutName = conf.get("schemaOutName"); - String schemaOutDef = conf.get("schemaOutDef"); - String outputFolder = conf.get("outputFolder"); - String outFilename = conf.get("destFileName"); - - // SQL statement - String sql = conf.get("sql"); - - SQLExecEnvironment.getEnvironment() - .registerTable(schemaInName, new KafkaEndpoint(broker, sourceTopic, 
- new CSVMessageFormat(schemaInDef))) - .registerTable(schemaOutName, new FileEndpoint(outputFolder, outFilename, - new CSVMessageFormat(schemaOutDef))) - .registerFunction("APEXCONCAT", this.getClass(), "apex_concat_str") - .executeSQL(dag, sql); - } - - /** - * Concatenates two strings. Registered above as the SQL function APEXCONCAT by its method name, - * so the name must stay in sync with the registerFunction call. - * - * @param s1 left operand - * @param s2 right operand - * @return {@code s1 + s2}; a null operand contributes the text "null" - */ - public static String apex_concat_str(String s1, String s2) - { - return s1 + s2; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java deleted file mode 100644 index da4f563..0000000 --- a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.sql.sample; - -import org.apache.apex.malhar.sql.SQLExecEnvironment; -import org.apache.apex.malhar.sql.table.CSVMessageFormat; -import org.apache.apex.malhar.sql.table.FileEndpoint; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; - -/** - * Demo application that registers a file endpoint with a CSV message format as the input table - * through the SQL environment API and then executes the configured SQL statement on the DAG. - * - * @since 3.6.0 - */ -@ApplicationAnnotation(name = "SQLApplicationWithAPI") -public class SQLApplicationWithAPI implements StreamingApplication -{ - /** - * Builds the DAG. Reads the properties csvSchemaInName, csvSchemaIn, sourceFile and sql from - * the given configuration. - * - * @param dag the DAG on which the SQL statement is executed - * @param conf configuration supplying the input table definition and the SQL text - */ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - // Source definition - String schemaInName = conf.get("csvSchemaInName"); - String schemaIn = conf.get("csvSchemaIn"); - String sourceFile = conf.get("sourceFile"); - - SQLExecEnvironment.getEnvironment() - .registerTable(schemaInName, new FileEndpoint(sourceFile, new CSVMessageFormat(schemaIn))) - .executeSQL(dag, conf.get("sql")); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java deleted file mode 100644 index 4c90a82..0000000 --- a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.sql.sample; - -import java.io.File; -import java.io.IOException; - -import org.apache.apex.malhar.sql.SQLExecEnvironment; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; - -/** - * Demo application that loads a model definition from a file, hands it to the SQL environment - * and then executes the configured SQL statement on the DAG. - * - * @since 3.6.0 - */ -@ApplicationAnnotation(name = "SQLApplicationWithModelFile") -public class SQLApplicationWithModelFile implements StreamingApplication -{ - /** - * Builds the DAG. Reads the properties modelFile and sql from the given configuration. - * - * @param dag the DAG on which the SQL statement is executed - * @param conf configuration supplying the model file path and the SQL text - * @throws IllegalArgumentException if the modelFile property is not set - * @throws RuntimeException if the model file cannot be read; the IOException is kept as cause - */ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - String modelFile = conf.get("modelFile"); - if (modelFile == null) { - // Fail with a clear message instead of the bare NullPointerException from new File(null). - throw new IllegalArgumentException("Required property 'modelFile' is not set"); - } - - String model; - try { - // Read with an explicit charset so the result does not depend on the platform default. - model = FileUtils.readFileToString(new File(modelFile), "UTF-8"); - } catch (IOException e) { - throw new RuntimeException("Unable to read model file: " + modelFile, e); - } - - SQLExecEnvironment.getEnvironment() - .withModel(model) - .executeSQL(dag, conf.get("sql")); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml b/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml deleted file mode 100644 index 77852e7..0000000 --- a/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml +++ /dev/null @@ -1,65 +0,0 @@ -<?xml version="1.0"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - 
or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> - <!-- Kafka Operator Properties --> - <property> - <name>dt.operator.KafkaInput.prop.topics</name> - <value>dataTopic</value> - </property> - <property> - <name>dt.operator.KafkaInput.prop.clusters</name> - <value>localhost:9092</value> <!-- broker (NOT zookeeper) address --> - </property> - - <!-- CSV Parser Properties --> - <property> - <name>dt.operator.CSVParser.prop.schema</name> - <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value> - </property> - - <!-- SQL Properties --> - <property> - <name>sqlSchemaInputName</name> - <value>FROMCSV</value> - </property> - <property> - <name>sqlSchemaOutputName</name> - <value>TOFILE</value> - </property> - <property> - <name>folderPath</name> - <value>/tmp/output</value> - </property> - <property> - <name>fileName</name> - <value>output.txt</value> - </property> - <property> - <name>sqlSchemaOutputDef</name> - <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime1","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss 
Z"}},{"name":"RowTime2","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"Product","type":"String"}]}</value> - </property> - <property> - <name>sql</name> - <value>INSERT INTO TOFILE SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) FROM FROMCSV WHERE ID > 3 AND PRODUCT LIKE 'paint%'</value> - </property> -</configuration> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml b/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml deleted file mode 100644 index 0d25aa6..0000000 --- a/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml +++ /dev/null @@ -1,65 +0,0 @@ -<?xml version="1.0"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<configuration> - <!-- Input Definition --> - <property> - <name>schemaInName</name> - <value>ORDERS</value> - </property> - <property> - <name>schemaInDef</name> - <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value> - </property> - <property> - <name>broker</name> - <value>localhost:9090</value> - </property> - <property> - <name>topic</name> - <value>inputTopic</value> - </property> - - <!-- Output Definition --> - <property> - <name>schemaOutName</name> - <value>SALES</value> - </property> - <property> - <name>schemaOutDef</name> - <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime1","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"RowTime2","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"Product","type":"String"}]}</value> - </property> - <property> - <name>outputFolder</name> - <value>/tmp/output</value> - </property> - <property> - <name>destFileName</name> - <value>out.file</value> - </property> - - <!-- Execution SQL --> - <property> - <name>sql</name> - <value>INSERT INTO SALES SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) FROM ORDERS WHERE ID > 3 AND PRODUCT LIKE 'paint%'</value> - </property> -</configuration> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml b/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml deleted file mode 100644 index 9ac49d4..0000000 --- a/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml +++ /dev/null @@ -1,43 +0,0 @@ -<?xml 
version="1.0"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> - <!-- Input Definition --> - <property> - <name>csvSchemaInName</name> - <value>ORDERS</value> - </property> - <property> - <name>csvSchemaIn</name> - <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value> - </property> - <property> - <name>sourceFile</name> - <value>src/test/resources/input.csv</value> - </property> - - <!-- Execution SQL --> - <property> - <name>sql</name> - <value>SELECT STREAM ROWTIME, PRODUCT FROM ORDERS</value> - </property> -</configuration> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml b/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml deleted file mode 100644 index ab026c2..0000000 --- 
a/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml +++ /dev/null @@ -1,32 +0,0 @@ -<?xml version="1.0"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> - <property> - <name>modelFile</name> - <value>src/main/resources/model/model_file_csv.json</value> - </property> - <property> - <name>sql</name> - <value>SELECT STREAM ROWTIME, PRODUCT FROM ORDERS</value> - </property> -</configuration> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/resources/META-INF/properties.xml b/demos/sql/src/main/resources/META-INF/properties.xml deleted file mode 100644 index 6080bf6..0000000 --- a/demos/sql/src/main/resources/META-INF/properties.xml +++ /dev/null @@ -1,41 +0,0 @@ -<?xml version="1.0"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. 
The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> - <!-- Memory settings for all demos --> - <property> - <name>dt.attr.MASTER_MEMORY_MB</name> - <value>512</value> - </property> - <property> - <name>dt.application.*.operator.*.attr.MEMORY_MB</name> - <value>256</value> - </property> - <property> - <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name> - <value>-Xmx128M</value> - </property> - <property> - <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name> - <value>128</value> - </property> -</configuration> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/resources/model/model_file_csv.json ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/resources/model/model_file_csv.json b/demos/sql/src/main/resources/model/model_file_csv.json deleted file mode 100644 index beba18d..0000000 --- a/demos/sql/src/main/resources/model/model_file_csv.json +++ /dev/null @@ -1,27 +0,0 @@ -{ - "version": "1.0", - "defaultSchema": "APEX", - "schemas": [{ - "name": "APEX", - "tables": [ - { - "name": "ORDERS", - "type": "custom", - "factory": "org.apache.apex.malhar.sql.schema.ApexSQLTableFactory", - "stream": { - "stream": true - }, - "operand": { - "endpoint": "file", - "messageFormat": "csv", - "endpointOperands": { - "directory": "src/test/resources/input.csv" - }, - "messageFormatOperands": { - "schema": 
"{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss\"}},{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"units\",\"type\":\"Integer\"}]}" - } - } - } - ] - }] -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java deleted file mode 100644 index 7208701..0000000 --- a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.sql.sample; - -import java.io.File; -import java.util.Arrays; -import java.util.List; -import java.util.TimeZone; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -import org.apache.apex.malhar.kafka.EmbeddedKafka; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; - -public class FusionStyleSQLApplicationTest -{ - private final String testTopicData = "dataTopic"; - private final String testTopicResult = "resultTopic"; - - private TimeZone defaultTZ; - private EmbeddedKafka kafka; - - private static String outputFolder = "target/output/"; - - @Rule - public TestName testName = new TestName(); - - @Before - public void setUp() throws Exception - { - defaultTZ = TimeZone.getDefault(); - TimeZone.setDefault(TimeZone.getTimeZone("GMT")); - - kafka = new EmbeddedKafka(); - kafka.start(); - kafka.createTopic(testTopicData); - kafka.createTopic(testTopicResult); - - outputFolder += testName.getMethodName() + "/"; - } - - @After - public void tearDown() throws Exception - { - kafka.stop(); - - TimeZone.setDefault(defaultTZ); - } - - @Test - public void test() throws Exception - { - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-FusionStyleSQLApplication.xml")); - - conf.set("dt.operator.KafkaInput.prop.topics", testTopicData); - conf.set("dt.operator.KafkaInput.prop.clusters", kafka.getBroker()); - conf.set("folderPath", outputFolder); - conf.set("fileName", "out.tmp"); - - FusionStyleSQLApplication app = new FusionStyleSQLApplication(); - - lma.prepareDAG(app, conf); - - LocalMode.Controller lc = lma.getController(); - - lc.runAsync(); - 
kafka.publish(testTopicData, Arrays.asList("15/02/2016 10:15:00 +0000,1,paint1,11", - "15/02/2016 10:16:00 +0000,2,paint2,12", - "15/02/2016 10:17:00 +0000,3,paint3,13", "15/02/2016 10:18:00 +0000,4,paint4,14", - "15/02/2016 10:19:00 +0000,5,paint5,15", "15/02/2016 10:10:00 +0000,6,abcde6,16")); - - Assert.assertTrue(PureStyleSQLApplicationTest.waitTillFileIsPopulated(outputFolder, 40000)); - lc.shutdown(); - - File file = new File(outputFolder); - File file1 = new File(outputFolder + file.list()[0]); - List<String> strings = FileUtils.readLines(file1); - - String[] actualLines = strings.toArray(new String[strings.size()]); - String[] expectedLines = new String[] { - "15/02/2016 10:18:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT4", - "", - "15/02/2016 10:19:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT5", - ""}; - Assert.assertEquals(expectedLines.length, actualLines.length); - for (int i = 0; i < actualLines.length; i++) { - Assert.assertEquals(expectedLines[i], actualLines[i]); - } - } catch (Exception e) { - e.printStackTrace(); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java deleted file mode 100644 index f298059..0000000 --- a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.sql.sample; - -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.TimeZone; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -import org.apache.apex.malhar.kafka.EmbeddedKafka; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import com.google.common.collect.Lists; - -import com.datatorrent.api.LocalMode; - - -public class PureStyleSQLApplicationTest -{ - private final String testTopicData = "dataTopic"; - private final String testTopicResult = "resultTopic"; - - private TimeZone defaultTZ; - private EmbeddedKafka kafka; - private static String outputFolder = "target/output/"; - - @Rule - public TestName testName = new TestName(); - - @Before - public void setUp() throws Exception - { - defaultTZ = TimeZone.getDefault(); - TimeZone.setDefault(TimeZone.getTimeZone("GMT")); - - kafka = new EmbeddedKafka(); - kafka.start(); - kafka.createTopic(testTopicData); - kafka.createTopic(testTopicResult); - - outputFolder += testName.getMethodName() + "/"; - } - - @After - public void tearDown() throws Exception - { - kafka.stop(); - TimeZone.setDefault(defaultTZ); - } 
- - @Test - public void test() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-PureStyleSQLApplication.xml")); - - conf.set("broker", kafka.getBroker()); - conf.set("topic", testTopicData); - conf.set("outputFolder", outputFolder); - conf.set("destFileName", "out.tmp"); - - PureStyleSQLApplication app = new PureStyleSQLApplication(); - - lma.prepareDAG(app, conf); - - LocalMode.Controller lc = lma.getController(); - - lc.runAsync(); - kafka.publish(testTopicData, Arrays.asList( - "15/02/2016 10:15:00 +0000,1,paint1,11", - "15/02/2016 10:16:00 +0000,2,paint2,12", - "15/02/2016 10:17:00 +0000,3,paint3,13", - "15/02/2016 10:18:00 +0000,4,paint4,14", - "15/02/2016 10:19:00 +0000,5,paint5,15", - "15/02/2016 10:10:00 +0000,6,abcde6,16")); - - Assert.assertTrue(waitTillFileIsPopulated(outputFolder, 40000)); - lc.shutdown(); - - File file = new File(outputFolder); - File file1 = new File(outputFolder + file.list()[0]); - List<String> strings = FileUtils.readLines(file1); - - String[] actualLines = strings.toArray(new String[strings.size()]); - - String[] expectedLines = new String[]{ - "15/02/2016 10:18:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT4", - "", - "15/02/2016 10:19:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT5", - ""}; - - Assert.assertEquals(expectedLines.length, actualLines.length); - for (int i = 0;i < expectedLines.length; i++) { - Assert.assertEquals(expectedLines[i], actualLines[i]); - } - } - - public static boolean waitTillFileIsPopulated(String outputFolder, int timeout) throws IOException, InterruptedException - { - boolean result; - long now = System.currentTimeMillis(); - Path outDir = new Path("file://" + new File(outputFolder).getAbsolutePath()); - try (FileSystem fs = FileSystem.newInstance(outDir.toUri(), new 
Configuration())) { - List<String> strings = Lists.newArrayList(); - while (System.currentTimeMillis() - now < timeout) { - if (fs.exists(outDir)) { - File file = new File(outputFolder); - if (file.list().length > 0) { - File file1 = new File(outputFolder + file.list()[0]); - strings = FileUtils.readLines(file1); - if (strings.size() != 0) { - break; - } - } - } - - Thread.sleep(500); - } - - result = fs.exists(outDir) && (strings.size() != 0); - } - - return result; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java ---------------------------------------------------------------------- diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java deleted file mode 100644 index 6b1a404..0000000 --- a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.sql.sample; - -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.util.Arrays; -import java.util.Collection; -import java.util.TimeZone; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; - -import com.google.common.base.Predicates; -import com.google.common.collect.Collections2; - -import com.datatorrent.api.LocalMode; - - -public class SQLApplicationWithAPITest -{ - private TimeZone defaultTZ; - - @Before - public void setUp() throws Exception - { - defaultTZ = TimeZone.getDefault(); - TimeZone.setDefault(TimeZone.getTimeZone("GMT")); - } - - @After - public void tearDown() throws Exception - { - TimeZone.setDefault(defaultTZ); - } - - @Test - public void test() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SQLApplicationWithAPI.xml")); - - SQLApplicationWithAPI app = new SQLApplicationWithAPI(); - - lma.prepareDAG(app, conf); - - LocalMode.Controller lc = lma.getController(); - - PrintStream originalSysout = System.out; - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - System.setOut(new PrintStream(baos)); - - lc.runAsync(); - SQLApplicationWithModelFileTest.waitTillStdoutIsPopulated(baos, 30000); - lc.shutdown(); - - System.setOut(originalSysout); - - String[] sout = baos.toString().split(System.lineSeparator()); - Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:")); - - String[] actualLines = filter.toArray(new String[filter.size()]); - Assert.assertTrue(actualLines[0].contains("RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1")); - Assert.assertTrue(actualLines[1].contains("RowTime=Mon Feb 
15 10:16:00 GMT 2016, Product=paint2")); - Assert.assertTrue(actualLines[2].contains("RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3")); - Assert.assertTrue(actualLines[3].contains("RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4")); - Assert.assertTrue(actualLines[4].contains("RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5")); - Assert.assertTrue(actualLines[5].contains("RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6")); - } -}
