http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml new file mode 100644 index 0000000..30ce061 --- /dev/null +++ b/examples/pom.xml @@ -0,0 +1,231 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <artifactId>malhar-examples</artifactId> + <packaging>pom</packaging> + <name>Apache Apex Malhar Examples</name> + + <properties> + <apex.apppackage.groupid>${project.groupId}</apex.apppackage.groupid> + <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> + <semver.plugin.skip>true</semver.plugin.skip> + <maven.deploy.skip>true</maven.deploy.skip> + </properties> + + <profiles> + <profile> + <id>example-plugin-activation</id> + <activation> + <file> + <exists>${basedir}/src/main</exists> + </file> + </activation> + <build> + <plugins> + <plugin> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.9</version> + <configuration> + <downloadSources>true</downloadSources> + </configuration> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + <configuration> + <encoding>UTF-8</encoding> + <source>1.7</source> + <target>1.7</target> + <debug>true</debug> + <optimize>false</optimize> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>prepare-package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>target/deps</outputDirectory> + <includeScope>runtime</includeScope> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>app-package-assembly</id> + 
<phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <finalName>${project.artifactId}-${project.version}-apexapp</finalName> + <appendAssemblyId>false</appendAssemblyId> + <descriptors> + <descriptor>src/assemble/appPackage.xml</descriptor> + </descriptors> + <archiverConfig> + <defaultDirectoryMode>0755</defaultDirectoryMode> + </archiverConfig> + <archive> + <manifestEntries> + <Class-Path>${apex.apppackage.classpath}</Class-Path> + <DT-Engine-Version>${apex.core.version}</DT-Engine-Version> + <DT-App-Package-Group-Id>${apex.apppackage.groupid}</DT-App-Package-Group-Id> + <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> + <DT-App-Package-Version>${project.version}</DT-App-Package-Version> + <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> + <DT-App-Package-Description>${project.description}</DT-App-Package-Description> + </manifestEntries> + </archive> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.7</version> + <executions> + <execution> + <phase>package</phase> + <configuration> + <target> + <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" + tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> + </target> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.9.1</version> + <executions> + <execution> + <id>attach-artifacts</id> + <phase>package</phase> + <goals> + <goal>attach-artifact</goal> + </goals> + <configuration> + <artifacts> + <artifact> + <file>target/${project.artifactId}-${project.version}.apa</file> + <type>apa</type> + </artifact> + </artifacts> + <skipAttach>false</skipAttach> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + 
</build> + </profile> + <profile> + <id>all-modules</id> + <modules> + <module>distributedistinct</module> + <module>highlevelapi</module> + <module>sql</module> + </modules> + </profile> + </profiles> + + <modules> + <module>machinedata</module> + <module>pi</module> + <module>twitter</module> + <module>yahoofinance</module> + <module>frauddetect</module> + <module>mobile</module> + <module>wordcount</module> + <module>mrmonitor</module> + <module>mroperator</module> + <module>uniquecount</module> + <module>r</module> + <module>echoserver</module> + <module>iteration</module> + </modules> + + <dependencies> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-common</artifactId> + <version>${apex.core.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.10</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-library</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-library</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + +</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/pom.xml ---------------------------------------------------------------------- diff --git a/examples/r/pom.xml b/examples/r/pom.xml new file mode 100644 index 0000000..46b7d47 --- /dev/null +++ b/examples/r/pom.xml @@ -0,0 +1,83 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>malhar-examples-r</artifactId> + <packaging>jar</packaging> + + <name>Apache Apex Malhar R Example</name> + <description>Apex example applications for using R.</description> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <properties> + <skipTests>true</skipTests> + </properties> + + <repositories> + <repository> + <id>datatorrent-3rd-party</id> + <name>Embedded repository for dependencies not available online</name> + <url>https://www.datatorrent.com/maven/content/repositories/thirdparty</url> + <snapshots> + <updatePolicy>daily</updatePolicy> + </snapshots> + <releases> + <updatePolicy>daily</updatePolicy> + </releases> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>org.rosuda</groupId> + <artifactId>jri</artifactId> + <version>1.0</version> + </dependency> + <dependency> + <groupId>org.rosuda</groupId> + <artifactId>rengine</artifactId> + <version>1.0</version> + </dependency> + <dependency> + <groupId>org.rosuda</groupId> + <artifactId>jriengine</artifactId> + <version>1.0</version> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-contrib</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/r/src/assemble/appPackage.xml b/examples/r/src/assemble/appPackage.xml new file mode 100644 index 
0000000..4138cf2 --- /dev/null +++ b/examples/r/src/assemble/appPackage.xml @@ -0,0 +1,59 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + 
<directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/FaithfulKey.java ---------------------------------------------------------------------- diff --git a/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/FaithfulKey.java b/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/FaithfulKey.java new file mode 100755 index 0000000..8c08940 --- /dev/null +++ b/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/FaithfulKey.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.r.oldfaithful; + +/** + * Tuple holding one Old Faithful reading: an eruption duration together with a waiting time. + * It is a plain mutable bean with a no-argument constructor and a getter/setter pair per field. + * NOTE(review): serialVersionUID is declared although the class does not implement + * java.io.Serializable - confirm whether the field is still needed. + * + * @since 2.1.0 + */ +public class FaithfulKey +{ + + private static final long serialVersionUID = 201403251620L; + + private double eruptionDuration; + private int waitingTime; + + public FaithfulKey() + { + } + + public double getEruptionDuration() + { + return eruptionDuration; + } + + public void setEruptionDuration(double eruptionDuration) + { + this.eruptionDuration = eruptionDuration; + } + + public int getWaitingTime() + { + return waitingTime; + } + + public void setWaitingTime(int waitingTime) + { + this.waitingTime = waitingTime; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/FaithfulRScript.java ---------------------------------------------------------------------- diff --git a/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/FaithfulRScript.java b/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/FaithfulRScript.java new file mode 100755 index 0000000..4b61d42 --- /dev/null +++ b/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/FaithfulRScript.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.r.oldfaithful; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.contrib.r.RScript; + +/** + * RScript operator for the Old Faithful example. It buffers the {@link FaithfulKey} readings it + * receives, remembers the latest elapsed-time value, and at end of window hands them to + * RScript's process() as the ELAPSEDTIME, ERUPTIONS and WAITING arguments. + * + * @since 2.1.0 + */ +public class FaithfulRScript extends RScript +{ + + private transient List<FaithfulKey> readingsList = new ArrayList<FaithfulKey>(); + private int elapsedTime; + private static final Logger LOG = LoggerFactory.getLogger(FaithfulRScript.class); + + public FaithfulRScript() + { + super(); + } + + public FaithfulRScript(String rScriptFilePath, String rFunction, String returnVariable) + { + super(rScriptFilePath, rFunction, returnVariable); + } + + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<FaithfulKey> faithfulInput = new DefaultInputPort<FaithfulKey>() + { + @Override + public void process(FaithfulKey tuple) + { + // Only buffer the reading here; the argument map passed to the + // RScript operator's process() is built in endWindow() + readingsList.add(tuple); + + } + + }; + + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<Integer> inputElapsedTime = new DefaultInputPort<Integer>() + { + @Override + public void process(Integer eT) + { + elapsedTime = eT; + } + }; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + } + + /** + * Passes the buffered readings, plus the current elapsed time, to RScript's process() + * and then clears the buffer. Does nothing when no readings were received. + */ + @Override + public void endWindow() + { + if (readingsList.isEmpty()) { + return; + } + LOG.info("Input data size: readingsList - {}", readingsList.size()); + + double[] eruptionDuration = new double[readingsList.size()]; + int[] waitingTime = new int[readingsList.size()]; + + for (int i = 0; i < readingsList.size(); 
i++) { + eruptionDuration[i] = readingsList.get(i).getEruptionDuration(); + waitingTime[i] = readingsList.get(i).getWaitingTime(); + } + LOG.info("Input data size: eruptionDuration - {}", eruptionDuration.length); + LOG.info("Input data size: waitingTime - {}", waitingTime.length); + + HashMap<String, Object> map = new HashMap<String, Object>(); + + map.put("ELAPSEDTIME", elapsedTime); + map.put("ERUPTIONS", eruptionDuration); + map.put("WAITING", waitingTime); + + super.process(map); + readingsList.clear(); + map.clear(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/InputGenerator.java ---------------------------------------------------------------------- diff --git a/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/InputGenerator.java b/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/InputGenerator.java new file mode 100755 index 0000000..fb18726 --- /dev/null +++ b/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/InputGenerator.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.r.oldfaithful; + +import java.util.Random; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; + +/** + * The InputGenerator operator is used to generate input for the 'Old Faithful Geyser' application. + * This application accepts readings for the waiting time and the subsequent eruption duration + * of the 'Old Faithful' and based on this data, tries to predict the eruption duration of the next + * eruption given the elapsed time since the last eruption. + * The training data is generated for an application window and consists of multiple + * waiting times and eruption duration values. + * For every application window, it generates only one 'elapsed time' input for which the + * prediction would be made. + * + * @since 2.1.0 + */ + +public class InputGenerator implements InputOperator +{ + + private static final Logger LOG = LoggerFactory.getLogger(InputGenerator.class); + private int blastCount = 1000; + private Random random = new Random(); + private static int emitCount = 0; + + public final transient DefaultOutputPort<FaithfulKey> outputPort = new DefaultOutputPort<FaithfulKey>(); + + public final transient DefaultOutputPort<Integer> elapsedTime = new DefaultOutputPort<Integer>(); + + public void setBlastCount(int blastCount) + { + this.blastCount = blastCount; + } + + @Override + public void beginWindow(long windowId) + { + } + + @Override + public void endWindow() + { + } + + @Override + public void setup(Context.OperatorContext context) + { + } + + @Override + public void teardown() + { + } + + /** + * Draws the rounded absolute value of a Gaussian sample scaled by max, retrying until the + * result is below max, and raises it to min when it is smaller. Every 97th call also + * sleeps for 1 ms to slow down input generation. + */ + private int nextRandomId(int min, int max) + { + int id; + do { + id = (int)Math.abs(Math.round(random.nextGaussian() * max)); + } + while (id >= max); + + if (id < min) { + id = min; + } + try { + // Slowdown input generation + if (emitCount++ % 97 == 0) { + 
Thread.sleep(1); + } + } catch (InterruptedException e) { + // Restore the interrupt status so the calling thread can still observe the interruption + LOG.warn("Interrupted while slowing down input generation", e); + Thread.currentThread().interrupt(); + } + return id; + } + + @Override + public void emitTuples() + { + boolean elapsedTimeSent = false; + + try { + for (int i = 0; i < blastCount; ++i) { + int waitingTime = nextRandomId(3600, 36000); + + double eruptionDuration = -2.15 + 0.05 * waitingTime; + emitTuple(eruptionDuration, waitingTime); + + if (!elapsedTimeSent) { + int eT = 0; + + if (i % 100 == 0) { + eT = 54 + waitingTime; + + emitElapsedTime(eT); + elapsedTimeSent = true; + } + } + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + private void emitTuple(double eruptionDuration, int waitingTime) + { + FaithfulKey faithfulkey = new FaithfulKey(); + + faithfulkey.setEruptionDuration(eruptionDuration); + faithfulkey.setWaitingTime(waitingTime); + + this.outputPort.emit(faithfulkey); + } + + private void emitElapsedTime(int eT) + { + this.elapsedTime.emit(eT); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/OldFaithfulApplication.java ---------------------------------------------------------------------- diff --git a/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/OldFaithfulApplication.java b/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/OldFaithfulApplication.java new file mode 100755 index 0000000..bd51c29 --- /dev/null +++ b/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/OldFaithfulApplication.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.r.oldfaithful; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.io.ConsoleOutputOperator; + +/** + * The application attempts to simulate 'Old Faithful Geyser' eruption. + * This application accepts readings for the waiting time and the subsequent eruption duration + * of the 'Old Faithful' and based on this data, tries to predict the eruption duration of the next + * eruption given the elapsed time since the last eruption. + * The training data is generated for an application window and consists of multiple + * waiting times and eruption duration values. + * For every application window, it generates only one 'elapsed time' input for which the + * prediction would be made. 
+ * Model in R is in file eruptionModel.R located at + * examples/r/src/main/resources/org/apache/apex/examples/r/oldfaithful/ directory + * + * @since 2.1.0 + */ + +@ApplicationAnnotation(name = "OldFaithfulApplication") +public class OldFaithfulApplication implements StreamingApplication +{ + private final DAG.Locality locality = null; + + /** + * Create the DAG: the input generator feeds the readings and the elapsed time into the + * R script operator, whose string output is sent to a console output operator. + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + + InputGenerator randomInputGenerator = dag.addOperator("rand", new InputGenerator()); + // The script path follows the resource location org/apache/apex/examples/r/oldfaithful/ + FaithfulRScript rScriptOp = dag.addOperator("rScriptOp", new FaithfulRScript("org/apache/apex/examples/r/oldfaithful/eruptionModel.R", "eruptionModel", "retVal")); + ConsoleOutputOperator consoles = dag.addOperator("consoles", new ConsoleOutputOperator()); + + Map<String, FaithfulRScript.REXP_TYPE> argTypeMap = new HashMap<String, FaithfulRScript.REXP_TYPE>(); + + argTypeMap.put("ELAPSEDTIME", FaithfulRScript.REXP_TYPE.REXP_INT); + argTypeMap.put("ERUPTIONS", FaithfulRScript.REXP_TYPE.REXP_ARRAY_DOUBLE); + argTypeMap.put("WAITING", FaithfulRScript.REXP_TYPE.REXP_ARRAY_INT); + + rScriptOp.setArgTypeMap(argTypeMap); + + dag.addStream("ingen_faithfulRscript", randomInputGenerator.outputPort, rScriptOp.faithfulInput).setLocality(locality); + dag.addStream("ingen_faithfulRscript_eT", randomInputGenerator.elapsedTime, rScriptOp.inputElapsedTime).setLocality(locality); + dag.addStream("faithfulRscript_console_s", rScriptOp.strOutput, consoles.input).setLocality(locality); + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/r/src/main/resources/META-INF/properties.xml b/examples/r/src/main/resources/META-INF/properties.xml new file mode 100755 index 0000000..07c1e87 --- /dev/null +++ b/examples/r/src/main/resources/META-INF/properties.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" 
encoding="UTF-8" standalone="no"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> +<!-- Properties for the R example --> + <property> + <name>dt.application.OldFaithfulApplication.class</name> + <value>org.apache.apex.examples.r.oldfaithful.OldFaithfulApplication</value> + <description>An alias for OldFaithful application</description> + </property> + + <property> + <name>dt.application.OldFaithfulApplication.operator.*.attr.MEMORY_MB</name> + <value>1024</value> + </property> + +<!-- Needed for loading the native libraries --> + <property> + <name>dt.attr.CONTAINER_JVM_OPTIONS</name> + <value>-Djava.library.path=/usr/local/lib/R/site-library/rJava/jri/</value> + </property> + +</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/main/resources/org/apache/apex/examples/r/oldfaithful/eruptionModel.R ---------------------------------------------------------------------- diff --git a/examples/r/src/main/resources/org/apache/apex/examples/r/oldfaithful/eruptionModel.R b/examples/r/src/main/resources/org/apache/apex/examples/r/oldfaithful/eruptionModel.R new file mode 100755 index 0000000..e46fa8d --- /dev/null +++ 
b/examples/r/src/main/resources/org/apache/apex/examples/r/oldfaithful/eruptionModel.R @@ -0,0 +1,60 @@ +#!/usr/bin/Rscript +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +# This script fits a simple linear regression model to the ERUPTIONS and WAITING readings, +# and estimates the next eruption duration given the elapsed time since the last eruption (ELAPSEDTIME). +# + + eruptionModel <- function() { + + datavar = data.frame(ERUPTIONS, WAITING) + + #attach data variable + attach(datavar) + + #create a linear model using lm(FORMULA, DATAVAR) + #model the eruption duration (ERUPTIONS) as a function of the waiting time (WAITING) + eruption.lm <- lm(ERUPTIONS ~ WAITING, datavar) + + #display linear model + eruption.lm + + # Get the intercept and the WAITING coefficient (slope) of the fitted model + interc<-eruption.lm$coeff[["(Intercept)"]] + eruptionDuration<-eruption.lm$coeff[["WAITING"]] + + # Predict the next eruption duration for the given elapsed time using the model fitted above. 
+ nextEruptionDuration<-(interc+(eruptionDuration * ELAPSEDTIME)) + +retVal<-paste("nextEruptionDuration ", nextEruptionDuration, sep=": ") +#retVal<-c("interc : ",interc, ", eruptionDuration : ", eruptionDuration,", nextEruptionDuration : ", nextEruptionDuration) + +sort( sapply(mget(ls()),object.size) ) + +detach(datavar); + +# Clear all the data from R workspace +rm(datavar); +rm(ERUPTIONS); +rm(WAITING); + +return(retVal) +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/test/java/org/apache/apex/examples/r/oldfaithful/OldFaithfulApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/r/src/test/java/org/apache/apex/examples/r/oldfaithful/OldFaithfulApplicationTest.java b/examples/r/src/test/java/org/apache/apex/examples/r/oldfaithful/OldFaithfulApplicationTest.java new file mode 100755 index 0000000..0ebe958 --- /dev/null +++ b/examples/r/src/test/java/org/apache/apex/examples/r/oldfaithful/OldFaithfulApplicationTest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.r.oldfaithful; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * Populates the OldFaithfulApplication DAG in local mode and runs it through the local + * controller with heartbeat monitoring disabled; any exception raised by the run is logged + * and fails the test. + * NOTE(review): the argument of lc.run(5000) is presumably a run time in milliseconds - confirm. + */ +public class OldFaithfulApplicationTest +{ + + private static final Logger LOG = LoggerFactory.getLogger(OldFaithfulApplicationTest.class); + + @Test + public void testSomeMethod() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + OldFaithfulApplication app = new OldFaithfulApplication(); + app.populateDAG(lma.getDAG(), new Configuration(false)); + + try { + LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(false); + lc.run(5000); + } catch (Exception e) { + LOG.error("Exception: ", e); + Assert.fail("Unexpected exception."); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/test/resources/dt-site-oldfaithful.xml ---------------------------------------------------------------------- diff --git a/examples/r/src/test/resources/dt-site-oldfaithful.xml b/examples/r/src/test/resources/dt-site-oldfaithful.xml new file mode 100755 index 0000000..07c1e87 --- /dev/null +++ b/examples/r/src/test/resources/dt-site-oldfaithful.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> +<!-- Properties for the R example --> + <property> + <name>dt.application.OldFaithfulApplication.class</name> + <value>org.apache.apex.examples.r.oldfaithful.OldFaithfulApplication</value> + <description>An alias for OldFaithful application</description> + </property> + + <property> + <name>dt.application.OldFaithfulApplication.operator.*.attr.MEMORY_MB</name> + <value>1024</value> + </property> + +<!-- This information is needed for loading native libraries --> + <property> + <name>dt.attr.CONTAINER_JVM_OPTIONS</name> + <value>-Djava.library.path=/usr/local/lib/R/site-library/rJava/jri/</value> + </property> + +</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/r/src/test/resources/log4j.properties b/examples/r/src/test/resources/log4j.properties new file mode 100755 index 0000000..cf0d19e --- /dev/null +++ b/examples/r/src/test/resources/log4j.properties @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=${test.log.console.threshold} +test.log.console.threshold=DEBUG + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug +log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/pom.xml ---------------------------------------------------------------------- diff --git a/examples/sql/pom.xml b/examples/sql/pom.xml new file mode 100644 index 0000000..7eb0f4d --- /dev/null +++ b/examples/sql/pom.xml @@ -0,0 +1,102 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>malhar-examples-sql</artifactId> + <packaging>jar</packaging> + + <name>Apache Apex Malhar SQL API Example</name> + <description>Apex example applications that use SQL APIs to construct a DAG</description> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.9.1</version> + <executions> + <execution> + <id>attach-artifacts</id> + <phase>package</phase> + <goals> + <goal>attach-artifact</goal> + </goals> + <configuration> + <artifacts> + <artifact> + <file>target/${project.artifactId}-${project.version}.apa</file> + <type>apa</type> + </artifact> + </artifacts> + <skipAttach>false</skipAttach> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-engine</artifactId> + 
<version>${apex.core.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-sql</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- For KafkaTest --> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-kafka</artifactId> + <version>${project.parent.version}</version> + <type>test-jar</type> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.9.0.0</version> + <classifier>test</classifier> + <scope>test</scope> + </dependency> + + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/sql/src/assemble/appPackage.xml b/examples/sql/src/assemble/appPackage.xml new file mode 100644 index 0000000..4138cf2 --- /dev/null +++ b/examples/sql/src/assemble/appPackage.xml @@ -0,0 +1,59 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java ---------------------------------------------------------------------- diff --git a/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java b/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java new file mode 100644 index 0000000..80b997d --- /dev/null +++ b/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sql.sample; + +import java.util.Date; +import java.util.Map; + +import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator; +import org.apache.apex.malhar.sql.SQLExecEnvironment; +import org.apache.apex.malhar.sql.table.CSVMessageFormat; +import org.apache.apex.malhar.sql.table.FileEndpoint; +import org.apache.apex.malhar.sql.table.StreamEndpoint; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.ImmutableMap; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.contrib.parser.CsvParser; + + +@ApplicationAnnotation(name = "FusionStyleSQLApplication") +/** + * @since 3.6.0 + */ +public class FusionStyleSQLApplication implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + SQLExecEnvironment env = SQLExecEnvironment.getEnvironment(); + env.registerFunction("APEXCONCAT", PureStyleSQLApplication.class, "apex_concat_str"); + + Map<String, Class> fieldMapping = ImmutableMap.<String, Class>of( + 
"RowTime", Date.class, + "id", Integer.class, + "Product", String.class, + "units", Integer.class); + + // Add Kafka Input + KafkaSinglePortInputOperator kafkaInput = dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class); + kafkaInput.setInitialOffset("EARLIEST"); + + // Add CSVParser + CsvParser csvParser = dag.addOperator("CSVParser", CsvParser.class); + dag.addStream("KafkaToCSV", kafkaInput.outputPort, csvParser.in); + + // Register CSV parser output as the input table for the SQL statement + env.registerTable(conf.get("sqlSchemaInputName"), new StreamEndpoint(csvParser.out, fieldMapping)); + + // Register FileEndpoint as the output table for the SQL statement. + env.registerTable(conf.get("sqlSchemaOutputName"), new FileEndpoint(conf.get("folderPath"), + conf.get("fileName"), new CSVMessageFormat(conf.get("sqlSchemaOutputDef")))); + + // Add the SQL statement to the DAG + env.executeSQL(dag, conf.get("sql")); + } + + public static class PassThroughOperator extends BaseOperator + { + public final transient DefaultOutputPort output = new DefaultOutputPort(); + public final transient DefaultInputPort input = new DefaultInputPort() + { + @Override + public void process(Object o) + { + output.emit(o); // forward the received tuple, not the port object itself + } + }; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java ---------------------------------------------------------------------- diff --git a/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java b/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java new file mode 100644 index 0000000..79295f9 --- /dev/null +++ b/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sql.sample; + +import org.apache.apex.malhar.sql.SQLExecEnvironment; +import org.apache.apex.malhar.sql.table.CSVMessageFormat; +import org.apache.apex.malhar.sql.table.FileEndpoint; +import org.apache.apex.malhar.sql.table.KafkaEndpoint; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +@ApplicationAnnotation(name = "PureStyleSQLApplication") +/** + * @since 3.6.0 + */ +public class PureStyleSQLApplication implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // Source definition + String schemaInName = conf.get("schemaInName"); + String schemaInDef = conf.get("schemaInDef"); + String broker = conf.get("broker"); + String sourceTopic = conf.get("topic"); + + // Destination definition + String schemaOutName = conf.get("schemaOutName"); + String schemaOutDef = conf.get("schemaOutDef"); + String outputFolder = conf.get("outputFolder"); + String outFilename = conf.get("destFileName"); + + // SQL statement + String sql = conf.get("sql"); + + SQLExecEnvironment.getEnvironment() + .registerTable(schemaInName, new KafkaEndpoint(broker, sourceTopic, 
+ new CSVMessageFormat(schemaInDef))) + .registerTable(schemaOutName, new FileEndpoint(outputFolder, outFilename, + new CSVMessageFormat(schemaOutDef))) + .registerFunction("APEXCONCAT", this.getClass(), "apex_concat_str") + .executeSQL(dag, sql); + } + + public static String apex_concat_str(String s1, String s2) + { + return s1 + s2; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java ---------------------------------------------------------------------- diff --git a/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java b/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java new file mode 100644 index 0000000..da4f563 --- /dev/null +++ b/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sql.sample; + +import org.apache.apex.malhar.sql.SQLExecEnvironment; +import org.apache.apex.malhar.sql.table.CSVMessageFormat; +import org.apache.apex.malhar.sql.table.FileEndpoint; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +@ApplicationAnnotation(name = "SQLApplicationWithAPI") +/** + * @since 3.6.0 + */ +public class SQLApplicationWithAPI implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // Source definition + String schemaInName = conf.get("csvSchemaInName"); + String schemaIn = conf.get("csvSchemaIn"); + String sourceFile = conf.get("sourceFile"); + + SQLExecEnvironment.getEnvironment() + .registerTable(schemaInName, new FileEndpoint(sourceFile, new CSVMessageFormat(schemaIn))) + .executeSQL(dag, conf.get("sql")); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java ---------------------------------------------------------------------- diff --git a/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java b/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java new file mode 100644 index 0000000..4c90a82 --- /dev/null +++ b/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sql.sample; + +import java.io.File; +import java.io.IOException; + +import org.apache.apex.malhar.sql.SQLExecEnvironment; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +@ApplicationAnnotation(name = "SQLApplicationWithModelFile") +/** + * @since 3.6.0 + */ +public class SQLApplicationWithModelFile implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + String modelFile = conf.get("modelFile"); + String model; + try { + model = FileUtils.readFileToString(new File(modelFile)); + } catch (IOException e) { + throw new RuntimeException(e); + } + + SQLExecEnvironment.getEnvironment() + .withModel(model) + .executeSQL(dag, conf.get("sql")); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml ---------------------------------------------------------------------- diff --git a/examples/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml b/examples/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml new file mode 100644 index 0000000..77852e7 --- /dev/null +++ b/examples/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml @@ -0,0 +1,65 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) 
under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> + <!-- Kafka Operator Properties --> + <property> + <name>dt.operator.KafkaInput.prop.topics</name> + <value>dataTopic</value> + </property> + <property> + <name>dt.operator.KafkaInput.prop.clusters</name> + <value>localhost:9092</value> <!-- broker (NOT zookeeper) address --> + </property> + + <!-- CSV Parser Properties --> + <property> + <name>dt.operator.CSVParser.prop.schema</name> + <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value> + </property> + + <!-- SQL Properties --> + <property> + <name>sqlSchemaInputName</name> + <value>FROMCSV</value> + </property> + <property> + <name>sqlSchemaOutputName</name> + <value>TOFILE</value> + </property> + <property> + <name>folderPath</name> + <value>/tmp/output</value> + </property> + <property> + <name>fileName</name> + <value>output.txt</value> + </property> + <property> + <name>sqlSchemaOutputDef</name> + <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime1","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss 
Z"}},{"name":"RowTime2","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"Product","type":"String"}]}</value> + </property> + <property> + <name>sql</name> + <value>INSERT INTO TOFILE SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) FROM FROMCSV WHERE ID > 3 AND PRODUCT LIKE 'paint%'</value> + </property> +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml ---------------------------------------------------------------------- diff --git a/examples/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml b/examples/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml new file mode 100644 index 0000000..0d25aa6 --- /dev/null +++ b/examples/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml @@ -0,0 +1,65 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<configuration> + <!-- Input Definition --> + <property> + <name>schemaInName</name> + <value>ORDERS</value> + </property> + <property> + <name>schemaInDef</name> + <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value> + </property> + <property> + <name>broker</name> + <value>localhost:9090</value> + </property> + <property> + <name>topic</name> + <value>inputTopic</value> + </property> + + <!-- Output Definition --> + <property> + <name>schemaOutName</name> + <value>SALES</value> + </property> + <property> + <name>schemaOutDef</name> + <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime1","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"RowTime2","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"Product","type":"String"}]}</value> + </property> + <property> + <name>outputFolder</name> + <value>/tmp/output</value> + </property> + <property> + <name>destFileName</name> + <value>out.file</value> + </property> + + <!-- Execution SQL --> + <property> + <name>sql</name> + <value>INSERT INTO SALES SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) FROM ORDERS WHERE ID > 3 AND PRODUCT LIKE 'paint%'</value> + </property> +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml ---------------------------------------------------------------------- diff --git a/examples/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml b/examples/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml new file mode 100644 index 0000000..9ac49d4 --- /dev/null +++ b/examples/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml @@ -0,0 +1,43 @@ 
+<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> + <!-- Input Definition --> + <property> + <name>csvSchemaInName</name> + <value>ORDERS</value> + </property> + <property> + <name>csvSchemaIn</name> + <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value> + </property> + <property> + <name>sourceFile</name> + <value>src/test/resources/input.csv</value> + </property> + + <!-- Execution SQL --> + <property> + <name>sql</name> + <value>SELECT STREAM ROWTIME, PRODUCT FROM ORDERS</value> + </property> +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml ---------------------------------------------------------------------- diff --git a/examples/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml b/examples/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml new file mode 100644 index 0000000..ab026c2 --- /dev/null +++ 
b/examples/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml @@ -0,0 +1,32 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> + <property> + <name>modelFile</name> + <value>src/main/resources/model/model_file_csv.json</value> + </property> + <property> + <name>sql</name> + <value>SELECT STREAM ROWTIME, PRODUCT FROM ORDERS</value> + </property> +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/sql/src/main/resources/META-INF/properties.xml b/examples/sql/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..2702315 --- /dev/null +++ b/examples/sql/src/main/resources/META-INF/properties.xml @@ -0,0 +1,41 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> + <!-- Memory settings for all examples --> + <property> + <name>dt.attr.MASTER_MEMORY_MB</name> + <value>512</value> + </property> + <property> + <name>dt.application.*.operator.*.attr.MEMORY_MB</name> + <value>256</value> + </property> + <property> + <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name> + <value>-Xmx128M</value> + </property> + <property> + <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name> + <value>128</value> + </property> +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/resources/model/model_file_csv.json ---------------------------------------------------------------------- diff --git a/examples/sql/src/main/resources/model/model_file_csv.json b/examples/sql/src/main/resources/model/model_file_csv.json new file mode 100644 index 0000000..beba18d --- /dev/null +++ b/examples/sql/src/main/resources/model/model_file_csv.json @@ -0,0 +1,27 @@ +{ + "version": "1.0", + "defaultSchema": "APEX", + "schemas": [{ + "name": "APEX", + "tables": [ + { + "name": "ORDERS", + "type": "custom", + "factory": "org.apache.apex.malhar.sql.schema.ApexSQLTableFactory", + "stream": { + "stream": true + }, + "operand": { + "endpoint": "file", + "messageFormat": "csv", + "endpointOperands": { + "directory": "src/test/resources/input.csv" + }, + "messageFormatOperands": { + "schema": 
"{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss\"}},{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"units\",\"type\":\"Integer\"}]}" + } + } + } + ] + }] +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java new file mode 100644 index 0000000..7208701 --- /dev/null +++ b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.apex.malhar.sql.sample;

import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import org.apache.apex.malhar.kafka.EmbeddedKafka;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.LocalMode;

/**
 * Runs {@link FusionStyleSQLApplication} in local mode against an embedded Kafka broker.
 *
 * The test publishes six CSV records to the data topic, waits until the application has written
 * a non-empty file below the per-test output folder and compares that file line by line with the
 * expected result.
 */
public class FusionStyleSQLApplicationTest
{
  /** Root below which each test method gets its own output folder. */
  private static final String OUTPUT_ROOT = "target/output/";

  private final String testTopicData = "dataTopic";
  private final String testTopicResult = "resultTopic";

  private TimeZone defaultTZ;
  private EmbeddedKafka kafka;

  // Per-instance on purpose: a static field that is appended to in setUp() would grow
  // with every test method that runs in the same JVM.
  private String outputFolder;

  @Rule
  public TestName testName = new TestName();

  /**
   * Switches the JVM default time zone to GMT, starts the embedded Kafka broker with the two
   * topics used by the test and prepares an empty output folder.
   */
  @Before
  public void setUp() throws Exception
  {
    defaultTZ = TimeZone.getDefault();
    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));

    kafka = new EmbeddedKafka();
    kafka.start();
    kafka.createTopic(testTopicData);
    kafka.createTopic(testTopicResult);

    outputFolder = OUTPUT_ROOT + testName.getMethodName() + "/";
    // Remove leftovers of earlier runs (or of other test classes that use the same method
    // name) so that the wait below can only be satisfied by output of this run.
    FileUtils.deleteQuietly(new File(outputFolder));
  }

  /**
   * Stops the embedded Kafka broker and restores the original default time zone, even when
   * stopping the broker fails.
   */
  @After
  public void tearDown() throws Exception
  {
    try {
      // kafka is null when setUp() failed before the broker was created.
      if (kafka != null) {
        kafka.stop();
      }
    } finally {
      TimeZone.setDefault(defaultTZ);
    }
  }

  /**
   * Publishes the input records, waits up to 40 seconds for the output file and verifies its
   * content. Any exception fails the test instead of being printed and ignored.
   */
  @Test
  public void test() throws Exception
  {
    LocalMode lma = LocalMode.newInstance();
    Configuration conf = new Configuration(false);
    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-FusionStyleSQLApplication.xml"));

    conf.set("dt.operator.KafkaInput.prop.topics", testTopicData);
    conf.set("dt.operator.KafkaInput.prop.clusters", kafka.getBroker());
    conf.set("folderPath", outputFolder);
    conf.set("fileName", "out.tmp");

    FusionStyleSQLApplication app = new FusionStyleSQLApplication();

    lma.prepareDAG(app, conf);

    LocalMode.Controller lc = lma.getController();

    lc.runAsync();
    try {
      kafka.publish(testTopicData, Arrays.asList(
          "15/02/2016 10:15:00 +0000,1,paint1,11",
          "15/02/2016 10:16:00 +0000,2,paint2,12",
          "15/02/2016 10:17:00 +0000,3,paint3,13",
          "15/02/2016 10:18:00 +0000,4,paint4,14",
          "15/02/2016 10:19:00 +0000,5,paint5,15",
          "15/02/2016 10:10:00 +0000,6,abcde6,16"));

      Assert.assertTrue(PureStyleSQLApplicationTest.waitTillFileIsPopulated(outputFolder, 40000));
    } finally {
      // Always stop the local cluster, also when publishing or the wait fails.
      lc.shutdown();
    }

    String[] outputFiles = new File(outputFolder).list();
    Assert.assertNotNull("Output folder cannot be listed: " + outputFolder, outputFiles);
    Assert.assertTrue("No output file found in " + outputFolder, outputFiles.length > 0);

    List<String> strings = FileUtils.readLines(new File(outputFolder, outputFiles[0]));
    String[] actualLines = strings.toArray(new String[strings.size()]);

    String[] expectedLines = new String[] {
      "15/02/2016 10:18:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT4",
      "",
      "15/02/2016 10:19:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT5",
      ""};

    Assert.assertArrayEquals(expectedLines, actualLines);
  }
}
 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java new file mode 100644 index 0000000..f298059 --- /dev/null +++ b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.apex.malhar.sql.sample;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import org.apache.apex.malhar.kafka.EmbeddedKafka;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.google.common.collect.Lists;

import com.datatorrent.api.LocalMode;

/**
 * Runs {@link PureStyleSQLApplication} in local mode against an embedded Kafka broker.
 *
 * The test publishes six CSV records to the data topic, waits until the application has written
 * a non-empty file below the per-test output folder and compares that file line by line with the
 * expected result. {@link #waitTillFileIsPopulated(String, int)} is public so that other tests
 * can reuse the polling logic.
 */
public class PureStyleSQLApplicationTest
{
  /** Root below which each test method gets its own output folder. */
  private static final String OUTPUT_ROOT = "target/output/";

  private final String testTopicData = "dataTopic";
  private final String testTopicResult = "resultTopic";

  private TimeZone defaultTZ;
  private EmbeddedKafka kafka;

  // Per-instance on purpose: a static field that is appended to in setUp() would grow
  // with every test method that runs in the same JVM.
  private String outputFolder;

  @Rule
  public TestName testName = new TestName();

  /**
   * Switches the JVM default time zone to GMT, starts the embedded Kafka broker with the two
   * topics used by the test and prepares an empty output folder.
   */
  @Before
  public void setUp() throws Exception
  {
    defaultTZ = TimeZone.getDefault();
    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));

    kafka = new EmbeddedKafka();
    kafka.start();
    kafka.createTopic(testTopicData);
    kafka.createTopic(testTopicResult);

    outputFolder = OUTPUT_ROOT + testName.getMethodName() + "/";
    // Remove leftovers of earlier runs (or of other test classes that use the same method
    // name) so that the wait below can only be satisfied by output of this run.
    FileUtils.deleteQuietly(new File(outputFolder));
  }

  /**
   * Stops the embedded Kafka broker and restores the original default time zone, even when
   * stopping the broker fails.
   */
  @After
  public void tearDown() throws Exception
  {
    try {
      // kafka is null when setUp() failed before the broker was created.
      if (kafka != null) {
        kafka.stop();
      }
    } finally {
      TimeZone.setDefault(defaultTZ);
    }
  }

  /**
   * Publishes the input records, waits up to 40 seconds for the output file and verifies its
   * content.
   */
  @Test
  public void test() throws Exception
  {
    LocalMode lma = LocalMode.newInstance();
    Configuration conf = new Configuration(false);
    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-PureStyleSQLApplication.xml"));

    conf.set("broker", kafka.getBroker());
    conf.set("topic", testTopicData);
    conf.set("outputFolder", outputFolder);
    conf.set("destFileName", "out.tmp");

    PureStyleSQLApplication app = new PureStyleSQLApplication();

    lma.prepareDAG(app, conf);

    LocalMode.Controller lc = lma.getController();

    lc.runAsync();
    try {
      kafka.publish(testTopicData, Arrays.asList(
          "15/02/2016 10:15:00 +0000,1,paint1,11",
          "15/02/2016 10:16:00 +0000,2,paint2,12",
          "15/02/2016 10:17:00 +0000,3,paint3,13",
          "15/02/2016 10:18:00 +0000,4,paint4,14",
          "15/02/2016 10:19:00 +0000,5,paint5,15",
          "15/02/2016 10:10:00 +0000,6,abcde6,16"));

      Assert.assertTrue(waitTillFileIsPopulated(outputFolder, 40000));
    } finally {
      // Always stop the local cluster, also when publishing or the wait fails.
      lc.shutdown();
    }

    String[] outputFiles = new File(outputFolder).list();
    Assert.assertNotNull("Output folder cannot be listed: " + outputFolder, outputFiles);
    Assert.assertTrue("No output file found in " + outputFolder, outputFiles.length > 0);

    List<String> strings = FileUtils.readLines(new File(outputFolder, outputFiles[0]));
    String[] actualLines = strings.toArray(new String[strings.size()]);

    String[] expectedLines = new String[]{
      "15/02/2016 10:18:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT4",
      "",
      "15/02/2016 10:19:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT5",
      ""};

    Assert.assertArrayEquals(expectedLines, actualLines);
  }

  /**
   * Polls {@code outputFolder} every 500 ms until the first file listed in it contains at least
   * one line or {@code timeout} milliseconds have elapsed.
   *
   * @param outputFolder folder that is expected to receive the output file
   * @param timeout maximum time to wait, in milliseconds
   * @return {@code true} if the folder exists and a non-empty file was read from it,
   *         {@code false} if the timeout expired first
   * @throws IOException if the file system cannot be accessed or the file cannot be read
   * @throws InterruptedException if the thread is interrupted while sleeping between polls
   */
  public static boolean waitTillFileIsPopulated(String outputFolder, int timeout) throws IOException, InterruptedException
  {
    boolean result;
    long start = System.currentTimeMillis();
    File folder = new File(outputFolder);
    Path outDir = new Path("file://" + folder.getAbsolutePath());
    try (FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration())) {
      List<String> strings = Lists.newArrayList();
      while (System.currentTimeMillis() - start < timeout) {
        if (fs.exists(outDir)) {
          // List once and null-check: File.list() returns null when the folder vanishes or
          // cannot be read, and a second call could see a different directory content.
          String[] names = folder.list();
          if (names != null && names.length > 0) {
            strings = FileUtils.readLines(new File(folder, names[0]));
            if (!strings.isEmpty()) {
              break;
            }
          }
        }

        Thread.sleep(500);
      }

      result = fs.exists(outDir) && !strings.isEmpty();
    }

    return result;
  }

}
 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java ---------------------------------------------------------------------- diff --git a/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java new file mode 100644 index 0000000..6b1a404 --- /dev/null +++ b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */
package org.apache.apex.malhar.sql.sample;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.TimeZone;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;

import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;

import com.datatorrent.api.LocalMode;

/**
 * Runs {@link SQLApplicationWithAPI} in local mode while standard output is captured, and checks
 * that the captured "Delta Record:" lines contain the expected records in order.
 */
public class SQLApplicationWithAPITest
{
  /** Fragments expected in the first "Delta Record:" lines, in output order. */
  private static final String[] EXPECTED_RECORDS = {
    "RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1",
    "RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2",
    "RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3",
    "RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4",
    "RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5",
    "RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6"};

  private TimeZone defaultTZ;

  /** Switches the JVM default time zone to GMT; the expected record text is rendered in GMT. */
  @Before
  public void setUp() throws Exception
  {
    defaultTZ = TimeZone.getDefault();
    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
  }

  /** Restores the default time zone that was active before the test. */
  @After
  public void tearDown() throws Exception
  {
    TimeZone.setDefault(defaultTZ);
  }

  /**
   * Runs the application with {@code System.out} redirected into a buffer, waits up to
   * 30 seconds for output and verifies the captured "Delta Record:" lines.
   */
  @Test
  public void test() throws Exception
  {
    LocalMode lma = LocalMode.newInstance();
    Configuration conf = new Configuration(false);
    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SQLApplicationWithAPI.xml"));

    SQLApplicationWithAPI app = new SQLApplicationWithAPI();

    lma.prepareDAG(app, conf);

    LocalMode.Controller lc = lma.getController();

    PrintStream originalSysout = System.out;
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    System.setOut(new PrintStream(baos));
    try {
      lc.runAsync();
      SQLApplicationWithModelFileTest.waitTillStdoutIsPopulated(baos, 30000);
    } finally {
      // Stop the cluster first (as before), then give stdout back; both steps must run even
      // when the wait fails, otherwise later tests would print into this buffer.
      try {
        lc.shutdown();
      } finally {
        System.setOut(originalSysout);
      }
    }

    String[] sout = baos.toString().split(System.lineSeparator());
    Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));

    String[] actualLines = filter.toArray(new String[filter.size()]);
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException when the
    // application produced fewer records than expected.
    Assert.assertTrue("Expected at least " + EXPECTED_RECORDS.length + " 'Delta Record:' lines but found "
        + actualLines.length, actualLines.length >= EXPECTED_RECORDS.length);
    for (int i = 0; i < EXPECTED_RECORDS.length; i++) {
      Assert.assertTrue("Line " + i + " does not contain '" + EXPECTED_RECORDS[i] + "': " + actualLines[i],
          actualLines[i].contains(EXPECTED_RECORDS[i]));
    }
  }
}
