SPOI-8252 #resolve #comment Create a simple file to JDBC application
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a1b7155c Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a1b7155c Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a1b7155c Branch: refs/heads/master Commit: a1b7155c5f1968e2103c060cc8ba73cc9a1f7eac Parents: 07869c8 Author: Yunhan Wang <[email protected]> Authored: Mon Apr 3 14:58:19 2017 -0700 Committer: Lakshmi Prasanna Velineni <[email protected]> Committed: Wed Jun 7 10:33:00 2017 -0700 ---------------------------------------------------------------------- examples/fileToJdbc/.gitignore | 3 + examples/fileToJdbc/README.md | 70 +++++ .../fileToJdbc/XmlJavadocCommentsExtractor.xsl | 44 +++ examples/fileToJdbc/pom.xml | 293 +++++++++++++++++++ examples/fileToJdbc/src/assemble/appPackage.xml | 43 +++ .../com/example/FileToJdbcApp/CustomParser.java | 95 ++++++ .../com/example/FileToJdbcApp/FileReader.java | 20 ++ .../FileToJdbcApp/FileToJdbcCsvParser.java | 54 ++++ .../FileToJdbcApp/FileToJdbcCustomParser.java | 50 ++++ .../com/example/FileToJdbcApp/PojoEvent.java | 45 +++ .../src/main/resources/META-INF/properties.xml | 48 +++ .../fileToJdbc/src/main/resources/schema.json | 19 ++ .../example/FileToJdbcApp/ApplicationTest.java | 124 ++++++++ .../fileToJdbc/src/test/resources/example.sql | 8 + .../src/test/resources/log4j.properties | 21 ++ .../src/test/resources/test-input/sample.txt | 10 + examples/fileToJdbc/src/test/resources/test.xml | 58 ++++ 17 files changed, 1005 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/.gitignore ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/.gitignore b/examples/fileToJdbc/.gitignore new file mode 100755 index 0000000..019edc2 --- /dev/null +++ b/examples/fileToJdbc/.gitignore @@ -0,0 +1,3 @@ 
+.DS_Store +/.idea/ +/target/ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/README.md ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/README.md b/examples/fileToJdbc/README.md new file mode 100755 index 0000000..ad9f6b9 --- /dev/null +++ b/examples/fileToJdbc/README.md @@ -0,0 +1,70 @@ +## Sample File to JDBC Example + +This example shows how to read files from HDFS, parse into POJOs and then insert into a table in MySQL. + +Given various parsing demands, we give two applications under this package, `FileToJdbcCsvParser` and `FileToJdbcCustomParser`. + +`CsvParser` allows you to parse only CSV format input files. For more complex input format, `CustomParser` allows you to set custom regex to parse. + +Accordingly, we have two additional configuration files (`src/site/conf/exampleCsvParser.xml` and `src/site/conf/exampleCustomParser.xml`) besides the common properties file (`/src/main/resources/META-INF/properties.xml`). + +Users can choose which application and which additional configuration file to use during launch time. 
+ + +####**Update Properties:** + +- Update these common properties in the file `/src/main/resources/META-INF/properties.xml`: + +| Property Name | Description | +| ------------- | ----------- | +| dt.operator.FileReader.prop.directory |HDFS input directory path +|dt.operator.JdbcOutput.prop.store.databaseUrl | database URL of the form `jdbc:mysql://hostName:portNumber/dbName` | +| dt.operator.JdbcOutput.prop.store.userName | MySQL user name | +| dt.operator.JdbcOutput.prop.store.password | MySQL user password | +| dt.operator.JdbcOutput.prop.tablename | MySQL output table name | + +- Using CustomParser: update `regexStr` in file `src/site/conf/exampleCustomParser.xml` + + +####**Sample Input:** + +- To set up MySQL database and create table, check `src/test/resources/example.sql` +- To run this example, create files using this format: + +``` + 1,User1,1000 + 2,User2,2000 + 3,User3,3000 + 4,User4,4000 + 5,User5,5000 + 6,User6,6000 + 7,User7,7000 + 8,User8,8000 + 9,User9,9000 + 10,User10,10000 +``` +- To change input format, update `PojoEvent` class and `addFieldInfos()` method in `src/main/java/com/example/FileToJdbcApp`. If using CsvParser, also update `src/main/resources/schema.json`. 
+ +####**Sample Output:** + +- After running successfully, verify +that the database table has the expected output: + +``` + mysql> select * from table_name; + +------------+--------+--------+ + | ACCOUNT_NO | NAME | AMOUNT | + +------------+--------+--------+ + | 1 | User1 | 1000 | + | 2 | User2 | 2000 | + | 3 | User3 | 3000 | + | 4 | User4 | 4000 | + | 5 | User5 | 5000 | + | 6 | User6 | 6000 | + | 7 | User7 | 7000 | + | 8 | User8 | 8000 | + | 9 | User9 | 9000 | + | 10 | User10 | 10000 | + +------------+--------+--------+ + 10 rows in set (0.00 sec) +``` http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/XmlJavadocCommentsExtractor.xsl ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/XmlJavadocCommentsExtractor.xsl b/examples/fileToJdbc/XmlJavadocCommentsExtractor.xsl new file mode 100755 index 0000000..08075a9 --- /dev/null +++ b/examples/fileToJdbc/XmlJavadocCommentsExtractor.xsl @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> + +<!-- + Document : XmlJavadocCommentsExtractor.xsl + Created on : September 16, 2014, 11:30 AM + Description: + The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet. 
+--> + +<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0"> + <xsl:output method="xml" standalone="yes"/> + + <!-- copy xml by selecting only the following nodes, attributes and text --> + <xsl:template match="node()|text()|@*"> + <xsl:copy> + <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/> + </xsl:copy> + </xsl:template> + + <!-- Strip off the following paths from the selected xml --> + <xsl:template match="//root/package/interface/interface + |//root/package/interface/method/@qualified + |//root/package/class/interface + |//root/package/class/class + |//root/package/class/method/@qualified + |//root/package/class/field/@qualified" /> + + <xsl:strip-space elements="*"/> +</xsl:stylesheet> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/pom.xml ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/pom.xml b/examples/fileToJdbc/pom.xml new file mode 100755 index 0000000..ae62e0c --- /dev/null +++ b/examples/fileToJdbc/pom.xml @@ -0,0 +1,293 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.example</groupId> + <version>1.0-SNAPSHOT</version> + <artifactId>FileToJdbcApp</artifactId> + <packaging>jar</packaging> + + <!-- change these to the appropriate values --> + <name>File to JDBC</name> + <description>My Apex Application Description</description> + + <properties> + <!-- change this if you desire to use a different version of Apex Core --> + <apex.version>3.5.0</apex.version> + <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> + <malhar.version>3.6.0</malhar.version> + </properties> + + <build> + <plugins> + <plugin> + 
<groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.9</version> + <configuration> + <downloadSources>true</downloadSources> + </configuration> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.3</version> + <configuration> + <encoding>UTF-8</encoding> + <source>1.7</source> + <target>1.7</target> + <debug>true</debug> + <optimize>false</optimize> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>prepare-package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>target/deps</outputDirectory> + <includeScope>runtime</includeScope> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>app-package-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <finalName>${project.artifactId}-${project.version}-apexapp</finalName> + <appendAssemblyId>false</appendAssemblyId> + <descriptors> + <descriptor>src/assemble/appPackage.xml</descriptor> + </descriptors> + <archiverConfig> + <defaultDirectoryMode>0755</defaultDirectoryMode> + </archiverConfig> + <archive> + <manifestEntries> + <Class-Path>${apex.apppackage.classpath}</Class-Path> + <DT-Engine-Version>${apex.version}</DT-Engine-Version> + <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id> + <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> + <DT-App-Package-Version>${project.version}</DT-App-Package-Version> + <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> + <DT-App-Package-Description>${project.description}</DT-App-Package-Description> + </manifestEntries> + </archive> 
+ </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.7</version> + <executions> + <execution> + <phase>package</phase> + <configuration> + <target> + <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" + tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> + </target> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + <execution> + <!-- create resource directory for xml javadoc--> + <id>createJavadocDirectory</id> + <phase>generate-resources</phase> + <configuration> + <tasks> + <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> + <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.9.1</version> + <executions> + <execution> + <id>attach-artifacts</id> + <phase>package</phase> + <goals> + <goal>attach-artifact</goal> + </goals> + <configuration> + <artifacts> + <artifact> + <file>target/${project.artifactId}-${project.version}.apa</file> + <type>apa</type> + </artifact> + </artifacts> + <skipAttach>false</skipAttach> + </configuration> + </execution> + </executions> + </plugin> + + <!-- generate javdoc --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <!-- generate xml javadoc --> + <execution> + <id>xml-doclet</id> + <phase>generate-resources</phase> + <goals> + <goal>javadoc</goal> + </goals> + <configuration> + <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> + <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> + 
<useStandardDocletOptions>false</useStandardDocletOptions> + <docletArtifact> + <groupId>com.github.markusbernhardt</groupId> + <artifactId>xml-doclet</artifactId> + <version>1.0.4</version> + </docletArtifact> + </configuration> + </execution> + </executions> + </plugin> + <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>xml-maven-plugin</artifactId> + <version>1.0</version> + <executions> + <execution> + <id>transform-xmljavadoc</id> + <phase>generate-resources</phase> + <goals> + <goal>transform</goal> + </goals> + </execution> + </executions> + <configuration> + <transformationSets> + <transformationSet> + <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> + <includes> + <include>${project.artifactId}-${project.version}-javadoc.xml</include> + </includes> + <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> + <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> + </transformationSet> + </transformationSets> + </configuration> + </plugin> + <!-- copy xml javadoc to class jar --> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>2.6</version> + <executions> + <execution> + <id>copy-resources</id> + <phase>process-resources</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${basedir}/target/classes</outputDirectory> + <resources> + <resource> + <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> + <includes> + <include>${project.artifactId}-${project.version}-javadoc.xml</include> + </includes> + <filtering>true</filtering> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + + </build> + + <dependencies> + <!-- add your dependencies here --> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-library</artifactId> + 
<version>${malhar.version}</version> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-contrib</artifactId> + <version>${malhar.version}</version> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-common</artifactId> + <version>${apex.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.10</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-engine</artifactId> + <version>${apex.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>5.1.36</version> + </dependency> + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>janino</artifactId> + <version>2.7.8</version> + </dependency> + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>commons-compiler</artifactId> + <version>2.7.8</version> + </dependency> + <dependency> + <groupId>net.sf.supercsv</groupId> + <artifactId>super-csv</artifactId> + <version>2.4.0</version> + </dependency> + <dependency> + <groupId>org.hsqldb</groupId> + <artifactId>hsqldb</artifactId> + <version>2.3.1</version> + </dependency> + </dependencies> +</project> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/assemble/appPackage.xml b/examples/fileToJdbc/src/assemble/appPackage.xml new file mode 100755 index 0000000..7ad071c --- /dev/null +++ b/examples/fileToJdbc/src/assemble/appPackage.xml @@ -0,0 +1,43 @@ +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + 
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/resources</directory> + <outputDirectory>/resources</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/CustomParser.java ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/CustomParser.java b/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/CustomParser.java new file mode 100755 index 0000000..e089255 --- /dev/null +++ b/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/CustomParser.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.example.FileToJdbcApp; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.regex.Pattern; + +// parse input line into pojo event +public class CustomParser extends BaseOperator +{ + private static final Logger LOG = LoggerFactory.getLogger(CustomParser.class); + + // default regex pattern for parsing each line + private static final Pattern RegexDefault = Pattern.compile("[\\p{Punct}\\s]+"); + + private String regexStr; // customized configurable regex string + private transient Pattern regexPattern; // compiled regex pattern generated from customized regex string + + @OutputPortFieldAnnotation(optional = false) + public final transient DefaultOutputPort<PojoEvent> output = new DefaultOutputPort<>(); + + public final transient DefaultInputPort<String> + input = new DefaultInputPort<String>() { + + @Override + public void process(String line) + { + // use custom regex to split line into words + final String[] words = regexPattern.split(line); + + PojoEvent pojo = new PojoEvent(); + // transform words array into pojo event + try { + int accnum = Integer.parseInt(words[0]); + 
pojo.setAccountNumber(accnum); + } catch (NumberFormatException e) { + LOG.error("Number Format Exception", e); + pojo.setAccountNumber(0); + } + String name = words[1]; + pojo.setName(name); + try { + int amount = Integer.parseInt(words[2]); + pojo.setAmount(amount); + } catch (NumberFormatException e) { + LOG.error("Number Format Exception", e); + pojo.setAmount(0); + } + output.emit(pojo); + } + }; + + public String getRegexStr() { + return this.regexStr; + } + + public void setRegexStr(String regex) { + this.regexStr = regex; + } + + @Override + public void setup(OperatorContext context) + { + if (null == regexStr) { + regexPattern = RegexDefault; + } else { + regexPattern = Pattern.compile(this.getRegexStr()); + } + } + +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileReader.java ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileReader.java b/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileReader.java new file mode 100755 index 0000000..201c705 --- /dev/null +++ b/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileReader.java @@ -0,0 +1,20 @@ +package com.example.FileToJdbcApp; + +import com.datatorrent.api.DefaultOutputPort; +import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator; + +public class FileReader extends LineByLineFileInputOperator{ + + /** + * output in bytes to match CsvParser input type + */ + public final transient DefaultOutputPort<byte[]> byteOutput = new DefaultOutputPort<>(); + + @Override + protected void emit(String tuple) + { + output.emit(tuple); + byteOutput.emit(tuple.getBytes()); + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileToJdbcCsvParser.java ---------------------------------------------------------------------- diff --git 
a/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileToJdbcCsvParser.java b/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileToJdbcCsvParser.java new file mode 100755 index 0000000..23d3f36 --- /dev/null +++ b/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileToJdbcCsvParser.java @@ -0,0 +1,54 @@ +package com.example.FileToJdbcApp; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.parser.CsvParser; +import com.datatorrent.lib.appdata.schemas.SchemaUtils; +import com.datatorrent.lib.db.jdbc.JdbcFieldInfo; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; + +import java.util.List; + +import static java.sql.Types.INTEGER; +import static java.sql.Types.VARCHAR; + +@ApplicationAnnotation(name = "FileToJdbcCsvParser") +public class FileToJdbcCsvParser implements StreamingApplication{ + + @Override + public void populateDAG(DAG dag, Configuration configuration) { + // create operators + FileReader fileReader = dag.addOperator("FileReader", FileReader.class); + CsvParser csvParser = dag.addOperator("CsvParser", CsvParser.class); + JdbcPOJOInsertOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", JdbcPOJOInsertOutputOperator.class); + + // configure operators + String pojoSchema = SchemaUtils.jarResourceFileToString("schema.json"); + csvParser.setSchema(pojoSchema); + + jdbcOutputOperator.setFieldInfos(addFieldInfos()); + JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); + jdbcOutputOperator.setStore(outputStore); + + // add stream + dag.addStream("Bytes", fileReader.byteOutput, csvParser.in); + dag.addStream("POJOs", csvParser.out, jdbcOutputOperator.input); + } + + /** + * This method can be modified to have field 
mappings based on user defined + * class + */ + private List<JdbcFieldInfo> addFieldInfos() { + List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new JdbcFieldInfo("ACCOUNT_NO", "accountNumber", JdbcFieldInfo.SupportType.INTEGER , INTEGER)); + fieldInfos.add(new JdbcFieldInfo("NAME", "name", JdbcFieldInfo.SupportType.STRING, VARCHAR)); + fieldInfos.add(new JdbcFieldInfo("AMOUNT", "amount", JdbcFieldInfo.SupportType.INTEGER, INTEGER)); + return fieldInfos; + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileToJdbcCustomParser.java ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileToJdbcCustomParser.java b/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileToJdbcCustomParser.java new file mode 100755 index 0000000..c13377f --- /dev/null +++ b/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/FileToJdbcCustomParser.java @@ -0,0 +1,50 @@ +package com.example.FileToJdbcApp; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.db.jdbc.JdbcFieldInfo; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; + +import java.util.List; + +import static java.sql.Types.INTEGER; +import static java.sql.Types.VARCHAR; + +@ApplicationAnnotation(name = "FileToJdbcCustomParser") +public class FileToJdbcCustomParser implements StreamingApplication{ + + @Override + public void populateDAG(DAG dag, Configuration configuration) { + // create operators + FileReader fileReader = dag.addOperator("FileReader", FileReader.class); + CustomParser customParser = dag.addOperator("CustomParser", 
CustomParser.class); + JdbcPOJOInsertOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", JdbcPOJOInsertOutputOperator.class); + + // configure operators + jdbcOutputOperator.setFieldInfos(addFieldInfos()); + JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); + jdbcOutputOperator.setStore(outputStore); + + // add stream + dag.addStream("Data", fileReader.output, customParser.input); + dag.addStream("POJOs", customParser.output, jdbcOutputOperator.input); + } + + /** + * This method can be modified to have field mappings based on used defined + * class + */ + private List<JdbcFieldInfo> addFieldInfos() + { + List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new JdbcFieldInfo("ACCOUNT_NO", "accountNumber", JdbcFieldInfo.SupportType.INTEGER , INTEGER)); + fieldInfos.add(new JdbcFieldInfo("NAME", "name", JdbcFieldInfo.SupportType.STRING, VARCHAR)); + fieldInfos.add(new JdbcFieldInfo("AMOUNT", "amount", JdbcFieldInfo.SupportType.INTEGER, INTEGER)); + return fieldInfos; + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/PojoEvent.java ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/PojoEvent.java b/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/PojoEvent.java new file mode 100755 index 0000000..7985b45 --- /dev/null +++ b/examples/fileToJdbc/src/main/java/com/example/FileToJdbcApp/PojoEvent.java @@ -0,0 +1,45 @@ +package com.example.FileToJdbcApp; + +public class PojoEvent +{ + @Override + public String toString() + { + return "PojoEvent [accountNumber=" + accountNumber + ", name=" + name + ", amount=" + amount + "]"; + } + + private int accountNumber; + private String name; + private int amount; + + public int getAccountNumber() + { + return accountNumber; + } + + public void setAccountNumber(int accountNumber) + { + 
this.accountNumber = accountNumber; + } + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public int getAmount() + { + return amount; + } + + public void setAmount(int amount) + { + this.amount = amount; + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/main/resources/META-INF/properties.xml b/examples/fileToJdbc/src/main/resources/META-INF/properties.xml new file mode 100755 index 0000000..4f706c4 --- /dev/null +++ b/examples/fileToJdbc/src/main/resources/META-INF/properties.xml @@ -0,0 +1,48 @@ +<?xml version="1.0"?> +<configuration> + <property> + <name>dt.operator.JdbcOutput.prop.store.databaseDriver</name> + <value>com.mysql.jdbc.Driver</value> + </property> + + <property> + <name>dt.operator.JdbcOutput.prop.store.databaseUrl</name> + <value>jdbc:mysql://hostName:portNumber/dbName</value> + </property> + + <property> + <name>dt.operator.JdbcOutput.prop.store.userName</name> + <value>root</value> + </property> + + <property> + <name>dt.operator.JdbcOutput.prop.store.password</name> + <value>password</value> + </property> + + <property> + <name>dt.operator.JdbcOutput.prop.batchSize</name> + <value>5</value> + </property> + + <property> + <name>dt.operator.JdbcOutput.prop.tablename</name> + <value>table_name</value> + </property> + + <property> + <name>dt.operator.JdbcOutput.port.input.attr.TUPLE_CLASS</name> + <value>com.example.FileToJdbcApp.PojoEvent</value> + </property> + + <property> + <name>dt.operator.FileReader.prop.directory</name> + <value>input_directory</value> + </property> + + <property> + <name>dt.loggers.level</name> + <value>com.datatorrent.*:INFO,org.apache.*:INFO</value> + </property> +</configuration> + 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/src/main/resources/schema.json ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/main/resources/schema.json b/examples/fileToJdbc/src/main/resources/schema.json new file mode 100755 index 0000000..3c191cf --- /dev/null +++ b/examples/fileToJdbc/src/main/resources/schema.json @@ -0,0 +1,19 @@ +{ + "separator": ",", + "quoteChar":"\"", + "fields": [ + { + "name": "AccountNumber", + "type": "INTEGER" + }, + { + "name": "Name", + "type": "String" + }, + { + "name": "Amount", + "type": "INTEGER" + } + ] +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/src/test/java/com/example/FileToJdbcApp/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/test/java/com/example/FileToJdbcApp/ApplicationTest.java b/examples/fileToJdbc/src/test/java/com/example/FileToJdbcApp/ApplicationTest.java new file mode 100755 index 0000000..806bf2e --- /dev/null +++ b/examples/fileToJdbc/src/test/java/com/example/FileToJdbcApp/ApplicationTest.java @@ -0,0 +1,124 @@ +package com.example.FileToJdbcApp; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; +import com.datatorrent.netlet.util.DTThrowable; +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.validation.ConstraintViolationException; +import java.io.File; +import java.io.IOException; +import java.sql.*; + +/** + * Test the DAG declaration in local mode.<br> + * The assumption to run this test case is that test_jdbc_table + * and meta-table are created already. 
+ */ +public class ApplicationTest { + private static final String DB_DRIVER = "org.hsqldb.jdbcDriver"; + private static final String DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; + private static final String TABLE_NAME = "test_jdbc_table"; + + @BeforeClass + public static void setup() { + try { + Class.forName(DB_DRIVER).newInstance(); + + Connection con = DriverManager.getConnection(DB_URL); + Statement stmt = con.createStatement(); + + String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " + + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " + + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " + + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " + + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " + + ")"; + stmt.executeUpdate(createMetaTable); + + String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)"; + stmt.executeUpdate(createTable); + + } catch (Throwable e) { + DTThrowable.rethrow(e); + } + } + + public static void cleanTable() + { + try { + Connection con = DriverManager.getConnection(DB_URL); + Statement stmt = con.createStatement(); + String cleanTable = "delete from " + TABLE_NAME; + stmt.executeUpdate(cleanTable); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public int getNumOfEventsInStore() + { + Connection con; + try { + con = DriverManager.getConnection(DB_URL); + Statement stmt = con.createStatement(); + + String countQuery = "SELECT count(*) from " + TABLE_NAME; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + + @Test + public void testCsvParserApp() throws IOException, 
Exception { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(new File("src/test/resources/test.xml").toURI().toURL()); + + lma.prepareDAG(new FileToJdbcCsvParser(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); // test will terminate after results are available + + // wait for records to be added to table + Thread.sleep(5000); + + Assert.assertEquals("Events in store", 10, getNumOfEventsInStore()); + cleanTable(); + + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + + @Test + public void testCustomParserApp() throws IOException, Exception { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(new File("src/test/resources/test.xml").toURI().toURL()); + + lma.prepareDAG(new FileToJdbcCustomParser(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); // test will terminate after results are available + + // wait for records to be added to table + Thread.sleep(5000); + + Assert.assertEquals("Events in store", 10, getNumOfEventsInStore()); + cleanTable(); + + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/src/test/resources/example.sql ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/test/resources/example.sql b/examples/fileToJdbc/src/test/resources/example.sql new file mode 100644 index 0000000..4461247 --- /dev/null +++ b/examples/fileToJdbc/src/test/resources/example.sql @@ -0,0 +1,8 @@ +CREATE DATABASE IF NOT EXISTS testJdbc; + +USE testJdbc; + +CREATE TABLE IF NOT EXISTS `test_jdbc_table` ( + `ACCOUNT_NO` int(11) NOT NULL, + `NAME` varchar(255), + `AMOUNT` int(11)); 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/test/resources/log4j.properties b/examples/fileToJdbc/src/test/resources/log4j.properties new file mode 100755 index 0000000..3bfcdc5 --- /dev/null +++ b/examples/fileToJdbc/src/test/resources/log4j.properties @@ -0,0 +1,21 @@ +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/src/test/resources/test-input/sample.txt ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/test/resources/test-input/sample.txt b/examples/fileToJdbc/src/test/resources/test-input/sample.txt new file mode 100644 index 0000000..362253e --- /dev/null +++ b/examples/fileToJdbc/src/test/resources/test-input/sample.txt @@ -0,0 +1,10 @@ +1,User1,1000 +2,User2,2000 +3,User3,3000 +4,User4,4000 +5,User5,5000 +6,User6,6000 +7,User7,7000 +8,User8,8000 +9,User9,9000 +10,User10,10000 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1b7155c/examples/fileToJdbc/src/test/resources/test.xml ---------------------------------------------------------------------- diff --git a/examples/fileToJdbc/src/test/resources/test.xml b/examples/fileToJdbc/src/test/resources/test.xml new file mode 100755 index 0000000..c3a49c4 --- /dev/null +++ b/examples/fileToJdbc/src/test/resources/test.xml @@ -0,0 +1,58 @@ +<?xml version="1.0"?> +<configuration> + <property> + <name>dt.operator.JdbcOutput.prop.store.databaseDriver</name> + <value>org.hsqldb.jdbcDriver</value> + </property> + + <property> + <name>dt.operator.JdbcOutput.prop.store.databaseUrl</name> + <value>jdbc:hsqldb:mem:test;sql.syntax_mys=true</value> + </property> + + <property> + <name>dt.operator.JdbcOutput.prop.store.userName</name> + <value>sa</value> + </property> + + <property> + <name>dt.operator.JdbcOutput.prop.store.password</name> + <value></value> + </property> + + <property> + <name>dt.operator.JdbcOutput.prop.batchSize</name> + <value>5</value> + </property> + + <property> + <name>dt.operator.JdbcOutput.prop.tablename</name> + <value>test_jdbc_table</value> + </property> + + <property> + <name>dt.operator.JdbcOutput.port.input.attr.TUPLE_CLASS</name> + <value>com.example.FileToJdbcApp.PojoEvent</value> + </property> + + <property> + <name>dt.operator.FileReader.prop.directory</name> + <value>src/test/resources/test-input</value> + </property> + + <property> + <name>dt.loggers.level</name> + <value>com.datatorrent.*:INFO,org.apache.*:INFO</value> + </property> + + <property> + <name>dt.application.FileToJdbcCsvParser.operator.CsvParser.port.out.attr.TUPLE_CLASS</name> + <value>com.example.FileToJdbcApp.PojoEvent</value> + </property> + + <property> + <name>dt.application.FileToJdbcCustomParser.operator.CustomParser.prop.regexStr</name> + <value>,</value> + </property> +</configuration> +
