Fixed tests and POM. Changes related to SQL connector.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9b99e0ae Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9b99e0ae Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9b99e0ae Branch: refs/heads/master Commit: 9b99e0aec48467b22187277c8345d68bda153152 Parents: eae0eee Author: Lakshmi Prasanna Velineni <[email protected]> Authored: Tue Apr 18 10:17:19 2017 -0700 Committer: Lakshmi Prasanna Velineni <[email protected]> Committed: Wed Jun 7 10:33:00 2017 -0700 ---------------------------------------------------------------------- examples/fileToJdbc/.gitignore | 3 - examples/fileToJdbc/README.md | 70 ---- .../fileToJdbc/XmlJavadocCommentsExtractor.xsl | 44 --- examples/fileToJdbc/pom.xml | 293 ----------------- examples/fileToJdbc/src/assemble/appPackage.xml | 43 --- .../com/example/FileToJdbcApp/CustomParser.java | 95 ------ .../com/example/FileToJdbcApp/FileReader.java | 20 -- .../FileToJdbcApp/FileToJdbcCsvParser.java | 54 ---- .../FileToJdbcApp/FileToJdbcCustomParser.java | 50 --- .../com/example/FileToJdbcApp/PojoEvent.java | 45 --- .../src/main/resources/META-INF/properties.xml | 48 --- .../fileToJdbc/src/main/resources/schema.json | 19 -- .../example/FileToJdbcApp/ApplicationTest.java | 124 ------- .../fileToJdbc/src/test/resources/example.sql | 8 - .../src/test/resources/log4j.properties | 21 -- .../src/test/resources/test-input/sample.txt | 10 - examples/fileToJdbc/src/test/resources/test.xml | 58 ---- examples/jdbc/README.md | 193 +++++++++++ examples/jdbc/pom.xml | 79 +++++ examples/jdbc/src/assemble/appPackage.xml | 43 +++ .../examples/FileToJdbcApp/CustomParser.java | 95 ++++++ .../apex/examples/FileToJdbcApp/FileReader.java | 20 ++ .../FileToJdbcApp/FileToJdbcCsvParser.java | 56 ++++ .../FileToJdbcApp/FileToJdbcCustomParser.java | 50 +++ .../apex/examples/FileToJdbcApp/PojoEvent.java | 45 +++ .../JdbcIngest/FileLineOutputOperator.java | 36 +++ .../apex/examples/JdbcIngest/JdbcHDFSApp.java | 74 +++++ .../JdbcIngest/JdbcPollerApplication.java | 49 +++ .../apex/examples/JdbcIngest/PojoEvent.java | 44 +++ .../apex/examples/JdbcToJdbc/JdbcToJdbcApp.java | 101 ++++++ .../apex/examples/JdbcToJdbc/PojoEvent.java | 44 +++ .../META-INF/properties-FileToJdbcApp.xml | 56 ++++ .../META-INF/properties-JdbcToJdbcApp.xml | 88 +++++ .../META-INF/properties-PollJdbcToHDFSApp.xml | 71 +++++ .../META-INF/properties-SimpleJdbcToHDFSApp.xml | 66 ++++ examples/jdbc/src/main/resources/schema.json | 19 ++ .../examples/FileToJdbcApp/ApplicationTest.java | 131 ++++++++ .../examples/JdbcIngest/ApplicationTest.java | 56 ++++ .../examples/JdbcIngest/JdbcInputAppTest.java | 137 ++++++++ .../JdbcIngest/JdbcPollerApplicationTest.java | 129 ++++++++ .../examples/JdbcToJdbc/ApplicationTest.java | 42 +++ .../examples/JdbcToJdbc/JdbcOperatorTest.java | 160 ++++++++++ .../test/resources/example-FileToJdbcApp.sql | 8 + .../src/test/resources/example-JdbcIngest.sql | 24 ++ .../src/test/resources/example-JdbcToJdbc.sql | 36 +++ .../jdbc/src/test/resources/log4j.properties | 21 ++ .../src/test/resources/test-FileToJdbcApp.xml | 58 ++++ .../resources/test-input/sample-FileToJdbc.txt | 10 + examples/jdbcIngest/.gitignore | 1 - examples/jdbcIngest/README.md | 65 ---- .../jdbcIngest/XmlJavadocCommentsExtractor.xsl | 44 --- examples/jdbcIngest/pom.xml | 298 ----------------- examples/jdbcIngest/src/assemble/appPackage.xml | 43 --- .../example/mydtapp/FileLineOutputOperator.java | 36 --- 
.../java/com/example/mydtapp/JdbcHDFSApp.java | 75 ----- .../example/mydtapp/JdbcPollerApplication.java | 48 --- .../java/com/example/mydtapp/PojoEvent.java | 44 --- .../META-INF/properties-PollJdbcToHDFSApp.xml | 73 ----- .../META-INF/properties-SimpleJdbcToHDFSApp.xml | 66 ---- .../com/example/mydtapp/ApplicationTest.java | 56 ---- .../com/example/mydtapp/JdbcInputAppTest.java | 137 -------- .../mydtapp/JdbcPollerApplicationTest.java | 128 -------- .../jdbcIngest/src/test/resources/example.sql | 24 -- .../src/test/resources/log4j.properties | 21 -- examples/jdbcToJdbc/.gitignore | 1 - examples/jdbcToJdbc/README.md | 55 ---- .../jdbcToJdbc/XmlJavadocCommentsExtractor.xsl | 44 --- examples/jdbcToJdbc/pom.xml | 319 ------------------- examples/jdbcToJdbc/src/assemble/appPackage.xml | 43 --- .../java/com/example/mydtapp/JdbcToJdbcApp.java | 101 ------ .../java/com/example/mydtapp/PojoEvent.java | 44 --- .../src/main/resources/META-INF/properties.xml | 88 ----- .../com/example/mydtapp/ApplicationTest.java | 42 --- .../com/example/mydtapp/JdbcOperatorTest.java | 155 --------- .../jdbcToJdbc/src/test/resources/example.sql | 36 --- .../src/test/resources/log4j.properties | 21 -- examples/pom.xml | 2 + 77 files changed, 2043 insertions(+), 3113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/.gitignore ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/.gitignore b/examples/fileToJdbc/.gitignore deleted file mode 100755 index 019edc2..0000000 --- a/examples/fileToJdbc/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -.DS_Store -/.idea/ -/target/ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/README.md ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/README.md b/examples/fileToJdbc/README.md deleted file mode 100755 index ad9f6b9..0000000 --- a/examples/fileToJdbc/README.md +++ /dev/null @@ -1,70 +0,0 @@ -## Sample File to JDBC Example - -This example shows how to read files from HDFS, parse into POJOs and then insert into a table in MySQL. - -Given various parsing demands, we give two applications under this package, `FileToJdbcCsvParser` and `FileToJdbcCustomParser`. - -`CsvParser` allows you to parse only CSV format input files. For more complex input format, `CustomParser` allows you to set custom regex to parse. - -Accordingly, we have two additional configuration files (`src/site/conf/exampleCsvParser.xml` and `src/site/conf/exampleCustomParser.xml`) besides the common properties file (`/src/main/resources/META-INF/properties.xml`). - -Users can choose which applicaiton and which addtional configuration file to use during launch time. 
- - -####**Update Properties:** - -- Update these common properties in the file `/src/main/resources/META-INF/properties.xml`: - -| Property Name | Description | -| ------------- | ----------- | -| dt.operator.FileReader.prop.directory |HDFS input directory path -|dt.operator.JdbcOutput.prop.store.databaseUrl | database URL of the form `jdbc:mysql://hostName:portNumber/dbName` | -| dt.operator.JdbcOutput.prop.store.userName | MySQL user name | -| dt.operator.JdbcOutput.prop.store.password | MySQL user password | -| dt.operator.JdbcOutput.prop.tablename | MySQL output table name | - -- Using CustomParser: update `regexStr` in file `src/site/conf/exampleCustomParser.xml` - - -####**Sample Input:** - -- To set up MySQL database and create table, check `src/test/resources/example.sql` -- To run this example, create files using this format: - -``` - 1,User1,1000 - 2,User2,2000 - 3,User3,3000 - 4,User4,4000 - 5,User5,5000 - 6,User6,6000 - 7,User7,7000 - 8,User8,8000 - 9,User9,9000 - 10,User10,10000 -``` -- To change input format, update `PojoEvent` class and `addFieldInfos()` method in `src/main/java/com/example/FileToJdbcApp`. If using CsvParser, also update `src/main/resources/schema.json`. - -####**Sample Output:** - -- After running successfully, verify -that the database table has the expected output: - -``` - mysql> select * from table_name; - +------------+--------+--------+ - | ACCOUNT_NO | NAME | AMOUNT | - +------------+--------+--------+ - | 1 | User1 | 1000 | - | 2 | User2 | 2000 | - | 3 | User3 | 3000 | - | 4 | User4 | 4000 | - | 5 | User5 | 5000 | - | 6 | User6 | 6000 | - | 7 | User7 | 7000 | - | 8 | User8 | 8000 | - | 9 | User9 | 9000 | - | 10 | User10 | 10000 | - +------------+--------+--------+ - 10 rows in set (0.00 sec) -``` http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/XmlJavadocCommentsExtractor.xsl ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/XmlJavadocCommentsExtractor.xsl b/examples/fileToJdbc/XmlJavadocCommentsExtractor.xsl deleted file mode 100755 index 08075a9..0000000 --- a/examples/fileToJdbc/XmlJavadocCommentsExtractor.xsl +++ /dev/null @@ -1,44 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ---> - -<!-- - Document : XmlJavadocCommentsExtractor.xsl - Created on : September 16, 2014, 11:30 AM - Description: - The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet. 
---> - -<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0"> - <xsl:output method="xml" standalone="yes"/> - - <!-- copy xml by selecting only the following nodes, attributes and text --> - <xsl:template match="node()|text()|@*"> - <xsl:copy> - <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/> - </xsl:copy> - </xsl:template> - - <!-- Strip off the following paths from the selected xml --> - <xsl:template match="//root/package/interface/interface - |//root/package/interface/method/@qualified - |//root/package/class/interface - |//root/package/class/class - |//root/package/class/method/@qualified - |//root/package/class/field/@qualified" /> - - <xsl:strip-space elements="*"/> -</xsl:stylesheet> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/pom.xml ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/pom.xml b/examples/fileToJdbc/pom.xml deleted file mode 100755 index ae62e0c..0000000 --- a/examples/fileToJdbc/pom.xml +++ /dev/null @@ -1,293 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <groupId>com.example</groupId> - <version>1.0-SNAPSHOT</version> - <artifactId>FileToJdbcApp</artifactId> - <packaging>jar</packaging> - - <!-- change these to the appropriate values --> - <name>File to JDBC</name> - <description>My Apex Application Description</description> - - <properties> - <!-- change this if you desire to use a different version of Apex Core --> - <apex.version>3.5.0</apex.version> - <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> - <malhar.version>3.6.0</malhar.version> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.9</version> - <configuration> - <downloadSources>true</downloadSources> - </configuration> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.3</version> - <configuration> - <encoding>UTF-8</encoding> - <source>1.7</source> - <target>1.7</target> - <debug>true</debug> - <optimize>false</optimize> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.8</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>prepare-package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>target/deps</outputDirectory> - <includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>app-package-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-${project.version}-apexapp</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/assemble/appPackage.xml</descriptor> - </descriptors> - <archiverConfig> - <defaultDirectoryMode>0755</defaultDirectoryMode> - </archiverConfig> - <archive> - <manifestEntries> - <Class-Path>${apex.apppackage.classpath}</Class-Path> - 
<DT-Engine-Version>${apex.version}</DT-Engine-Version> - <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id> - <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> - <DT-App-Package-Version>${project.version}</DT-App-Package-Version> - <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> - <DT-App-Package-Description>${project.description}</DT-App-Package-Description> - </manifestEntries> - </archive> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <phase>package</phase> - <configuration> - <target> - <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" - tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - <execution> - <!-- create resource directory for xml javadoc--> - <id>createJavadocDirectory</id> - <phase>generate-resources</phase> - <configuration> - <tasks> - <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> - <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>attach-artifacts</id> - <phase>package</phase> - <goals> - <goal>attach-artifact</goal> - </goals> - <configuration> - <artifacts> - <artifact> - <file>target/${project.artifactId}-${project.version}.apa</file> - <type>apa</type> - </artifact> - </artifacts> - <skipAttach>false</skipAttach> - </configuration> - </execution> - </executions> - </plugin> - - <!-- generate javdoc --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <executions> - <!-- generate xml javadoc --> - <execution> - <id>xml-doclet</id> - <phase>generate-resources</phase> - <goals> - <goal>javadoc</goal> - </goals> - <configuration> - <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> - <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> - <useStandardDocletOptions>false</useStandardDocletOptions> - <docletArtifact> - <groupId>com.github.markusbernhardt</groupId> - <artifactId>xml-doclet</artifactId> - <version>1.0.4</version> - </docletArtifact> - </configuration> - </execution> - </executions> - </plugin> - <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>xml-maven-plugin</artifactId> - <version>1.0</version> - <executions> - <execution> - <id>transform-xmljavadoc</id> - <phase>generate-resources</phase> - <goals> - <goal>transform</goal> - </goals> - </execution> - </executions> - <configuration> - <transformationSets> - <transformationSet> - <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> - <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> - </transformationSet> - </transformationSets> - 
</configuration> - </plugin> - <!-- copy xml javadoc to class jar --> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <id>copy-resources</id> - <phase>process-resources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${basedir}/target/classes</outputDirectory> - <resources> - <resource> - <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <filtering>true</filtering> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - - </build> - - <dependencies> - <!-- add your dependencies here --> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-library</artifactId> - <version>${malhar.version}</version> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-contrib</artifactId> - <version>${malhar.version}</version> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-common</artifactId> - <version>${apex.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.10</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-engine</artifactId> - <version>${apex.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>mysql</groupId> - <artifactId>mysql-connector-java</artifactId> - <version>5.1.36</version> - </dependency> - <dependency> - <groupId>org.codehaus.janino</groupId> - <artifactId>janino</artifactId> - <version>2.7.8</version> - </dependency> - <dependency> - <groupId>org.codehaus.janino</groupId> - <artifactId>commons-compiler</artifactId> - <version>2.7.8</version> - </dependency> - <dependency> - <groupId>net.sf.supercsv</groupId> - <artifactId>super-csv</artifactId> - <version>2.4.0</version> - </dependency> - <dependency> - <groupId>org.hsqldb</groupId> - <artifactId>hsqldb</artifactId> - <version>2.3.1</version> - </dependency> - </dependencies> -</project> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/assemble/appPackage.xml b/examples/fileToJdbc/src/assemble/appPackage.xml deleted file mode 100755 index 7ad071c..0000000 --- a/examples/fileToJdbc/src/assemble/appPackage.xml +++ /dev/null @@ -1,43 +0,0 @@ -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>appPackage</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${basedir}/target/</directory> - <outputDirectory>/app</outputDirectory> - <includes> - <include>${project.artifactId}-${project.version}.jar</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/target/deps</directory> - <outputDirectory>/lib</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/site/conf</directory> - <outputDirectory>/conf</outputDirectory> - <includes> - 
<include>*.xml</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/META-INF</directory> - <outputDirectory>/META-INF</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/app</directory> - <outputDirectory>/app</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/resources</directory> - <outputDirectory>/resources</outputDirectory> - </fileSet> - </fileSets> - -</assembly> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/CustomParser.java ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/CustomParser.java b/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/CustomParser.java deleted file mode 100755 index e089255..0000000 --- a/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/CustomParser.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.example.FileToJdbcApp; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.common.util.BaseOperator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.regex.Pattern; - -// parse input line into pojo event -public class CustomParser extends BaseOperator -{ - private static final Logger LOG = LoggerFactory.getLogger(CustomParser.class); - - // default regex pattern for parsing each line - private static final Pattern RegexDefault = Pattern.compile("[\\p{Punct}\\s]+"); - - private String regexStr; // customized configurable regex string - private transient Pattern regexPattern; // compiled regex pattern generated from customized regex string - - @OutputPortFieldAnnotation(optional = false) - public final transient DefaultOutputPort<PojoEvent> output = new DefaultOutputPort<>(); - - public final transient DefaultInputPort<String> - input = new DefaultInputPort<String>() { - - @Override - public void process(String line) - { - // use custom regex to split line into words - final String[] words = regexPattern.split(line); - - PojoEvent pojo = new PojoEvent(); - // transform words array into pojo event - try { - int accnum = Integer.parseInt(words[0]); - pojo.setAccountNumber(accnum); - } catch (NumberFormatException e) { - LOG.error("Number Format Exception", e); - pojo.setAccountNumber(0); - } - String name = words[1]; - pojo.setName(name); - try { - int amount = Integer.parseInt(words[2]); - pojo.setAmount(amount); - } catch (NumberFormatException e) { - LOG.error("Number Format Exception", e); - pojo.setAmount(0); - } - output.emit(pojo); - } - }; - - public String getRegexStr() { - return this.regexStr; - } - - public void setRegexStr(String regex) { - this.regexStr = regex; - } - - @Override - public void setup(OperatorContext context) - { - if (null == regexStr) { - regexPattern = RegexDefault; - } else { - regexPattern = Pattern.compile(this.getRegexStr()); - } - } - -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileReader.java ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileReader.java b/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileReader.java deleted file mode 100755 index 201c705..0000000 --- a/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileReader.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.example.FileToJdbcApp; - -import com.datatorrent.api.DefaultOutputPort; -import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator; - -public class FileReader extends LineByLineFileInputOperator{ - - /** - * output in bytes to match CsvParser input type - */ - public final transient DefaultOutputPort<byte[]> byteOutput = new DefaultOutputPort<>(); - - @Override - protected void emit(String tuple) - { - output.emit(tuple); - byteOutput.emit(tuple.getBytes()); - } -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileToJdbcCsvParser.java ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileToJdbcCsvParser.java 
b/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileToJdbcCsvParser.java deleted file mode 100755 index 23d3f36..0000000 --- a/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileToJdbcCsvParser.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.example.FileToJdbcApp; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.contrib.parser.CsvParser; -import com.datatorrent.lib.appdata.schemas.SchemaUtils; -import com.datatorrent.lib.db.jdbc.JdbcFieldInfo; -import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator; -import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; -import com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; - -import java.util.List; - -import static java.sql.Types.INTEGER; -import static java.sql.Types.VARCHAR; - -@ApplicationAnnotation(name = "FileToJdbcCsvParser") -public class FileToJdbcCsvParser implements StreamingApplication{ - - @Override - public void populateDAG(DAG dag, Configuration configuration) { - // create operators - FileReader fileReader = dag.addOperator("FileReader", FileReader.class); - CsvParser csvParser = dag.addOperator("CsvParser", CsvParser.class); - JdbcPOJOInsertOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", JdbcPOJOInsertOutputOperator.class); - - // configure operators - String pojoSchema = SchemaUtils.jarResourceFileToString("schema.json"); - csvParser.setSchema(pojoSchema); - - jdbcOutputOperator.setFieldInfos(addFieldInfos()); - JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); - jdbcOutputOperator.setStore(outputStore); - - // add stream - dag.addStream("Bytes", fileReader.byteOutput, csvParser.in); - dag.addStream("POJOs", csvParser.out, jdbcOutputOperator.input); - } - - /** - * This method can be modified to have field mappings based on used defined - * class - */ - private List<JdbcFieldInfo> addFieldInfos() { - List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); - fieldInfos.add(new JdbcFieldInfo("ACCOUNT_NO", "accountNumber", JdbcFieldInfo.SupportType.INTEGER , INTEGER)); - fieldInfos.add(new JdbcFieldInfo("NAME", "name", JdbcFieldInfo.SupportType.STRING, VARCHAR)); - fieldInfos.add(new JdbcFieldInfo("AMOUNT", "amount", JdbcFieldInfo.SupportType.INTEGER, INTEGER)); - return fieldInfos; - } -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileToJdbcCustomParser.java ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileToJdbcCustomParser.java b/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileToJdbcCustomParser.java deleted file mode 100755 index c13377f..0000000 --- a/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileToJdbcCustomParser.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.example.FileToJdbcApp; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.db.jdbc.JdbcFieldInfo; -import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator; -import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; -import com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; - -import java.util.List; - -import static java.sql.Types.INTEGER; -import static java.sql.Types.VARCHAR; - 
-@ApplicationAnnotation(name = "FileToJdbcCustomParser") -public class FileToJdbcCustomParser implements StreamingApplication{ - - @Override - public void populateDAG(DAG dag, Configuration configuration) { - // create operators - FileReader fileReader = dag.addOperator("FileReader", FileReader.class); - CustomParser customParser = dag.addOperator("CustomParser", CustomParser.class); - JdbcPOJOInsertOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", JdbcPOJOInsertOutputOperator.class); - - // configure operators - jdbcOutputOperator.setFieldInfos(addFieldInfos()); - JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); - jdbcOutputOperator.setStore(outputStore); - - // add stream - dag.addStream("Data", fileReader.output, customParser.input); - dag.addStream("POJOs", customParser.output, jdbcOutputOperator.input); - } - - /** - * This method can be modified to have field mappings based on used defined - * class - */ - private List<JdbcFieldInfo> addFieldInfos() - { - List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); - fieldInfos.add(new JdbcFieldInfo("ACCOUNT_NO", "accountNumber", JdbcFieldInfo.SupportType.INTEGER , INTEGER)); - fieldInfos.add(new JdbcFieldInfo("NAME", "name", JdbcFieldInfo.SupportType.STRING, VARCHAR)); - fieldInfos.add(new JdbcFieldInfo("AMOUNT", "amount", JdbcFieldInfo.SupportType.INTEGER, INTEGER)); - return fieldInfos; - } -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/PojoEvent.java ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/PojoEvent.java b/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/PojoEvent.java deleted file mode 100755 index 7985b45..0000000 --- a/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/PojoEvent.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.example.FileToJdbcApp; - -public class PojoEvent -{ - @Override - public String toString() - { - return "PojoEvent [accountNumber=" + accountNumber + ", name=" + name + ", amount=" + amount + "]"; - } - - private int accountNumber; - private String name; - private int amount; - - public int getAccountNumber() - { - return accountNumber; - } - - public void setAccountNumber(int accountNumber) - { - this.accountNumber = accountNumber; - } - - public String getName() - { - return name; - } - - public void setName(String name) - { - this.name = name; - } - - public int getAmount() - { - return amount; - } - - public void setAmount(int amount) - { - this.amount = amount; - } -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/main/resources/META-INF/properties.xml b/examples/fileToJdbc/src/main/resources/META-INF/properties.xml deleted file mode 100755 index 4f706c4..0000000 --- a/examples/fileToJdbc/src/main/resources/META-INF/properties.xml +++ /dev/null @@ -1,48 +0,0 @@ -<?xml version="1.0"?> -<configuration> - <property> - <name>dt.operator.JdbcOutput.prop.store.databaseDriver</name> - <value>com.mysql.jdbc.Driver</value> - </property> - - <property> - <name>dt.operator.JdbcOutput.prop.store.databaseUrl</name> - <value>jdbc:mysql://hostName:portNumber/dbName</value> - </property> - - <property> - <name>dt.operator.JdbcOutput.prop.store.userName</name> - <value>root</value> - </property> - 
- <property> - <name>dt.operator.JdbcOutput.prop.store.password</name> - <value>password</value> - </property> - - <property> - <name>dt.operator.JdbcOutput.prop.batchSize</name> - <value>5</value> - </property> - - <property> - <name>dt.operator.JdbcOutput.prop.tablename</name> - <value>table_name</value> - </property> - - <property> - <name>dt.operator.JdbcOutput.port.input.attr.TUPLE_CLASS</name> - <value>com.example.FileToJdbcApp.PojoEvent</value> - </property> - - <property> - <name>dt.operator.FileReader.prop.directory</name> - <value>input_directory</value> - </property> - - <property> - <name>dt.loggers.level</name> - <value>com.datatorrent.*:INFO,org.apache.*:INFO</value> - </property> -</configuration> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/src/main/resources/schema.json ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/main/resources/schema.json b/examples/fileToJdbc/src/main/resources/schema.json deleted file mode 100755 index 3c191cf..0000000 --- a/examples/fileToJdbc/src/main/resources/schema.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "separator": ",", - "quoteChar":"\"", - "fields": [ - { - "name": "AccountNumber", - "type": "INTEGER" - }, - { - "name": "Name", - "type": "String" - }, - { - "name": "Amount", - "type": "INTEGER" - } - ] -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/src/test/java/com/example/FileToJdbcApp/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/test/java/com/example/FileToJdbcApp/ApplicationTest.java b/examples/fileToJdbc/src/test/java/com/example/FileToJdbcApp/ApplicationTest.java deleted file mode 100755 index 806bf2e..0000000 --- a/examples/fileToJdbc/src/test/java/com/example/FileToJdbcApp/ApplicationTest.java +++ /dev/null @@ -1,124 +0,0 @@ -package com.example.FileToJdbcApp; - -import com.datatorrent.api.LocalMode; -import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; -import com.datatorrent.netlet.util.DTThrowable; -import org.apache.hadoop.conf.Configuration; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import javax.validation.ConstraintViolationException; -import java.io.File; -import java.io.IOException; -import java.sql.*; - -/** - * Test the DAG declaration in local mode.<br> - * The assumption to run this test case is that test_jdbc_table - * and meta-table are created already. 
- */ -public class ApplicationTest { - private static final String DB_DRIVER = "org.hsqldb.jdbcDriver"; - private static final String DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; - private static final String TABLE_NAME = "test_jdbc_table"; - - @BeforeClass - public static void setup() { - try { - Class.forName(DB_DRIVER).newInstance(); - - Connection con = DriverManager.getConnection(DB_URL); - Statement stmt = con.createStatement(); - - String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " - + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " - + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " - + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " - + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " - + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " - + ")"; - stmt.executeUpdate(createMetaTable); - - String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME - + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)"; - stmt.executeUpdate(createTable); - - } catch (Throwable e) { - DTThrowable.rethrow(e); - } - } - - public static void cleanTable() - { - try { - Connection con = DriverManager.getConnection(DB_URL); - Statement stmt = con.createStatement(); - String cleanTable = "delete from " + TABLE_NAME; - stmt.executeUpdate(cleanTable); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public int getNumOfEventsInStore() - { - Connection con; - try { - con = DriverManager.getConnection(DB_URL); - Statement stmt = con.createStatement(); - - String countQuery = "SELECT count(*) from " + TABLE_NAME; - ResultSet resultSet = stmt.executeQuery(countQuery); - resultSet.next(); - return resultSet.getInt(1); - } catch (SQLException e) { - throw new RuntimeException("fetching count", e); - } - } - - @Test - public void testCsvParserApp() throws IOException, Exception { - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(new File("src/test/resources/test.xml").toURI().toURL()); - - lma.prepareDAG(new FileToJdbcCsvParser(), conf); - LocalMode.Controller lc = lma.getController(); - lc.runAsync(); // test will terminate after results are available - - // wait for records to be added to table - Thread.sleep(5000); - - Assert.assertEquals("Events in store", 10, getNumOfEventsInStore()); - cleanTable(); - - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } - - @Test - public void testCustomParserApp() throws IOException, Exception { - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(new File("src/test/resources/test.xml").toURI().toURL()); - - lma.prepareDAG(new FileToJdbcCustomParser(), conf); - LocalMode.Controller lc = lma.getController(); - lc.runAsync(); // test will terminate after results are available - - // wait for records to be added to table - Thread.sleep(5000); - - Assert.assertEquals("Events in store", 10, getNumOfEventsInStore()); - cleanTable(); - - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/src/test/resources/example.sql ---------------------------------------------------------------------- diff --git 
a/examples/fileToJdbc/src/test/resources/example.sql b/examples/fileToJdbc/src/test/resources/example.sql deleted file mode 100644 index 4461247..0000000 --- a/examples/fileToJdbc/src/test/resources/example.sql +++ /dev/null @@ -1,8 +0,0 @@ -CREATE DATABASE IF NOT EXISTS testJdbc; - -USE testJdbc; - -CREATE TABLE IF NOT EXISTS `test_jdbc_table` ( - `ACCOUNT_NO` int(11) NOT NULL, - `NAME` varchar(255), - `AMOUNT` int(11)); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/test/resources/log4j.properties b/examples/fileToJdbc/src/test/resources/log4j.properties deleted file mode 100755 index 3bfcdc5..0000000 --- a/examples/fileToJdbc/src/test/resources/log4j.properties +++ /dev/null @@ -1,21 +0,0 @@ -log4j.rootLogger=DEBUG,CONSOLE - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n - -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.RFA.File=/tmp/app.log - -# to enable, add SYSLOG to rootLogger -log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender -log4j.appender.SYSLOG.syslogHost=127.0.0.1 -log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout -log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n -log4j.appender.SYSLOG.Facility=LOCAL1 - -log4j.logger.org=info -#log4j.logger.org.apache.commons.beanutils=warn -log4j.logger.com.datatorrent=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/src/test/resources/test-input/sample.txt ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/test/resources/test-input/sample.txt b/examples/fileToJdbc/src/test/resources/test-input/sample.txt deleted file mode 100644 index 362253e..0000000 --- a/examples/fileToJdbc/src/test/resources/test-input/sample.txt +++ /dev/null @@ -1,10 +0,0 @@ -1,User1,1000 -2,User2,2000 -3,User3,3000 -4,User4,4000 -5,User5,5000 -6,User6,6000 -7,User7,7000 -8,User8,8000 -9,User9,9000 -10,User10,10000 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/fileToJdbc/src/test/resources/test.xml ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/test/resources/test.xml b/examples/fileToJdbc/src/test/resources/test.xml deleted file mode 100755 index c3a49c4..0000000 --- a/examples/fileToJdbc/src/test/resources/test.xml +++ /dev/null @@ -1,58 +0,0 @@ -<?xml version="1.0"?> -<configuration> - <property> - <name>dt.operator.JdbcOutput.prop.store.databaseDriver</name> - <value>org.hsqldb.jdbcDriver</value> - </property> - - <property> - <name>dt.operator.JdbcOutput.prop.store.databaseUrl</name> - <value>jdbc:hsqldb:mem:test;sql.syntax_mys=true</value> - </property> - - <property> - <name>dt.operator.JdbcOutput.prop.store.userName</name> - <value>sa</value> - </property> - - <property> - <name>dt.operator.JdbcOutput.prop.store.password</name> - <value></value> - </property> - - <property> - <name>dt.operator.JdbcOutput.prop.batchSize</name> - <value>5</value> - </property> - - <property> - <name>dt.operator.JdbcOutput.prop.tablename</name> - 
<value>test_jdbc_table</value> - </property> - - <property> - <name>dt.operator.JdbcOutput.port.input.attr.TUPLE_CLASS</name> - <value>com.example.FileToJdbcApp.PojoEvent</value> - </property> - - <property> - <name>dt.operator.FileReader.prop.directory</name> - <value>src/test/resources/test-input</value> - </property> - - <property> - <name>dt.loggers.level</name> - <value>com.datatorrent.*:INFO,org.apache.*:INFO</value> - </property> - - <property> - <name>dt.application.FileToJdbcCsvParser.operator.CsvParser.port.out.attr.TUPLE_CLASS</name> - <value>com.example.FileToJdbcApp.PojoEvent</value> - </property> - - <property> - <name>dt.application.FileToJdbcCustomParser.operator.CustomParser.prop.regexStr</name> - <value>,</value> - </property> -</configuration> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/README.md ---------------------------------------------------------------------- diff --git a/examples/jdbc/README.md b/examples/jdbc/README.md new file mode 100644 index 0000000..72c4fbb --- /dev/null +++ b/examples/jdbc/README.md @@ -0,0 +1,193 @@ +## File to JDBC Example (FileToJdbcApp) + +This example shows how to read files from HDFS, parse them into POJOs and then insert them into a table in a database. + +To meet various parsing demands, this package provides two applications, `FileToJdbcCsvParser` and `FileToJdbcCustomParser`. + +`CsvParser` parses only CSV-format input files. For more complex input formats, `CustomParser` lets you set a custom regex for parsing. + +A sample properties file (`/src/main/resources/META-INF/properties-FileToJdbcApp.xml`) is provided for these applications and needs to be
+customized according to the user's environment. + +The applications can then be launched using the apex command line interface, selecting the above configuration file via a parameter during
+launch. + +####**Update Properties:** + +- Update these common properties in the file `/src/main/resources/META-INF/properties-FileToJdbcApp.xml`: + +| Property Name | Description |
+| ------------- | ----------- |
+| dt.operator.FileReader.prop.directory | HDFS input directory path |
+| dt.operator.JdbcOutput.prop.store.databaseUrl | database URL |
+| dt.operator.JdbcOutput.prop.store.userName | database user name |
+| dt.operator.JdbcOutput.prop.store.password | database user password |
+| dt.operator.JdbcOutput.prop.tablename | database output table name |
+| dt.operator.CustomParser.prop.regexStr | custom parsing regex (update if needed) |
+
+####**Sample Input:** + +- To set up the database and create the table, check `src/test/resources/example-FileToJdbcApp.sql` +- To run this example, create files using this format: + +``` + 1,User1,1000 + 2,User2,2000 + 3,User3,3000 + 4,User4,4000 + 5,User5,5000 + 6,User6,6000 + 7,User7,7000 + 8,User8,8000 + 9,User9,9000 + 10,User10,10000 +``` +- To change the input format, update the `PojoEvent` class and the `addFieldInfos()` method in `src/main/java/org/apache/apex/examples/FileToJdbcApp` (a sketch follows below).
+If using CsvParser, also update `src/main/resources/schema.json`.
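+As a minimal sketch of such a change (the `EMAIL` column and the matching `email` POJO field below are illustrative assumptions, not part of this example), `addFieldInfos()` in `FileToJdbcCsvParser.java`/`FileToJdbcCustomParser.java`, where all needed imports already exist, might become:
+
+```java
+private List<JdbcFieldInfo> addFieldInfos()
+{
+  List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
+  // existing mappings: database column, POJO field, supported type, SQL type
+  fieldInfos.add(new JdbcFieldInfo("ACCOUNT_NO", "accountNumber", JdbcFieldInfo.SupportType.INTEGER, INTEGER));
+  fieldInfos.add(new JdbcFieldInfo("NAME", "name", JdbcFieldInfo.SupportType.STRING, VARCHAR));
+  fieldInfos.add(new JdbcFieldInfo("AMOUNT", "amount", JdbcFieldInfo.SupportType.INTEGER, INTEGER));
+  // hypothetical new mapping; PojoEvent would need a matching 'email' field with getter and setter
+  fieldInfos.add(new JdbcFieldInfo("EMAIL", "email", JdbcFieldInfo.SupportType.STRING, VARCHAR));
+  return fieldInfos;
+}
+```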
+ +####**Sample Output:** + +- After running successfully, verify +that the database table has the expected output: + +``` + mysql> select * from table_name; + +------------+--------+--------+ + | ACCOUNT_NO | NAME | AMOUNT | + +------------+--------+--------+ + | 1 | User1 | 1000 | + | 2 | User2 | 2000 | + | 3 | User3 | 3000 | + | 4 | User4 | 4000 | + | 5 | User5 | 5000 | + | 6 | User6 | 6000 | + | 7 | User7 | 7000 | + | 8 | User8 | 8000 | + | 9 | User9 | 9000 | + | 10 | User10 | 10000 | + +------------+--------+--------+ + 10 rows in set (0.00 sec) +``` + + +## JDBC ingestion examples + +This project contains two applications to read records from a table in a database, create POJOs and write them to a file +in the user-specified directory in HDFS. + +1. SimpleJdbcToHDFSApp: Reads table records as per the given query and emits them as POJOs. +2. PollJdbcToHDFSApp: Reads table records in parallel using partitions, polls for newly **appended** records, and emits them as POJOs. + +Follow these steps to run these applications: + +**Step 1**: Update these properties in the file `src/main/resources/META-INF/properties-<applicationName>.xml`, where <applicationName> represents +the application name and is one of the two names above: + +| Property Name | Description | +| ------------- | ----------- | +| dt.application.<applicationName>.operator.JdbcInput.prop.store.databaseUrl | database URL, for example `jdbc:hsqldb:mem:test` | +| dt.application.<applicationName>.operator.JdbcInput.prop.store.userName | database user name | +| dt.application.<applicationName>.operator.JdbcInput.prop.store.password | database user password | +| dt.application.<applicationName>.operator.FileOutputOperator.filePath | HDFS output directory path | + +**Step 2**: Create the database table and add entries + +Go to the database console and run (where _{path}_ is a suitable prefix): + + source {path}/src/test/resources/example-JdbcIngest.sql + +After this, please verify that `testDev.test_event_table` is created and has 10 rows: + + select count(*) from testDev.test_event_table; + +----------+ + | count(*) | + +----------+ + | 10 | + +----------+ + +**Step 3**: Create the HDFS output directory if not already present (_{path}_ should be the same as specified in `META-INF/properties-<applicationName>.xml`): + + hadoop fs -mkdir -p {path} + +**Step 4**: Build the code: + + mvn clean install + +**Step 5**: During launch use `src/main/resources/META-INF/properties-<applicationName>.xml` as a custom configuration file; then verify +that the output directory has the expected output: + + hadoop fs -cat <hadoop directory path>/2_op.dat.* | wc -l + +This should return 10 as the count. + +Sample Output: + + hadoop fs -cat <path_to_file>/2_op.dat.0 + PojoEvent [accountNumber=1, name=User1, amount=1000] + PojoEvent [accountNumber=2, name=User2, amount=2000] + PojoEvent [accountNumber=3, name=User3, amount=3000] + PojoEvent [accountNumber=4, name=User4, amount=4000] + PojoEvent [accountNumber=5, name=User5, amount=5000] + PojoEvent [accountNumber=6, name=User6, amount=6000] + PojoEvent [accountNumber=7, name=User7, amount=7000] + PojoEvent [accountNumber=8, name=User8, amount=8000] + PojoEvent [accountNumber=9, name=User9, amount=9000] + PojoEvent [accountNumber=10, name=User10, amount=10000] + + +## JdbcToJdbc App + +This application reads from a source table in a database, creates POJOs and writes them to another table in a database.
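+As a rough sketch only (not necessarily the exact contents of `JdbcToJdbcApp.java` in this commit), such a DAG can be wired with Malhar's JDBC operators as shown below; the use of `JdbcPOJOInputOperator` and the bare store instances are assumptions here, and the table, query and field mappings are normally supplied via `properties-JdbcToJdbcApp.xml`. The concrete steps follow the sketch.
+
+```java
+package org.apache.apex.examples.JdbcToJdbc;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+
+// sketch of a JDBC-to-JDBC application; operator properties come from the configuration file
+@ApplicationAnnotation(name = "JdbcToJdbcSketch")
+public class JdbcToJdbcSketch implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // input side (assumed): reads rows from the source table and emits them as POJOs
+    JdbcPOJOInputOperator jdbcInput = dag.addOperator("JdbcInput", JdbcPOJOInputOperator.class);
+    jdbcInput.setStore(new JdbcStore());
+
+    // output side: transactional insert into the destination table
+    JdbcPOJOInsertOutputOperator jdbcOutput = dag.addOperator("JdbcOutput", JdbcPOJOInsertOutputOperator.class);
+    jdbcOutput.setStore(new JdbcTransactionalStore());
+
+    dag.addStream("POJOs", jdbcInput.outputPort, jdbcOutput.input);
+  }
+}
+```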
+ +Steps: + +Step 1: Update these properties in the properties file `src/main/resources/META-INF/properties-JdbcToJdbcApp.xml`: + +1. dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.databaseUrl +- database URL for your source database, for example jdbc:hsqldb:mem:test +2. dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.userName +- database user name +3. dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.password +- database user password +4. dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.databaseUrl +- database URL for your destination database, for example jdbc:hsqldb:mem:test +5. dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.userName +- database user name +6. dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.password +- database user password + +Step 2: Create the database table and add entries + +Load into your database the contents of the following SQL file: +<path to >src/test/resources/example-JdbcToJdbc.sql + +After this is done, please verify that testDev.test_event_table is created and has 10 rows. The script also creates an output table named testDev.test_output_event_table. + +select count(*) from testDev.test_event_table; ++----------+ +| count(*) | ++----------+ +| 10 | ++----------+ + +Step 3: Build the code: +shell> mvn clean install + +This will compile the project and create the application package in the target folder. + +Step 4: Launch the application package with the apex command line interface and +select the above configuration file during launch. + +Verification: + +Log on to the database console and check the output table: + +select count(*) from testDev.test_output_event_table; ++----------+ +| count(*) | ++----------+ +| 10 | ++----------+ + + + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/examples/jdbc/pom.xml b/examples/jdbc/pom.xml new file mode 100644 index 0000000..b697a68 --- /dev/null +++ b/examples/jdbc/pom.xml @@ -0,0 +1,79 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>malhar-examples-JDBC</artifactId> + <packaging>jar</packaging> + + <name>Apache Apex Malhar JDBC Examples</name> + <description>Demonstrates the JDBC Examples</description> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.8.0-SNAPSHOT</version> + </parent> + + <dependencies> + <!-- add your dependencies here --> + + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-engine</artifactId> + <version>${apex.core.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-contrib</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- replace with your jdbc driver dependency here --> + <dependency> + <groupId>org.hsqldb</groupId> + <artifactId>hsqldb</artifactId> + <version>2.3.1</version> + </dependency> + + <dependency> + <groupId>org.jooq</groupId> + <artifactId>jooq</artifactId> + <version>3.6.4</version> + </dependency> + + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>janino</artifactId> + <version>2.7.8</version> + </dependency>
+ + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>commons-compiler</artifactId> + <version>2.7.8</version> + </dependency> + + <dependency> + <groupId>net.sf.supercsv</groupId> + <artifactId>super-csv</artifactId> + <version>2.4.0</version> + </dependency> + + <dependency> + <groupId>org.hsqldb</groupId> + <artifactId>hsqldb</artifactId> + <version>2.3.1</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/assemble/appPackage.xml b/examples/jdbc/src/assemble/appPackage.xml new file mode 100644 index 0000000..7ad071c --- /dev/null +++ b/examples/jdbc/src/assemble/appPackage.xml @@ -0,0 +1,43 @@ +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/resources</directory> + <outputDirectory>/resources</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/CustomParser.java ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/CustomParser.java b/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/CustomParser.java new file mode 100755 index 0000000..a22acc9 --- /dev/null +++ b/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/CustomParser.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.FileToJdbcApp; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.regex.Pattern; + +// parse input line into pojo event +public class CustomParser extends BaseOperator +{ + private static final Logger LOG = LoggerFactory.getLogger(CustomParser.class); + + // default regex pattern for parsing each line + private static final Pattern RegexDefault = Pattern.compile("[\\p{Punct}\\s]+"); + + private String regexStr; // customized configurable regex string + private transient Pattern regexPattern; // compiled regex pattern generated from customized regex string + + @OutputPortFieldAnnotation(optional = false) + public final transient DefaultOutputPort<PojoEvent> output = new DefaultOutputPort<>(); + + public final transient DefaultInputPort<String> + input = new DefaultInputPort<String>() { + + @Override + public void process(String line) + { + // use custom regex to split line into words + final String[] words = regexPattern.split(line); + + PojoEvent pojo = new PojoEvent(); + // transform words array into pojo event + try { + int accnum = Integer.parseInt(words[0]); + pojo.setAccountNumber(accnum); + } catch (NumberFormatException e) { + LOG.error("Number Format Exception", e); + pojo.setAccountNumber(0); + } + String name = words[1]; + pojo.setName(name); + try { + int amount = Integer.parseInt(words[2]); + pojo.setAmount(amount); + } catch (NumberFormatException e) { + LOG.error("Number Format Exception", e); + pojo.setAmount(0); + } + output.emit(pojo); + } + }; + + public String getRegexStr() { + return this.regexStr; + } + + public void setRegexStr(String regex) { + this.regexStr = regex; + } + + @Override + public void setup(OperatorContext context) + { + if (null == regexStr) { + regexPattern = RegexDefault; + } else { + regexPattern = Pattern.compile(this.getRegexStr()); + } + } + +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/FileReader.java ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/FileReader.java b/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/FileReader.java new file mode 100755 index 0000000..4065a5f --- /dev/null +++ b/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/FileReader.java @@ -0,0 +1,20 @@ +package org.apache.apex.examples.FileToJdbcApp; + +import com.datatorrent.api.DefaultOutputPort; +import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator; + +public class FileReader extends LineByLineFileInputOperator{ + + /** + * output in bytes to match CsvParser input type + */ + public final transient DefaultOutputPort<byte[]> byteOutput = new DefaultOutputPort<>(); + + @Override + protected void emit(String tuple) + { + output.emit(tuple); + byteOutput.emit(tuple.getBytes()); + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/FileToJdbcCsvParser.java 
---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/FileToJdbcCsvParser.java b/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/FileToJdbcCsvParser.java new file mode 100755 index 0000000..d60b275 --- /dev/null +++ b/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/FileToJdbcCsvParser.java @@ -0,0 +1,56 @@ +package org.apache.apex.examples.FileToJdbcApp; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.parser.CsvParser; +import com.datatorrent.lib.appdata.schemas.SchemaUtils; +import com.datatorrent.lib.db.jdbc.JdbcFieldInfo; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; + +import static java.sql.Types.INTEGER; +import static java.sql.Types.VARCHAR; + +@ApplicationAnnotation(name = "FileToJdbcCsvParser") +public class FileToJdbcCsvParser implements StreamingApplication{ + + @Override + public void populateDAG(DAG dag, Configuration configuration) { + // create operators + FileReader fileReader = dag.addOperator("FileReader", FileReader.class); + CsvParser csvParser = dag.addOperator("CsvParser", CsvParser.class); + JdbcPOJOInsertOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", JdbcPOJOInsertOutputOperator.class); + + // configure operators + String pojoSchema = SchemaUtils.jarResourceFileToString("schema.json"); + csvParser.setSchema(pojoSchema); + + jdbcOutputOperator.setFieldInfos(addFieldInfos()); + JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); + jdbcOutputOperator.setStore(outputStore); + + // add stream + dag.addStream("Bytes", fileReader.byteOutput, csvParser.in); + dag.addStream("POJOs", csvParser.out, jdbcOutputOperator.input); + } + + /** + * This method can be modified to have field mappings based on user defined + * class + */ + private List<JdbcFieldInfo> addFieldInfos() { + List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new JdbcFieldInfo("ACCOUNT_NO", "accountNumber", JdbcFieldInfo.SupportType.INTEGER, INTEGER)); + fieldInfos.add(new JdbcFieldInfo("NAME", "name", JdbcFieldInfo.SupportType.STRING, VARCHAR)); + fieldInfos.add(new JdbcFieldInfo("AMOUNT", "amount", JdbcFieldInfo.SupportType.INTEGER, INTEGER)); + return fieldInfos; + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/FileToJdbcCustomParser.java ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/FileToJdbcCustomParser.java b/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/FileToJdbcCustomParser.java new file mode 100755 index 0000000..42d1d23 --- /dev/null +++ b/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/FileToJdbcCustomParser.java @@ -0,0 +1,50 @@ +package org.apache.apex.examples.FileToJdbcApp; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.db.jdbc.JdbcFieldInfo; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; + +import java.util.List; + +import static java.sql.Types.INTEGER; +import static java.sql.Types.VARCHAR; + +@ApplicationAnnotation(name = "FileToJdbcCustomParser") +public class FileToJdbcCustomParser implements StreamingApplication{ + + @Override + public void populateDAG(DAG dag, Configuration configuration) { + // create operators + FileReader fileReader = dag.addOperator("FileReader", FileReader.class); + CustomParser customParser = dag.addOperator("CustomParser", CustomParser.class); + JdbcPOJOInsertOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", JdbcPOJOInsertOutputOperator.class); + + // configure operators + jdbcOutputOperator.setFieldInfos(addFieldInfos()); + JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); + jdbcOutputOperator.setStore(outputStore); + + // add stream + dag.addStream("Data", fileReader.output, customParser.input); + dag.addStream("POJOs", customParser.output, jdbcOutputOperator.input); + } + + /** + * This method can be modified to have field mappings based on user defined + * class + */ + private List<JdbcFieldInfo> addFieldInfos() + { + List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new JdbcFieldInfo("ACCOUNT_NO", "accountNumber", JdbcFieldInfo.SupportType.INTEGER, INTEGER)); + fieldInfos.add(new JdbcFieldInfo("NAME", "name", JdbcFieldInfo.SupportType.STRING, VARCHAR)); + fieldInfos.add(new JdbcFieldInfo("AMOUNT", "amount", JdbcFieldInfo.SupportType.INTEGER, INTEGER)); + return fieldInfos; + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/PojoEvent.java ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/PojoEvent.java b/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/PojoEvent.java new file mode 100755 index 0000000..6de9274 --- /dev/null +++ b/examples/jdbc/src/main/java/org/apache/apex/examples/FileToJdbcApp/PojoEvent.java @@ -0,0 +1,45 @@ +package org.apache.apex.examples.FileToJdbcApp; + +public class PojoEvent +{ + @Override + public String toString() + { + return "PojoEvent [accountNumber=" + accountNumber + ", name=" + name + ", amount=" + amount + "]"; + } + + private int accountNumber; + private String name; + private int amount; + + public int getAccountNumber() + { + return accountNumber; + } + + public void setAccountNumber(int accountNumber) + { + this.accountNumber = accountNumber; + } + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public int getAmount() + { + return amount; + } + + public void setAmount(int amount) + { + this.amount = amount; + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/FileLineOutputOperator.java ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/FileLineOutputOperator.java b/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/FileLineOutputOperator.java new file mode 100644 index 0000000..93bd8a4 --- /dev/null +++ b/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/FileLineOutputOperator.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.JdbcIngest; + +import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; + +public class FileLineOutputOperator extends AbstractFileOutputOperator<Object> +{ + @Override + protected String getFileName(Object input) + { + return context.getId() + "_" + "op.dat"; + } + + @Override + protected byte[] getBytesForTuple(Object input) + { + return (input.toString() + "\n").getBytes(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/JdbcHDFSApp.java ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/JdbcHDFSApp.java b/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/JdbcHDFSApp.java new file mode 100644 index 0000000..5e1efff --- /dev/null +++ b/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/JdbcHDFSApp.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.JdbcIngest; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator; +import com.datatorrent.lib.db.jdbc.JdbcStore; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.FieldInfo.SupportType; + +@ApplicationAnnotation(name = "SimpleJdbcToHDFSApp") +public class JdbcHDFSApp implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + JdbcPOJOInputOperator jdbcInputOperator = dag.addOperator("JdbcInput", new JdbcPOJOInputOperator()); + /** + * The class given below can be updated to the user defined class based on + * the input table schema. The addFieldInfos method needs to be updated + * accordingly. This line can be left commented out and the class can be + * set from the properties file instead. + */ + // dag.setOutputPortAttribute(jdbcInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, PojoEvent.class); + + jdbcInputOperator.setFieldInfos(addFieldInfos()); + + JdbcStore store = new JdbcStore(); + jdbcInputOperator.setStore(store); + + FileLineOutputOperator fileOutput = dag.addOperator("FileOutputOperator", new FileLineOutputOperator()); + + dag.addStream("POJO's", jdbcInputOperator.outputPort, fileOutput.input).setLocality(Locality.CONTAINER_LOCAL); + } + + /** + * This method can be modified to have field mappings based on user defined + * class + */ + private List<FieldInfo> addFieldInfos() + { + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER)); + fieldInfos.add(new FieldInfo("NAME", "name", SupportType.STRING)); + fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER)); + return fieldInfos; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplication.java ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplication.java b/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplication.java new file mode 100644 index 0000000..de47ef4 --- /dev/null +++ b/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplication.java @@ -0,0 +1,49 @@ +package org.apache.apex.examples.JdbcIngest; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.db.jdbc.JdbcPOJOPollInputOperator; +import com.datatorrent.lib.db.jdbc.JdbcStore; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.FieldInfo.SupportType; + +@ApplicationAnnotation(name = "PollJdbcToHDFSApp") +public class JdbcPollerApplication implements StreamingApplication +{ + public void populateDAG(DAG dag, Configuration conf) + { + JdbcPOJOPollInputOperator poller = dag.addOperator("JdbcPoller", new JdbcPOJOPollInputOperator()); + + JdbcStore store = new JdbcStore(); + poller.setStore(store); + +
poller.setFieldInfos(addFieldInfos()); + + FileLineOutputOperator writer = dag.addOperator("Writer", new FileLineOutputOperator()); + dag.setInputPortAttribute(writer.input, PortContext.PARTITION_PARALLEL, true); + writer.setRotationWindows(60); + + dag.addStream("dbrecords", poller.outputPort, writer.input); + } + + /** + * This method can be modified to have field mappings based on user defined + * class + */ + private List<FieldInfo> addFieldInfos() + { + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER)); + fieldInfos.add(new FieldInfo("NAME", "name", SupportType.STRING)); + fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER)); + return fieldInfos; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/PojoEvent.java ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/PojoEvent.java b/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/PojoEvent.java new file mode 100644 index 0000000..be366b6 --- /dev/null +++ b/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/PojoEvent.java @@ -0,0 +1,44 @@ +package org.apache.apex.examples.JdbcIngest; + +public class PojoEvent +{ + @Override + public String toString() + { + return "PojoEvent [accountNumber=" + accountNumber + ", name=" + name + ", amount=" + amount + "]"; + } + + private int accountNumber; + private String name; + private int amount; + + public int getAccountNumber() + { + return accountNumber; + } + + public void setAccountNumber(int accountNumber) + { + this.accountNumber = accountNumber; + } + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public int getAmount() + { + return amount; + } + + public void setAmount(int amount) + { + this.amount = amount; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcToJdbc/JdbcToJdbcApp.java ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcToJdbc/JdbcToJdbcApp.java b/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcToJdbc/JdbcToJdbcApp.java new file mode 100644 index 0000000..e85e4af --- /dev/null +++ b/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcToJdbc/JdbcToJdbcApp.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.apex.examples.JdbcToJdbc; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator; +import com.datatorrent.lib.db.jdbc.JdbcStore; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.FieldInfo.SupportType; + +@ApplicationAnnotation(name = "JdbcToJdbcApp") +public class JdbcToJdbcApp implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + JdbcPOJOInputOperator jdbcInputOperator = dag.addOperator("JdbcInput", new JdbcPOJOInputOperator()); + JdbcStore store = new JdbcStore(); + jdbcInputOperator.setStore(store); + jdbcInputOperator.setFieldInfos(addFieldInfos()); + + /** + * The class given below can be updated to the user defined class based on + * the input table schema. The addFieldInfos method needs to be updated + * accordingly. This line can be left commented out and the class can be + * set from the properties file instead. + */ + //dag.setOutputPortAttribute(jdbcInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, PojoEvent.class); + + JdbcPOJOInsertOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", new JdbcPOJOInsertOutputOperator()); + JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); + jdbcOutputOperator.setStore(outputStore); + jdbcOutputOperator.setFieldInfos(addJdbcFieldInfos()); + + /** + * The class given below can be updated to the user defined class based on + * the input table schema. The addJdbcFieldInfos method needs to be updated + * accordingly. This line can be left commented out and the class can be + * set from the properties file instead. + */ + //dag.setInputPortAttribute(jdbcOutputOperator.input, Context.PortContext.TUPLE_CLASS, PojoEvent.class); + + dag.addStream("POJO's", jdbcInputOperator.outputPort, jdbcOutputOperator.input) + .setLocality(Locality.CONTAINER_LOCAL); + } + + /** + * This method can be modified to have field mappings based on user defined + * class<br> + * User can choose to have a SQL support type as an additional parameter + */ + private List<com.datatorrent.lib.db.jdbc.JdbcFieldInfo> addJdbcFieldInfos() + { + List<com.datatorrent.lib.db.jdbc.JdbcFieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new com.datatorrent.lib.db.jdbc.JdbcFieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER, 0)); + fieldInfos.add(new com.datatorrent.lib.db.jdbc.JdbcFieldInfo("NAME", "name", SupportType.STRING, 0)); + fieldInfos.add(new com.datatorrent.lib.db.jdbc.JdbcFieldInfo("AMOUNT", "amount", SupportType.INTEGER, 0)); + return fieldInfos; + } + + /** + * This method can be modified to have field mappings based on user defined + * class + */ + private List<FieldInfo> addFieldInfos() + { + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER)); + fieldInfos.add(new FieldInfo("NAME", "name", SupportType.STRING)); + fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER)); + return fieldInfos; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcToJdbc/PojoEvent.java
---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcToJdbc/PojoEvent.java b/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcToJdbc/PojoEvent.java new file mode 100644 index 0000000..0abf74d --- /dev/null +++ b/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcToJdbc/PojoEvent.java @@ -0,0 +1,44 @@ +package org.apache.apex.examples.JdbcToJdbc; + +public class PojoEvent +{ + @Override + public String toString() + { + return "PojoEvent [accountNumber=" + accountNumber + ", name=" + name + ", amount=" + amount + "]"; + } + + private int accountNumber; + private String name; + private int amount; + + public int getAccountNumber() + { + return accountNumber; + } + + public void setAccountNumber(int accountNumber) + { + this.accountNumber = accountNumber; + } + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public int getAmount() + { + return amount; + } + + public void setAmount(int amount) + { + this.amount = amount; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/main/resources/META-INF/properties-FileToJdbcApp.xml ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/resources/META-INF/properties-FileToJdbcApp.xml b/examples/jdbc/src/main/resources/META-INF/properties-FileToJdbcApp.xml new file mode 100755 index 0000000..29b911b --- /dev/null +++ b/examples/jdbc/src/main/resources/META-INF/properties-FileToJdbcApp.xml @@ -0,0 +1,56 @@ +<?xml version="1.0"?> +<configuration> + <property> + <name>dt.operator.JdbcOutput.prop.store.databaseDriver</name> + <!-- replace value with your jdbc driver --> + <value>org.hsqldb.jdbcDriver</value> + </property> + + <property> + <name>dt.operator.JdbcOutput.prop.store.databaseUrl</name> + <!-- replace value with your jdbc url --> + <value>jdbc:hsqldb:mem:test</value> + </property> + + <!--property> + <name>dt.operator.JdbcOutput.prop.store.userName</name> + <value>username</value> + </property> + + <property> + <name>dt.operator.JdbcOutput.prop.store.password</name> + <value>password</value> + </property--> + + <property> + <name>dt.operator.JdbcOutput.prop.batchSize</name> + <value>5</value> + </property> + + <property> + <name>dt.operator.JdbcOutput.prop.tablename</name> + <value>table_name</value> + </property> + + <property> + <name>dt.operator.JdbcOutput.port.input.attr.TUPLE_CLASS</name> + <value>org.apache.apex.examples.FileToJdbcApp.PojoEvent</value> + </property> + + <property> + <name>dt.operator.CsvParser.port.out.attr.TUPLE_CLASS</name> + <value>org.apache.apex.examples.FileToJdbcApp.PojoEvent</value> + </property> + + <property> + <name>dt.operator.CustomParser.prop.regexStr</name> + <value>,</value> + </property> + + <property> + <name>dt.operator.FileReader.prop.directory</name> + <value>input_directory</value> + </property> + +</configuration> +
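A minimal usage sketch for the CustomParser added above, assuming the regexStr value of "," configured in properties-FileToJdbcApp.xml. The class name CustomParserSketch is hypothetical (it is not part of this commit); CollectorTestSink is the Malhar test utility that the example tests in this commit also rely on.

package org.apache.apex.examples.FileToJdbcApp;

import com.datatorrent.lib.testbench.CollectorTestSink;

public class CustomParserSketch
{
  public static void main(String[] args)
  {
    CustomParser parser = new CustomParser();
    parser.setRegexStr(",");  // same value as dt.operator.CustomParser.prop.regexStr above
    parser.setup(null);       // setup() only compiles the regex, so no real OperatorContext is needed here

    // collect emitted POJOs directly instead of wiring a full DAG
    CollectorTestSink<Object> sink = new CollectorTestSink<>();
    parser.output.setSink(sink);

    parser.input.process("1,User1,1000");
    System.out.println(sink.collectedTuples.get(0));
    // prints: PojoEvent [accountNumber=1, name=User1, amount=1000]
  }
}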

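The ApplicationTest classes added by this commit exercise these DAGs through Apex's LocalMode. A condensed sketch of that pattern for FileToJdbcCustomParser follows; the class name LocalRunSketch and the 10-second timeout are illustrative, and it assumes the properties above point at a reachable database, an existing table, and a populated input directory.

package org.apache.apex.examples.FileToJdbcApp;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.LocalMode;

public class LocalRunSketch
{
  public static void main(String[] args) throws Exception
  {
    LocalMode lma = LocalMode.newInstance();
    Configuration conf = new Configuration(false);
    // pick up the operator properties shown above from the classpath
    conf.addResource(LocalRunSketch.class.getResourceAsStream("/META-INF/properties-FileToJdbcApp.xml"));

    lma.prepareDAG(new FileToJdbcCustomParser(), conf);
    LocalMode.Controller lc = lma.getController();
    lc.run(10000); // run the DAG locally for ~10 seconds; a real test would then query the table to verify the inserts
  }
}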