JdbcInput and HDFS output example app SPOI-8251 jdbc to jdbc app
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/eae0eeee Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/eae0eeee Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/eae0eeee Branch: refs/heads/master Commit: eae0eeee5e88000c0fabf5aedf3078e33f863cd9 Parents: a1b7155 Author: devtagare <[email protected]> Authored: Thu Apr 13 15:59:03 2017 -0700 Committer: Lakshmi Prasanna Velineni <[email protected]> Committed: Wed Jun 7 10:33:00 2017 -0700 ---------------------------------------------------------------------- examples/jdbcIngest/.gitignore | 1 + examples/jdbcIngest/README.md | 65 ++++ .../jdbcIngest/XmlJavadocCommentsExtractor.xsl | 44 +++ examples/jdbcIngest/pom.xml | 298 +++++++++++++++++ examples/jdbcIngest/src/assemble/appPackage.xml | 43 +++ .../example/mydtapp/FileLineOutputOperator.java | 36 +++ .../java/com/example/mydtapp/JdbcHDFSApp.java | 75 +++++ .../example/mydtapp/JdbcPollerApplication.java | 48 +++ .../java/com/example/mydtapp/PojoEvent.java | 44 +++ .../META-INF/properties-PollJdbcToHDFSApp.xml | 73 +++++ .../META-INF/properties-SimpleJdbcToHDFSApp.xml | 66 ++++ .../com/example/mydtapp/ApplicationTest.java | 56 ++++ .../com/example/mydtapp/JdbcInputAppTest.java | 137 ++++++++ .../mydtapp/JdbcPollerApplicationTest.java | 128 ++++++++ .../jdbcIngest/src/test/resources/example.sql | 24 ++ .../src/test/resources/log4j.properties | 21 ++ examples/jdbcToJdbc/.gitignore | 1 + examples/jdbcToJdbc/README.md | 55 ++++ .../jdbcToJdbc/XmlJavadocCommentsExtractor.xsl | 44 +++ examples/jdbcToJdbc/pom.xml | 319 +++++++++++++++++++ examples/jdbcToJdbc/src/assemble/appPackage.xml | 43 +++ .../java/com/example/mydtapp/JdbcToJdbcApp.java | 101 ++++++ .../java/com/example/mydtapp/PojoEvent.java | 44 +++ .../src/main/resources/META-INF/properties.xml | 88 +++++ .../com/example/mydtapp/ApplicationTest.java | 42 +++ .../com/example/mydtapp/JdbcOperatorTest.java | 155 +++++++++ .../jdbcToJdbc/src/test/resources/example.sql | 36 +++ .../src/test/resources/log4j.properties | 21 ++ 28 files changed, 2108 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcIngest/.gitignore ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/.gitignore b/examples/jdbcIngest/.gitignore new file mode 100644 index 0000000..b83d222 --- /dev/null +++ b/examples/jdbcIngest/.gitignore @@ -0,0 +1 @@ +/target/ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcIngest/README.md ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/README.md b/examples/jdbcIngest/README.md new file mode 100644 index 0000000..ec01985 --- /dev/null +++ b/examples/jdbcIngest/README.md @@ -0,0 +1,65 @@ +## Sample mysql implementation + +This project contains two applications to read records from a table in `MySQL`, create POJOs and write them to a file +in the user specified directory in HDFS. + +1. SimpleJdbcToHDFSApp: Reads table records as per given query and emits them as POJOs. +2. PollJdbcToHDFSApp: Reads table records using partitions in parallel fashion also polls for newly **appended** records and emits them as POJOs. + +Follow these steps to run these applications: + +**Step 1**: Update these properties in the file `src/main/resources/META_INF/properties-<applicationName>.xml`: + +| Property Name | Description | +| ------------- | ----------- | +| dt.application.<applicationName>.operator.JdbcInput.prop.store.databaseUrl | database URL of the form `jdbc:mysql://hostName:portNumber/dbName` | +| dt.application.<applicationName>.operator.JdbcInput.prop.store.userName | MySQL user name | +| dt.application.<applicationName>.operator.JdbcInput.prop.store.password | MySQL user password | +| dt.application.<applicationName>.operator.FileOutputOperator.filePath | HDFS output directory path | + +**Step 2**: Create database table and add entries + +Go to the MySQL console and run (where _{path}_ is a suitable prefix): + + mysql> source {path}/src/test/resources/example.sql + +After this, please verify that `testDev.test_event_table` is created and has 10 rows: + + mysql> select count(*) from testDev.test_event_table; + +----------+ + | count(*) | + +----------+ + | 10 | + +----------+ + +**Step 3**: Create HDFS output directory if not already present (_{path}_ should be the same as specified in `META_INF/properties-<applicationName>.xml`): + + hadoop fs -mkdir -p {path} + +**Step 4**: Build the code: + + shell> mvn clean install + +Upload the `target/jdbcInput-1.0-SNAPSHOT.apa` to the UI console if available or launch it from +the commandline using `apexcli`. + +**Step 5**: During launch use `src/main/resources/META_INF/properties-<applicationName>.xml` as a custom configuration file; then verify +that the output directory has the expected output: + + shell> hadoop fs -cat <hadoop directory path>/2_op.dat.* | wc -l + +This should return 10 as the count. + +Sample Output: + + hadoop fs -cat <path_to_file>/2_op.dat.0 + PojoEvent [accountNumber=1, name=User1, amount=1000] + PojoEvent [accountNumber=2, name=User2, amount=2000] + PojoEvent [accountNumber=3, name=User3, amount=3000] + PojoEvent [accountNumber=4, name=User4, amount=4000] + PojoEvent [accountNumber=5, name=User5, amount=5000] + PojoEvent [accountNumber=6, name=User6, amount=6000] + PojoEvent [accountNumber=7, name=User7, amount=7000] + PojoEvent [accountNumber=8, name=User8, amount=8000] + PojoEvent [accountNumber=9, name=User9, amount=9000] + PojoEvent [accountNumber=10, name=User10, amount=1000] http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcIngest/XmlJavadocCommentsExtractor.xsl ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/XmlJavadocCommentsExtractor.xsl b/examples/jdbcIngest/XmlJavadocCommentsExtractor.xsl new file mode 100644 index 0000000..08075a9 --- /dev/null +++ b/examples/jdbcIngest/XmlJavadocCommentsExtractor.xsl @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> + +<!-- + Document : XmlJavadocCommentsExtractor.xsl + Created on : September 16, 2014, 11:30 AM + Description: + The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet. +--> + +<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0"> + <xsl:output method="xml" standalone="yes"/> + + <!-- copy xml by selecting only the following nodes, attributes and text --> + <xsl:template match="node()|text()|@*"> + <xsl:copy> + <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/> + </xsl:copy> + </xsl:template> + + <!-- Strip off the following paths from the selected xml --> + <xsl:template match="//root/package/interface/interface + |//root/package/interface/method/@qualified + |//root/package/class/interface + |//root/package/class/class + |//root/package/class/method/@qualified + |//root/package/class/field/@qualified" /> + + <xsl:strip-space elements="*"/> +</xsl:stylesheet> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcIngest/pom.xml ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/pom.xml b/examples/jdbcIngest/pom.xml new file mode 100644 index 0000000..f9288b8 --- /dev/null +++ b/examples/jdbcIngest/pom.xml @@ -0,0 +1,298 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.example</groupId> + <version>1.0-SNAPSHOT</version> + <artifactId>jdbcInput</artifactId> + <packaging>jar</packaging> + + <!-- change these to the appropriate values --> + <name>JDBC Input Operator</name> + <description>Example Uses of JDBC Input Operator</description> + + <properties> + <!-- change this if you desire to use a different version of Apex Core --> + <apex.version>3.5.0</apex.version> + <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> + <malhar.version>3.6.0</malhar.version> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.9</version> + <configuration> + <downloadSources>true</downloadSources> + </configuration> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.3</version> + <configuration> + <encoding>UTF-8</encoding> + <source>1.7</source> + <target>1.7</target> + <debug>true</debug> + <optimize>false</optimize> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>prepare-package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>target/deps</outputDirectory> + <includeScope>runtime</includeScope> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>app-package-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <finalName>${project.artifactId}-${project.version}-apexapp</finalName> + <appendAssemblyId>false</appendAssemblyId> + <descriptors> + <descriptor>src/assemble/appPackage.xml</descriptor> + </descriptors> + <archiverConfig> + <defaultDirectoryMode>0755</defaultDirectoryMode> + </archiverConfig> + <archive> + <manifestEntries> + <Class-Path>${apex.apppackage.classpath}</Class-Path> + <DT-Engine-Version>${apex.version}</DT-Engine-Version> + <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id> + <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> + <DT-App-Package-Version>${project.version}</DT-App-Package-Version> + <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> + <DT-App-Package-Description>${project.description}</DT-App-Package-Description> + </manifestEntries> + </archive> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.7</version> + <executions> + <execution> + <phase>package</phase> + <configuration> + <target> + <move + file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" + tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> + </target> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + <execution> + <!-- create resource directory for xml javadoc --> + <id>createJavadocDirectory</id> + <phase>generate-resources</phase> + <configuration> + <tasks> + <delete + dir="${project.build.directory}/generated-resources/xml-javadoc" /> + <mkdir + dir="${project.build.directory}/generated-resources/xml-javadoc" /> + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.9.1</version> + <executions> + <execution> + <id>attach-artifacts</id> + <phase>package</phase> + <goals> + <goal>attach-artifact</goal> + </goals> + <configuration> + <artifacts> + <artifact> + <file>target/${project.artifactId}-${project.version}.apa</file> + <type>apa</type> + </artifact> + </artifacts> + <skipAttach>false</skipAttach> + </configuration> + </execution> + </executions> + </plugin> + + <!-- generate javdoc --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <!-- generate xml javadoc --> + <execution> + <id>xml-doclet</id> + <phase>generate-resources</phase> + <goals> + <goal>javadoc</goal> + </goals> + <configuration> + <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> + <additionalparam>-d + ${project.build.directory}/generated-resources/xml-javadoc + -filename + ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> + <useStandardDocletOptions>false</useStandardDocletOptions> + <docletArtifact> + <groupId>com.github.markusbernhardt</groupId> + <artifactId>xml-doclet</artifactId> + <version>1.0.4</version> + </docletArtifact> + </configuration> + </execution> + </executions> + </plugin> + <!-- Transform xml javadoc to stripped down version containing only + class/interface comments and tags --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>xml-maven-plugin</artifactId> + <version>1.0</version> + <executions> + <execution> + <id>transform-xmljavadoc</id> + <phase>generate-resources</phase> + <goals> + <goal>transform</goal> + </goals> + </execution> + </executions> + <configuration> + <transformationSets> + <transformationSet> + <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> + <includes> + <include>${project.artifactId}-${project.version}-javadoc.xml</include> + </includes> + <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> + <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> + </transformationSet> + </transformationSets> + </configuration> + </plugin> + <!-- copy xml javadoc to class jar --> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>2.6</version> + <executions> + <execution> + <id>copy-resources</id> + <phase>process-resources</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${basedir}/target/classes</outputDirectory> + <resources> + <resource> + <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> + <includes> + <include>${project.artifactId}-${project.version}-javadoc.xml</include> + </includes> + <filtering>true</filtering> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + + </build> + + <dependencies> + <!-- add your dependencies here --> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-library</artifactId> + <version>${malhar.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-common</artifactId> + <version>${apex.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.10</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-engine</artifactId> + <version>${apex.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>5.1.36</version> + </dependency> + <dependency> + <groupId>org.jooq</groupId> + <artifactId>jooq</artifactId> + <version>3.6.4</version> + </dependency> + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>janino</artifactId> + <version>2.7.8</version> + </dependency> + <dependency> + <groupId>org.hsqldb</groupId> + <artifactId>hsqldb</artifactId> + <version>2.3.1</version> + </dependency> + + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcIngest/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/assemble/appPackage.xml b/examples/jdbcIngest/src/assemble/appPackage.xml new file mode 100644 index 0000000..7ad071c --- /dev/null +++ b/examples/jdbcIngest/src/assemble/appPackage.xml @@ -0,0 +1,43 @@ +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/resources</directory> + <outputDirectory>/resources</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcIngest/src/main/java/com/example/mydtapp/FileLineOutputOperator.java ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/main/java/com/example/mydtapp/FileLineOutputOperator.java b/examples/jdbcIngest/src/main/java/com/example/mydtapp/FileLineOutputOperator.java new file mode 100644 index 0000000..e155f23 --- /dev/null +++ b/examples/jdbcIngest/src/main/java/com/example/mydtapp/FileLineOutputOperator.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.example.mydtapp; + +import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; + +public class FileLineOutputOperator extends AbstractFileOutputOperator<Object> +{ + @Override + protected String getFileName(Object input) + { + return context.getId() + "_" + "op.dat"; + } + + @Override + protected byte[] getBytesForTuple(Object input) + { + return (input.toString() + "\n").getBytes(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcHDFSApp.java ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcHDFSApp.java b/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcHDFSApp.java new file mode 100644 index 0000000..5605bcf --- /dev/null +++ b/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcHDFSApp.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.example.mydtapp; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator; +import com.datatorrent.lib.db.jdbc.JdbcStore; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.FieldInfo.SupportType; + +@ApplicationAnnotation(name = "SimpleJdbcToHDFSApp") +public class JdbcHDFSApp implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + JdbcPOJOInputOperator jdbcInputOperator = dag.addOperator("JdbcInput", new JdbcPOJOInputOperator()); + /** + * The class given below can be updated to the user defined class based on + * input table schema The addField infos method needs to be updated + * accordingly This line can be commented and class can be set from the + * properties file + */ + // dag.setOutputPortAttribute(jdbcInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, PojoEvent.class); + + jdbcInputOperator.setFieldInfos(addFieldInfos()); + + JdbcStore store = new JdbcStore(); + jdbcInputOperator.setStore(store); + + FileLineOutputOperator fileOutput = dag.addOperator("FileOutputOperator", new FileLineOutputOperator()); + + dag.addStream("POJO's", jdbcInputOperator.outputPort, fileOutput.input).setLocality(Locality.CONTAINER_LOCAL); + } + + /** + * This method can be modified to have field mappings based on used defined + * class + */ + private List<FieldInfo> addFieldInfos() + { + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER)); + fieldInfos.add(new FieldInfo("NAME", "name", SupportType.STRING)); + fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER)); + return fieldInfos; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcPollerApplication.java ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcPollerApplication.java b/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcPollerApplication.java new file mode 100644 index 0000000..54d71f7 --- /dev/null +++ b/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcPollerApplication.java @@ -0,0 +1,48 @@ +package com.example.mydtapp; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.db.jdbc.JdbcPOJOPollInputOperator; +import com.datatorrent.lib.db.jdbc.JdbcStore; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.FieldInfo.SupportType; +import com.google.common.collect.Lists; + +@ApplicationAnnotation(name = "PollJdbcToHDFSApp") +public class JdbcPollerApplication implements StreamingApplication +{ + public void populateDAG(DAG dag, Configuration conf) + { + JdbcPOJOPollInputOperator poller = dag.addOperator("JdbcPoller", new JdbcPOJOPollInputOperator()); + + JdbcStore store = new JdbcStore(); + poller.setStore(store); + + poller.setFieldInfos(addFieldInfos()); + + FileLineOutputOperator writer = dag.addOperator("Writer", new FileLineOutputOperator()); + dag.setInputPortAttribute(writer.input, PortContext.PARTITION_PARALLEL, true); + writer.setRotationWindows(60); + + dag.addStream("dbrecords", poller.outputPort, writer.input); + } + + /** + * This method can be modified to have field mappings based on used defined + * class + */ + private List<FieldInfo> addFieldInfos() + { + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER)); + fieldInfos.add(new FieldInfo("NAME", "name", SupportType.STRING)); + fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER)); + return fieldInfos; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcIngest/src/main/java/com/example/mydtapp/PojoEvent.java ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/main/java/com/example/mydtapp/PojoEvent.java b/examples/jdbcIngest/src/main/java/com/example/mydtapp/PojoEvent.java new file mode 100644 index 0000000..f56522b --- /dev/null +++ b/examples/jdbcIngest/src/main/java/com/example/mydtapp/PojoEvent.java @@ -0,0 +1,44 @@ +package com.example.mydtapp; + +public class PojoEvent +{ + @Override + public String toString() + { + return "PojoEvent [accountNumber=" + accountNumber + ", name=" + name + ", amount=" + amount + "]"; + } + + private int accountNumber; + private String name; + private int amount; + + public int getAccountNumber() + { + return accountNumber; + } + + public void setAccountNumber(int accountNumber) + { + this.accountNumber = accountNumber; + } + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public int getAmount() + { + return amount; + } + + public void setAmount(int amount) + { + this.amount = amount; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcIngest/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml b/examples/jdbcIngest/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml new file mode 100644 index 0000000..6e7aaf6 --- /dev/null +++ b/examples/jdbcIngest/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml @@ -0,0 +1,73 @@ +<?xml version="1.0"?> +<configuration> + <!-- Static partitioning, specify the partition count, this decides how + many ranges would be initiated --> + <property> + <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.partitionCount</name> + <value>2</value> + </property> + + <property> + <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseDriver</name> + <value>com.mysql.jdbc.Driver</value> + </property> + + <property> + <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseUrl</name> + <value>jdbc:mysql://localhost:3306/testDev</value> + </property> + + <property> + <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.userName</name> + <value>root</value> + </property> + + <property> + <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.password</name> + <value>mysql</value> + </property> + + <!-- Batch size for poller --> + <property> + <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.batchSize</name> + <value>300</value> + </property> + + <!-- look-up key for forming range queries, this would be the column name + on which the table is sorted --> + <property> + <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.key</name> + <value>ACCOUNT_NO</value> + </property> + + <property> + <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.columnsExpression</name> + <value>ACCOUNT_NO,NAME,AMOUNT</value> + </property> + <property> + <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.port.outputPort.attr.TUPLE_CLASS</name> + <value>com.example.mydtapp.PojoEvent</value> + </property> + + <!-- Table name --> + <property> + <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.tableName</name> + <value>test_event_table</value> + </property> + + <property> + <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.pollInterval</name> + <value>1000</value> + </property> + + <!-- Output folder for HDFS output operator --> + <property> + <name>dt.application.PollJdbcToHDFSApp.operator.Writer.filePath</name> + <value>/tmp/test/output</value> + </property> + + <property> + <name>dt.loggers.level</name> + <value>com.datatorrent.*:DEBUG,org.apache.*:INFO</value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcIngest/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml b/examples/jdbcIngest/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml new file mode 100644 index 0000000..9fce7f8 --- /dev/null +++ b/examples/jdbcIngest/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml @@ -0,0 +1,66 @@ +<?xml version="1.0"?> +<configuration> + <!-- <property> <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name> + <value>some-default-value (if value is not specified, it is required from + the user or custom config when launching)</value> </property> --> + <!-- memory assigned to app master <property> <name>dt.attr.MASTER_MEMORY_MB</name> + <value>1024</value> </property> --> + + <!-- JDBC driver in use --> + <property> + <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.store.databaseDriver + </name> + <value>org.hsqldb.jdbcDriver</value> + </property> + + <!-- URL to connect to the DB master --> + <property> + <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.store.databaseUrl + </name> + <value>jdbc:hsqldb:mem:test</value> + </property> + + <!-- # rows that the operator can retrieve in a window --> + <property> + <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.fetchSize + </name> + <value>50</value> + </property> + + <!-- Query to fetch data --> + <property> + <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.query + </name> + <value>select * from test_event_table + </value> + </property> + + <!-- Table name --> + <property> + <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.tableName + </name> + <value>test_event_table</value> + </property> + + <!-- POJO class --> + <property> + <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.port.outputPort.attr.TUPLE_CLASS + </name> + <value>com.example.mydtapp.PojoEvent</value> + </property> + + <!-- Output folder for HDFS output operator --> + <property> + <name>dt.application.SimpleJdbcToHDFSApp.operator.FileOutputOperator.filePath + </name> + <value>/tmp/jdbcApp</value> + </property> + + <property> + <name>dt.application.SimpleJdbcToHDFSApp.operator.FileOutputOperator.rotationWindows + </name> + <value>5</value> + </property> + +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcIngest/src/test/java/com/example/mydtapp/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/test/java/com/example/mydtapp/ApplicationTest.java b/examples/jdbcIngest/src/test/java/com/example/mydtapp/ApplicationTest.java new file mode 100644 index 0000000..fb78944 --- /dev/null +++ b/examples/jdbcIngest/src/test/java/com/example/mydtapp/ApplicationTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.example.mydtapp; + +import java.io.IOException; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * Test the DAG declaration in local mode.<br> + * The assumption to run this test case is that test_event_table is created + * already + */ +public class ApplicationTest +{ + + @Test + @Ignore + public void testApplication() throws IOException, Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SimpleJdbcToHDFSApp.xml")); + lma.prepareDAG(new JdbcHDFSApp(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(10000); // runs for 10 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcInputAppTest.java ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcInputAppTest.java b/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcInputAppTest.java new file mode 100644 index 0000000..1d95f4d --- /dev/null +++ b/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcInputAppTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.example.mydtapp; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collection; + +import javax.validation.ConstraintViolationException; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * Application test for {@link JdbcHDFSApp} + */ +public class JdbcInputAppTest +{ + private static final String DB_DRIVER = "org.hsqldb.jdbcDriver"; + private static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; + private static final String TABLE_NAME = "test_event_table"; + private static final String FILE_NAME = "/tmp/jdbcApp"; + + @BeforeClass + public static void setup() + { + try { + cleanup(); + } catch (Exception e) { + throw new RuntimeException(e); + } + try { + Class.forName(DB_DRIVER).newInstance(); + + Connection con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)"; + stmt.executeUpdate(createTable); + cleanTable(); + insertEventsInTable(10, 0); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterClass + public static void cleanup() + { + try { + FileUtils.deleteDirectory(new File(FILE_NAME)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static void cleanTable() + { + try { + Connection con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + String cleanTable = "delete from " + TABLE_NAME; + stmt.executeUpdate(cleanTable); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static void insertEventsInTable(int numEvents, int offset) + { + try { + Connection con = DriverManager.getConnection(URL); + String insert = "insert into " + TABLE_NAME + " values (?,?,?)"; + PreparedStatement stmt = con.prepareStatement(insert); + for (int i = 0; i < numEvents; i++, offset++) { + stmt.setInt(1, offset); + stmt.setString(2, "Account_Holder-" + offset); + stmt.setInt(3, (offset * 1000)); + stmt.executeUpdate(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testApplication() throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SimpleJdbcToHDFSApp.xml")); + lma.prepareDAG(new JdbcHDFSApp(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + // wait for output files to roll + Thread.sleep(5000); + + String[] extensions = { "dat.0", "tmp" }; + Collection<File> list = FileUtils.listFiles(new File(FILE_NAME), extensions, false); + Assert.assertEquals("Records in file", 10, FileUtils.readLines(list.iterator().next()).size()); + + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcPollerApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcPollerApplicationTest.java b/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcPollerApplicationTest.java new file mode 100644 index 0000000..b96d4ae --- /dev/null +++ b/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcPollerApplicationTest.java @@ -0,0 +1,128 @@ +package com.example.mydtapp; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collection; + +import javax.validation.ConstraintViolationException; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.datatorrent.api.LocalMode; + +public class JdbcPollerApplicationTest +{ + private static final String DB_DRIVER = "org.hsqldb.jdbcDriver"; + private static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; + private static final String TABLE_NAME = "test_event_table"; + private static final String OUTPUT_DIR_NAME = "/tmp/test/output"; + + @BeforeClass + public static void setup() + { + try { + cleanup(); + } catch (Exception e) { + throw new RuntimeException(e); + } + try { + Class.forName(DB_DRIVER).newInstance(); + + Connection con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)"; + stmt.executeUpdate(createTable); + cleanTable(); + insertEventsInTable(10, 0); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterClass + public static void cleanup() + { + try { + FileUtils.deleteDirectory(new File(OUTPUT_DIR_NAME)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static void cleanTable() + { + try { + Connection con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + String cleanTable = "delete from " + TABLE_NAME; + stmt.executeUpdate(cleanTable); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static void insertEventsInTable(int numEvents, int offset) + { + try { + Connection con = DriverManager.getConnection(URL); + String insert = "insert into " + TABLE_NAME + " values (?,?,?)"; + PreparedStatement stmt = con.prepareStatement(insert); + for (int i = 0; i < numEvents; i++, offset++) { + stmt.setInt(1, offset); + stmt.setString(2, "Account_Holder-" + offset); + stmt.setInt(3, (offset * 1000)); + stmt.executeUpdate(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testApplication() throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseUrl", URL); + conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseDriver", DB_DRIVER); + conf.setInt("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.partitionCount", 2); + conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.key", "ACCOUNT_NO"); + conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.columnsExpression", "ACCOUNT_NO,NAME,AMOUNT"); + conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.tableName", TABLE_NAME); + conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.port.outputPort.attr.TUPLE_CLASS", + "com.example.mydtapp.PojoEvent"); + conf.set("dt.application.PollJdbcToHDFSApp.operator.Writer.filePath", OUTPUT_DIR_NAME); + + lma.prepareDAG(new JdbcPollerApplication(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + // wait for output files to roll + Thread.sleep(5000); + + String[] extensions = { "dat.0", "tmp" }; + Collection<File> list = FileUtils.listFiles(new File(OUTPUT_DIR_NAME), extensions, false); + int recordsCount = 0; + for (File file : list) { + recordsCount += FileUtils.readLines(file).size(); + } + Assert.assertEquals("Records in file", 10, recordsCount); + + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcIngest/src/test/resources/example.sql ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/test/resources/example.sql b/examples/jdbcIngest/src/test/resources/example.sql new file mode 100644 index 0000000..531c659 --- /dev/null +++ b/examples/jdbcIngest/src/test/resources/example.sql @@ -0,0 +1,24 @@ +DROP DATABASE IF EXISTS testDev; + +CREATE DATABASE testDev; + +USE testDev; + +CREATE TABLE IF NOT EXISTS `test_event_table` ( + `ACCOUNT_NO` int(11) NOT NULL, + `NAME` varchar(255) DEFAULT NULL, + `AMOUNT` int(11) DEFAULT NULL, + primary key(`ACCOUNT_NO`) +) ENGINE=MyISAM DEFAULT CHARSET=latin1; + +INSERT INTO `test_event_table` (`ACCOUNT_NO`, `NAME`, `AMOUNT`) VALUES +(1, 'User1', 1000), +(2, 'User2', 2000), +(3, 'User3', 3000), +(4, 'User4', 4000), +(5, 'User5', 5000), +(6, 'User6', 6000), +(7, 'User7', 7000), +(8, 'User8', 8000), +(9, 'User9', 9000), +(10, 'User10', 1000); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcIngest/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/test/resources/log4j.properties b/examples/jdbcIngest/src/test/resources/log4j.properties new file mode 100644 index 0000000..3bfcdc5 --- /dev/null +++ b/examples/jdbcIngest/src/test/resources/log4j.properties @@ -0,0 +1,21 @@ +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcToJdbc/.gitignore ---------------------------------------------------------------------- diff --git a/examples/jdbcToJdbc/.gitignore b/examples/jdbcToJdbc/.gitignore new file mode 100644 index 0000000..b83d222 --- /dev/null +++ b/examples/jdbcToJdbc/.gitignore @@ -0,0 +1 @@ +/target/ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcToJdbc/README.md ---------------------------------------------------------------------- diff --git a/examples/jdbcToJdbc/README.md b/examples/jdbcToJdbc/README.md new file mode 100644 index 0000000..562de69 --- /dev/null +++ b/examples/jdbcToJdbc/README.md @@ -0,0 +1,55 @@ +JdbcToJdbc App + +This application reads from a source table in MySQL, creates POJO's and writes the POJO's to another table in MySQL. + +Steps : + +Step 1 : Update the below properties in the properties file - src/site/conf/example.xml + +1.dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.databaseUrl +- data base URL of the form jdbc:mysql://hostName:portNumber/dbName +2.dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.userName +- mysql user name +3.dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.password +- password +4.dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.databaseUrl +- data base URL of the form jdbc:mysql://hostName:portNumber/dbName +5.dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.userName +- mysql user name +6.dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.password +- password + +Step 2: Create database, table and add entries + +Go to mysql console and run the below command, +mysql> source <path to > src/test/resources/example.sql + +After this is done, please verify that testDev.test_event_table is created and has 10 rows.It will also create an output table by the name testDev.test_output_event_table + +mysql> select count(*) from testDev.test_event_table; ++----------+ +| count(*) | ++----------+ +| 10 | ++----------+ + +Step 3: Build the code, +shell> mvn clean install + +Upload the target/jdbcInput-1.0-SNAPSHOT.apa to the gateway + +Step 4 : During launch use "Specify custom properties" option and select example.xml + +Verification : + +Log on to the mysql console + +mysql> select count(*) from testDev.test_event_table; ++----------+ +| count(*) | ++----------+ +| 10 | ++----------+ + + + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcToJdbc/XmlJavadocCommentsExtractor.xsl ---------------------------------------------------------------------- diff --git a/examples/jdbcToJdbc/XmlJavadocCommentsExtractor.xsl b/examples/jdbcToJdbc/XmlJavadocCommentsExtractor.xsl new file mode 100644 index 0000000..08075a9 --- /dev/null +++ b/examples/jdbcToJdbc/XmlJavadocCommentsExtractor.xsl @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> + +<!-- + Document : XmlJavadocCommentsExtractor.xsl + Created on : September 16, 2014, 11:30 AM + Description: + The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet. +--> + +<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0"> + <xsl:output method="xml" standalone="yes"/> + + <!-- copy xml by selecting only the following nodes, attributes and text --> + <xsl:template match="node()|text()|@*"> + <xsl:copy> + <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/> + </xsl:copy> + </xsl:template> + + <!-- Strip off the following paths from the selected xml --> + <xsl:template match="//root/package/interface/interface + |//root/package/interface/method/@qualified + |//root/package/class/interface + |//root/package/class/class + |//root/package/class/method/@qualified + |//root/package/class/field/@qualified" /> + + <xsl:strip-space elements="*"/> +</xsl:stylesheet> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcToJdbc/pom.xml ---------------------------------------------------------------------- diff --git a/examples/jdbcToJdbc/pom.xml b/examples/jdbcToJdbc/pom.xml new file mode 100644 index 0000000..8ed69d8 --- /dev/null +++ b/examples/jdbcToJdbc/pom.xml @@ -0,0 +1,319 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.example</groupId> + <version>1.0-SNAPSHOT</version> + <artifactId>jdbcToJdbc</artifactId> + <packaging>jar</packaging> + + <!-- change these to the appropriate values --> + <name>JDBC Input Operator</name> + <description>Example Use of JDBC Input Operator</description> + + <properties> + <!-- change this if you desire to use a different version of Apex Core --> + <apex.version>3.5.0</apex.version> + <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> + <malhar.version>3.6.0</malhar.version> + <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> + <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath> + </properties> + <repositories> + <repository> + <snapshots> + <enabled>false</enabled> + </snapshots> + <id>Datatorrent-Releases</id> + <name>DataTorrent Release Repository</name> + <url>https://www.datatorrent.com/maven/content/repositories/releases/</url> + </repository> + </repositories> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.9</version> + <configuration> + <downloadSources>true</downloadSources> + </configuration> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.3</version> + <configuration> + <encoding>UTF-8</encoding> + <source>1.7</source> + <target>1.7</target> + <debug>true</debug> + <optimize>false</optimize> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>prepare-package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>target/deps</outputDirectory> + <includeScope>runtime</includeScope> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>app-package-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <finalName>${project.artifactId}-${project.version}-apexapp</finalName> + <appendAssemblyId>false</appendAssemblyId> + <descriptors> + <descriptor>src/assemble/appPackage.xml</descriptor> + </descriptors> + <archiverConfig> + <defaultDirectoryMode>0755</defaultDirectoryMode> + </archiverConfig> + <archive> + <manifestEntries> + <Class-Path>${apex.apppackage.classpath}</Class-Path> + <DT-Engine-Version>${apex.version}</DT-Engine-Version> + <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id> + <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> + <DT-App-Package-Version>${project.version}</DT-App-Package-Version> + <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> + <DT-App-Package-Description>${project.description}</DT-App-Package-Description> + </manifestEntries> + </archive> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.7</version> + <executions> + <execution> + <phase>package</phase> + <configuration> + <target> + <move + file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" + tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> + </target> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + <execution> + <!-- create resource directory for xml javadoc --> + <id>createJavadocDirectory</id> + <phase>generate-resources</phase> + <configuration> + <tasks> + <delete + dir="${project.build.directory}/generated-resources/xml-javadoc" /> + <mkdir + dir="${project.build.directory}/generated-resources/xml-javadoc" /> + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.9.1</version> + <executions> + <execution> + <id>attach-artifacts</id> + <phase>package</phase> + <goals> + <goal>attach-artifact</goal> + </goals> + <configuration> + <artifacts> + <artifact> + <file>target/${project.artifactId}-${project.version}.apa</file> + <type>apa</type> + </artifact> + </artifacts> + <skipAttach>false</skipAttach> + </configuration> + </execution> + </executions> + </plugin> + + <!-- generate javdoc --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <!-- generate xml javadoc --> + <execution> + <id>xml-doclet</id> + <phase>generate-resources</phase> + <goals> + <goal>javadoc</goal> + </goals> + <configuration> + <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> + <additionalparam>-d + ${project.build.directory}/generated-resources/xml-javadoc + -filename + ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> + <useStandardDocletOptions>false</useStandardDocletOptions> + <docletArtifact> + <groupId>com.github.markusbernhardt</groupId> + <artifactId>xml-doclet</artifactId> + <version>1.0.4</version> + </docletArtifact> + </configuration> + </execution> + </executions> + </plugin> + <!-- Transform xml javadoc to stripped down version containing only + class/interface comments and tags --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>xml-maven-plugin</artifactId> + <version>1.0</version> + <executions> + <execution> + <id>transform-xmljavadoc</id> + <phase>generate-resources</phase> + <goals> + <goal>transform</goal> + </goals> + </execution> + </executions> + <configuration> + <transformationSets> + <transformationSet> + <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> + <includes> + <include>${project.artifactId}-${project.version}-javadoc.xml</include> + </includes> + <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> + <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> + </transformationSet> + </transformationSets> + </configuration> + </plugin> + <!-- copy xml javadoc to class jar --> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>2.6</version> + <executions> + <execution> + <id>copy-resources</id> + <phase>process-resources</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${basedir}/target/classes</outputDirectory> + <resources> + <resource> + <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> + <includes> + <include>${project.artifactId}-${project.version}-javadoc.xml</include> + </includes> + <filtering>true</filtering> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + + </build> + + <dependencies> + <!-- add your dependencies here --> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-library</artifactId> + <version>${malhar.version}</version> + <!-- If you know that your application does not need transitive dependencies + pulled in by malhar-library, uncomment the following to reduce the size of + your app package. --> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-common</artifactId> + <version>${apex.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.10</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-engine</artifactId> + <version>${apex.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>5.1.36</version> + </dependency> + + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>janino</artifactId> + <version>2.7.8</version> + </dependency> + + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>commons-compiler</artifactId> + <version>2.7.8</version> + </dependency> + + <dependency> + <groupId>org.hsqldb</groupId> + <artifactId>hsqldb</artifactId> + <version>2.3.1</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcToJdbc/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/jdbcToJdbc/src/assemble/appPackage.xml b/examples/jdbcToJdbc/src/assemble/appPackage.xml new file mode 100644 index 0000000..7ad071c --- /dev/null +++ b/examples/jdbcToJdbc/src/assemble/appPackage.xml @@ -0,0 +1,43 @@ +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/resources</directory> + <outputDirectory>/resources</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/JdbcToJdbcApp.java ---------------------------------------------------------------------- diff --git a/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/JdbcToJdbcApp.java b/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/JdbcToJdbcApp.java new file mode 100644 index 0000000..6dffa87 --- /dev/null +++ b/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/JdbcToJdbcApp.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.example.mydtapp; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator; +import com.datatorrent.lib.db.jdbc.JdbcStore; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.FieldInfo.SupportType; + +@ApplicationAnnotation(name = "JdbcToJdbcApp") +public class JdbcToJdbcApp implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + JdbcPOJOInputOperator jdbcInputOperator = dag.addOperator("JdbcInput", new JdbcPOJOInputOperator()); + JdbcStore store = new JdbcStore(); + jdbcInputOperator.setStore(store); + jdbcInputOperator.setFieldInfos(addFieldInfos()); + + /** + * The class given below can be updated to the user defined class based on + * input table schema The addField infos method needs to be updated + * accordingly This line can be commented and class can be set from the + * properties file + */ + //dag.setOutputPortAttribute(jdbcInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, PojoEvent.class); + + JdbcPOJOInsertOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", new JdbcPOJOInsertOutputOperator()); + JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); + jdbcOutputOperator.setStore(outputStore); + jdbcOutputOperator.setFieldInfos(addJdbcFieldInfos()); + + /** + * The class given below can be updated to the user defined class based on + * input table schema The addField infos method needs to be updated + * accordingly This line can be commented and class can be set from the + * properties file + */ + //dag.setInputPortAttribute(jdbcOutputOperator.input, Context.PortContext.TUPLE_CLASS, PojoEvent.class); + + dag.addStream("POJO's", jdbcInputOperator.outputPort, jdbcOutputOperator.input) + .setLocality(Locality.CONTAINER_LOCAL); + } + + /** + * This method can be modified to have field mappings based on used defined + * class<br> + * User can choose to have a SQL support type as an additional paramter + */ + private List<com.datatorrent.lib.db.jdbc.JdbcFieldInfo> addJdbcFieldInfos() + { + List<com.datatorrent.lib.db.jdbc.JdbcFieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new com.datatorrent.lib.db.jdbc.JdbcFieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER,0)); + fieldInfos.add(new com.datatorrent.lib.db.jdbc.JdbcFieldInfo("NAME", "name", SupportType.STRING,0)); + fieldInfos.add(new com.datatorrent.lib.db.jdbc.JdbcFieldInfo("AMOUNT", "amount", SupportType.INTEGER,0)); + return fieldInfos; + } + + /** + * This method can be modified to have field mappings based on used defined + * class + */ + private List<FieldInfo> addFieldInfos() + { + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER)); + fieldInfos.add(new FieldInfo("NAME", "name", SupportType.STRING)); + fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER)); + return fieldInfos; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/PojoEvent.java ---------------------------------------------------------------------- diff --git a/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/PojoEvent.java b/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/PojoEvent.java new file mode 100644 index 0000000..5154db3 --- /dev/null +++ b/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/PojoEvent.java @@ -0,0 +1,44 @@ +package com.example.mydtapp; + +public class PojoEvent +{ + @Override + public String toString() + { + return "TestPOJOEvent [accountNumber=" + accountNumber + ", name=" + name + ", amount=" + amount + "]"; + } + + private int accountNumber; + private String name; + private int amount; + + public int getAccountNumber() + { + return accountNumber; + } + + public void setAccountNumber(int accountNumber) + { + this.accountNumber = accountNumber; + } + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public int getAmount() + { + return amount; + } + + public void setAmount(int amount) + { + this.amount = amount; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcToJdbc/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/jdbcToJdbc/src/main/resources/META-INF/properties.xml b/examples/jdbcToJdbc/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..904d297 --- /dev/null +++ b/examples/jdbcToJdbc/src/main/resources/META-INF/properties.xml @@ -0,0 +1,88 @@ +<?xml version="1.0"?> +<configuration> + <!-- <property> <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name> + <value>some-default-value (if value is not specified, it is required from + the user or custom config when launching)</value> </property> --> + <!-- memory assigned to app master <property> <name>dt.attr.MASTER_MEMORY_MB</name> + <value>1024</value> </property> --> + + <!-- JDBC driver in use --> + <property> + <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.databaseDriver + </name> + <value>org.hsqldb.jdbcDriver</value> + </property> + + <!-- URL to connect to the DB master --> + <property> + <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.databaseUrl + </name> + <value>jdbc:hsqldb:mem:test</value> + </property> + + <!-- # rows that the operator can retrieve in a window --> + <property> + <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.fetchSize + </name> + <value>120</value> + </property> + + <!-- POJO class --> + <property> + <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.port.outputPort.attr.TUPLE_CLASS + </name> + <value>com.example.mydtapp.PojoEvent</value> + </property> + + <!-- Query to fetch data --> + <property> + <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.query + </name> + <value>select * from test_event_table + </value> + </property> + + <!-- Input Table name --> + <property> + <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.tableName + </name> + <value>test_event_table</value> + </property> + + <!-- JDBC driver in use --> + <property> + <name>dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.databaseDriver + </name> + <value>org.hsqldb.jdbcDriver</value> + </property> + + <!-- URL to connect to the DB master --> + <property> + <name>dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.databaseUrl + </name> + <value>jdbc:hsqldb:mem:test</value> + </property> + + <!-- # rows that the operator can retrieve in a window --> + <property> + <name>dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.batchSize + </name> + <value>5</value> + </property> + + <!-- Output Table name --> + <property> + <name>dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.tablename + </name> + <value>test_output_event_table</value> + </property> + + <!-- POJO class --> + <property> + <name>dt.application.JdbcToJdbcApp.operator.JdbcOutput.port.input.attr.TUPLE_CLASS + </name> + <value>com.example.mydtapp.PojoEvent</value> + </property> + +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/ApplicationTest.java b/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/ApplicationTest.java new file mode 100644 index 0000000..ea4c345 --- /dev/null +++ b/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/ApplicationTest.java @@ -0,0 +1,42 @@ +/** + * Put your copyright and license info here. + */ +package com.example.mydtapp; + +import java.io.IOException; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * Test the DAG declaration in local mode.<br> + * The assumption to run this test case is that test_event_table,meta-table and + * test_output_event_table are created already + */ +public class ApplicationTest +{ + + @Test + @Ignore + public void testApplication() throws IOException, Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + lma.prepareDAG(new JdbcToJdbcApp(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(50000); // runs for 10 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/JdbcOperatorTest.java ---------------------------------------------------------------------- diff --git a/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/JdbcOperatorTest.java b/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/JdbcOperatorTest.java new file mode 100644 index 0000000..f4709ba --- /dev/null +++ b/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/JdbcOperatorTest.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.example.mydtapp; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.lib.db.jdbc.AbstractJdbcInputOperator; +import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; + +/** + * Tests for {@link AbstractJdbcTransactionableOutputOperator} and + * {@link AbstractJdbcInputOperator} + */ +public class JdbcOperatorTest +{ + public static final String DB_DRIVER = "org.hsqldb.jdbcDriver"; + public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; + + private static final String TABLE_NAME = "test_event_table"; + private static final String OUTPUT_TABLE_NAME = "test_output_event_table"; + + @BeforeClass + public static void setup() + { + + try { + Class.forName(DB_DRIVER).newInstance(); + + Connection con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " + + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " + + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " + + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " + "UNIQUE (" + + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " + ")"; + + System.out.println(createMetaTable); + stmt.executeUpdate(createMetaTable); + + String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)"; + stmt.executeUpdate(createTable); + insertEventsInTable(10, 0); + + String createOutputTable = "CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE_NAME + + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)"; + stmt.executeUpdate(createOutputTable); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void cleanTable() + { + try { + Connection con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + String cleanTable = "delete from " + TABLE_NAME; + stmt.executeUpdate(cleanTable); + String cleanOutputTable = "delete from " + OUTPUT_TABLE_NAME; + stmt.executeUpdate(cleanOutputTable); + + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static void insertEventsInTable(int numEvents, int offset) + { + try { + Connection con = DriverManager.getConnection(URL); + String insert = "insert into " + TABLE_NAME + " values (?,?,?)"; + PreparedStatement stmt = con.prepareStatement(insert); + for (int i = 0; i < numEvents; i++, offset++) { + stmt.setInt(1, offset); + stmt.setString(2, "Account_Holder-" + offset); + stmt.setInt(3, (offset * 1000)); + stmt.executeUpdate(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public int getNumOfEventsInStore() + { + Connection con; + try { + con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String countQuery = "SELECT count(*) from " + OUTPUT_TABLE_NAME; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + + @Test + public void testApplication() throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + lma.prepareDAG(new JdbcToJdbcApp(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + // wait for records to be added to table + Thread.sleep(5000); + + Assert.assertEquals("Events in store", 10, getNumOfEventsInStore()); + cleanTable(); + + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcToJdbc/src/test/resources/example.sql ---------------------------------------------------------------------- diff --git a/examples/jdbcToJdbc/src/test/resources/example.sql b/examples/jdbcToJdbc/src/test/resources/example.sql new file mode 100644 index 0000000..104240c --- /dev/null +++ b/examples/jdbcToJdbc/src/test/resources/example.sql @@ -0,0 +1,36 @@ +DROP DATABASE IF EXISTS testDev; + +CREATE DATABASE testDev; + +USE testDev; + +CREATE TABLE IF NOT EXISTS `test_event_table` ( + `ACCOUNT_NO` int(11) NOT NULL, + `NAME` varchar(255) DEFAULT NULL, + `AMOUNT` int(11) DEFAULT NULL +) ENGINE=MyISAM DEFAULT CHARSET=latin1; + +INSERT INTO `test_event_table` (`ACCOUNT_NO`, `NAME`, `AMOUNT`) VALUES +(1, 'User1', 1000), +(2, 'User2', 2000), +(3, 'User3', 3000), +(4, 'User4', 4000), +(5, 'User5', 5000), +(6, 'User6', 6000), +(7, 'User7', 7000), +(8, 'User8', 8000), +(9, 'User9', 9000), +(10, 'User10', 1000); + +CREATE TABLE IF NOT EXISTS `test_output_event_table` ( + `ACCOUNT_NO` int(11) NOT NULL, + `NAME` varchar(255) DEFAULT NULL, + `AMOUNT` int(11) DEFAULT NULL +) ENGINE=MyISAM DEFAULT CHARSET=latin1; + +CREATE TABLE IF NOT EXISTS `dt_meta` ( + `dt_app_id` VARCHAR(100) NOT NULL, + `dt_operator_id` INT NOT NULL, + `dt_window` BIGINT NOT NULL, +UNIQUE (`dt_app_id`, `dt_operator_id`, `dt_window`) +) ENGINE=MyISAM DEFAULT CHARSET=latin1; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/eae0eeee/examples/jdbcToJdbc/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/jdbcToJdbc/src/test/resources/log4j.properties b/examples/jdbcToJdbc/src/test/resources/log4j.properties new file mode 100644 index 0000000..3bfcdc5 --- /dev/null +++ b/examples/jdbcToJdbc/src/test/resources/log4j.properties @@ -0,0 +1,21 @@ +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug
