Grouping of Kafka examples into one module; POM and README changes.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c79def4c Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c79def4c Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c79def4c Branch: refs/heads/master Commit: c79def4cd6f2bfb830aa32c9a9e455a7c4eb8385 Parents: 24027ed Author: Lakshmi Prasanna Velineni <[email protected]> Authored: Wed Mar 29 12:46:59 2017 -0700 Committer: Lakshmi Prasanna Velineni <[email protected]> Committed: Mon Apr 24 09:06:40 2017 -0700 ---------------------------------------------------------------------- examples/hdfs2kafka/README.md | 4 - .../hdfs2kafka/XmlJavadocCommentsExtractor.xsl | 44 --- examples/hdfs2kafka/pom.xml | 315 ------------------- examples/hdfs2kafka/src/assemble/appPackage.xml | 43 --- .../java/com/example/myapexapp/Application.java | 26 -- .../src/main/resources/META-INF/properties.xml | 16 - .../com/example/myapexapp/ApplicationTest.java | 132 -------- .../src/test/resources/log4j.properties | 22 -- examples/kafka/README.md | 10 + examples/kafka/XmlJavadocCommentsExtractor.xsl | 44 --- examples/kafka/pom.xml | 305 ++---------------- .../java/com/example/myapexapp/KafkaApp.java | 26 -- .../example/myapexapp/LineOutputOperator.java | 34 -- .../examples/kafka/hdfs2kafka/Application.java | 26 ++ .../examples/kafka/kafka2hdfs/KafkaApp.java | 26 ++ .../kafka/kafka2hdfs/LineOutputOperator.java | 34 ++ .../META-INF/properties-hdfs2kafka.xml | 16 + .../com/example/myapexapp/ApplicationTest.java | 152 --------- .../kafka/hdfs2kafka/ApplicationTest.java | 125 ++++++++ .../kafka/kafka2hdfs/ApplicationTest.java | 150 +++++++++ examples/pom.xml | 1 + pom.xml | 2 +- 22 files changed, 420 insertions(+), 1133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/README.md ---------------------------------------------------------------------- diff --git a/examples/hdfs2kafka/README.md b/examples/hdfs2kafka/README.md deleted file mode 100644 index 166abd3..0000000 --- a/examples/hdfs2kafka/README.md +++ /dev/null @@ -1,4 +0,0 @@ -This sample application shows how to read lines from files in HDFS and write -them out to a Kafka topic. Each line of the input file is considered a separate -message. The topic name, the name of the directory that is monitored for input -files, and other parameters are configurable in `META_INF/properties.xml`. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/XmlJavadocCommentsExtractor.xsl ---------------------------------------------------------------------- diff --git a/examples/hdfs2kafka/XmlJavadocCommentsExtractor.xsl b/examples/hdfs2kafka/XmlJavadocCommentsExtractor.xsl deleted file mode 100644 index 08075a9..0000000 --- a/examples/hdfs2kafka/XmlJavadocCommentsExtractor.xsl +++ /dev/null @@ -1,44 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
- ---> - -<!-- - Document : XmlJavadocCommentsExtractor.xsl - Created on : September 16, 2014, 11:30 AM - Description: - The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet. ---> - -<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0"> - <xsl:output method="xml" standalone="yes"/> - - <!-- copy xml by selecting only the following nodes, attributes and text --> - <xsl:template match="node()|text()|@*"> - <xsl:copy> - <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/> - </xsl:copy> - </xsl:template> - - <!-- Strip off the following paths from the selected xml --> - <xsl:template match="//root/package/interface/interface - |//root/package/interface/method/@qualified - |//root/package/class/interface - |//root/package/class/class - |//root/package/class/method/@qualified - |//root/package/class/field/@qualified" /> - - <xsl:strip-space elements="*"/> -</xsl:stylesheet> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/pom.xml ---------------------------------------------------------------------- diff --git a/examples/hdfs2kafka/pom.xml b/examples/hdfs2kafka/pom.xml deleted file mode 100644 index 75cfb6d..0000000 --- a/examples/hdfs2kafka/pom.xml +++ /dev/null @@ -1,315 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <groupId>com.example</groupId> - <version>1.0-SNAPSHOT</version> - <artifactId>hdfsToKafka</artifactId> - <packaging>jar</packaging> - - <!-- change these to the appropriate values --> - <name>HDFS to Kafka</name> - <description>Simple application to transfer data from HDFS to Kafka</description> - - <properties> - <!-- change this if you desire to use a different version of Apex Core --> - <apex.version>3.5.0</apex.version> - <malhar.version>3.6.0</malhar.version> - <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.9</version> - <configuration> - <downloadSources>true</downloadSources> - </configuration> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.3</version> - <configuration> - <encoding>UTF-8</encoding> - <source>1.7</source> - <target>1.7</target> - <debug>true</debug> - <optimize>false</optimize> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.8</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>prepare-package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>target/deps</outputDirectory> - <includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>app-package-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-${project.version}-apexapp</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> 
- <descriptor>src/assemble/appPackage.xml</descriptor> - </descriptors> - <archiverConfig> - <defaultDirectoryMode>0755</defaultDirectoryMode> - </archiverConfig> - <archive> - <manifestEntries> - <Class-Path>${apex.apppackage.classpath}</Class-Path> - <DT-Engine-Version>${apex.version}</DT-Engine-Version> - <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id> - <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> - <DT-App-Package-Version>${project.version}</DT-App-Package-Version> - <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> - <DT-App-Package-Description>${project.description}</DT-App-Package-Description> - </manifestEntries> - </archive> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <phase>package</phase> - <configuration> - <target> - <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" - tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - <execution> - <!-- create resource directory for xml javadoc--> - <id>createJavadocDirectory</id> - <phase>generate-resources</phase> - <configuration> - <tasks> - <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> - <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>attach-artifacts</id> - <phase>package</phase> - <goals> - <goal>attach-artifact</goal> - </goals> - <configuration> - <artifacts> - <artifact> - <file>target/${project.artifactId}-${project.version}.apa</file> - <type>apa</type> - </artifact> - </artifacts> - <skipAttach>false</skipAttach> - </configuration> - </execution> - </executions> - </plugin> - - <!-- generate javdoc --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <executions> - <!-- generate xml javadoc --> - <execution> - <id>xml-doclet</id> - <phase>generate-resources</phase> - <goals> - <goal>javadoc</goal> - </goals> - <configuration> - <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> - <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> - <useStandardDocletOptions>false</useStandardDocletOptions> - <docletArtifact> - <groupId>com.github.markusbernhardt</groupId> - <artifactId>xml-doclet</artifactId> - <version>1.0.4</version> - </docletArtifact> - </configuration> - </execution> - </executions> - </plugin> - <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>xml-maven-plugin</artifactId> - <version>1.0</version> - <executions> - <execution> - <id>transform-xmljavadoc</id> - <phase>generate-resources</phase> - <goals> - <goal>transform</goal> - </goals> - </execution> - </executions> - <configuration> - <transformationSets> - <transformationSet> - <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> - <includes> - 
<include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> - <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> - </transformationSet> - </transformationSets> - </configuration> - </plugin> - <!-- copy xml javadoc to class jar --> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <id>copy-resources</id> - <phase>process-resources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${basedir}/target/classes</outputDirectory> - <resources> - <resource> - <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <filtering>true</filtering> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - - </build> - - <dependencies> - <!-- add your dependencies here --> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-library</artifactId> - <version>3.4.0</version> - <!-- - If you know that your application does not need transitive dependencies pulled in by malhar-library, - uncomment the following to reduce the size of your app package. - --> - <!-- - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - --> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-contrib</artifactId> - <version>3.4.0</version> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-common</artifactId> - <version>${apex.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.10</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-engine</artifactId> - <version>${apex.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.8.2</artifactId> - <version>0.8.1</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-simple</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>info.batey.kafka</groupId> - <artifactId>kafka-unit</artifactId> - <version>0.3</version> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/hdfs2kafka/src/assemble/appPackage.xml b/examples/hdfs2kafka/src/assemble/appPackage.xml deleted file mode 100644 index 7ad071c..0000000 --- a/examples/hdfs2kafka/src/assemble/appPackage.xml +++ /dev/null @@ -1,43 +0,0 @@ -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - 
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>appPackage</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${basedir}/target/</directory> - <outputDirectory>/app</outputDirectory> - <includes> - <include>${project.artifactId}-${project.version}.jar</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/target/deps</directory> - <outputDirectory>/lib</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/site/conf</directory> - <outputDirectory>/conf</outputDirectory> - <includes> - <include>*.xml</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/META-INF</directory> - <outputDirectory>/META-INF</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/app</directory> - <outputDirectory>/app</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/resources</directory> - <outputDirectory>/resources</outputDirectory> - </fileSet> - </fileSets> - -</assembly> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/src/main/java/com/example/myapexapp/Application.java ---------------------------------------------------------------------- diff --git a/examples/hdfs2kafka/src/main/java/com/example/myapexapp/Application.java b/examples/hdfs2kafka/src/main/java/com/example/myapexapp/Application.java deleted file mode 100644 index 447ae1c..0000000 --- a/examples/hdfs2kafka/src/main/java/com/example/myapexapp/Application.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.example.myapexapp; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator; -import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator; - -@ApplicationAnnotation(name="Hdfs2Kafka") -public class Application implements StreamingApplication -{ - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - LineByLineFileInputOperator in = dag.addOperator("lines", - LineByLineFileInputOperator.class); - - KafkaSinglePortOutputOperator<String,String> out = dag.addOperator("kafkaOutput", new KafkaSinglePortOutputOperator<String,String>()); - - dag.addStream("data", in.output, out.inputPort).setLocality(Locality.CONTAINER_LOCAL); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/hdfs2kafka/src/main/resources/META-INF/properties.xml b/examples/hdfs2kafka/src/main/resources/META-INF/properties.xml deleted file mode 100644 index 7c624ca..0000000 --- a/examples/hdfs2kafka/src/main/resources/META-INF/properties.xml +++ /dev/null @@ -1,16 +0,0 @@ -<?xml version="1.0"?> -<configuration> - <property> - <name>dt.operator.kafkaOutput.prop.topic</name> - <value>hdfs2kafka</value> - </property> - <property> - <name>dt.operator.lines.prop.directory</name> - <value>/tmp/hdfs2kafka</value> - </property> - <property> - <name>dt.operator.kafkaOutput.prop.producerProperties</name> - 
<value>serializer.class=kafka.serializer.StringEncoder,producer.type=async,metadata.broker.list=localhost:9092</value> - </property> -</configuration> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/src/test/java/com/example/myapexapp/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/hdfs2kafka/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/hdfs2kafka/src/test/java/com/example/myapexapp/ApplicationTest.java deleted file mode 100644 index 2c415be..0000000 --- a/examples/hdfs2kafka/src/test/java/com/example/myapexapp/ApplicationTest.java +++ /dev/null @@ -1,132 +0,0 @@ -package com.example.myapexapp; - -import java.io.File; -import java.io.IOException; - -import java.util.List; -import java.util.concurrent.TimeoutException; - -import javax.validation.ConstraintViolationException; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; - -import org.junit.Test; - -import info.batey.kafka.unit.KafkaUnitRule; -import info.batey.kafka.unit.KafkaUnit; - -import kafka.producer.KeyedMessage; - -import com.datatorrent.api.LocalMode; -import com.example.myapexapp.Application; - -import static org.junit.Assert.assertTrue; - -/** - * Test the DAG declaration in local mode. - */ -public class ApplicationTest { - private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class); - private static final String TOPIC = "hdfs2kafka"; - private static final String directory = "target/hdfs2kafka"; - private static final String FILE_NAME = "messages.txt"; - - private static final int zkPort = 2181; - private static final int brokerPort = 9092; - private static final String BROKER = "localhost:" + brokerPort; - //private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0"; // first part - - // test messages - private static String[] lines = - { - "1st line", - "2nd line", - "3rd line", - "4th line", - "5th line", - }; - - // broker port must match properties.xml - @Rule - public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort); - - - @Test - public void testApplication() throws IOException, Exception { - try { - // create file in monitored HDFS directory - createFile(); - - // run app asynchronously; terminate after results are checked - LocalMode.Controller lc = asyncRun(); - - // get messages from Kafka topic and compare with input - chkOutput(); - - lc.shutdown(); - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } - - // create a file with content from 'lines' - private void createFile() throws IOException { - // remove old file and create new one - File file = new File(directory, FILE_NAME); - FileUtils.deleteQuietly(file); - try { - String data = StringUtils.join(lines, "\n") + "\n"; // add final newline - FileUtils.writeStringToFile(file, data, "UTF-8"); - } catch (IOException e) { - LOG.error("Error: Failed to create file {} in {}", FILE_NAME, directory); - e.printStackTrace(); - } - LOG.debug("Created file {} with {} lines in {}", - FILE_NAME, lines.length, directory); - } - - private LocalMode.Controller asyncRun() throws Exception { - Configuration conf = getConfig(); - LocalMode lma = LocalMode.newInstance(); - lma.prepareDAG(new Application(), 
conf); - LocalMode.Controller lc = lma.getController(); - lc.runAsync(); - return lc; - } - - private Configuration getConfig() { - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - conf.set("dt.operator.lines.prop.directory", directory); - return conf; - } - - private void chkOutput() throws Exception { - KafkaUnit ku = kafkaUnitRule.getKafkaUnit(); - List<String> messages = null; - - // wait for messages to appear in kafka - Thread.sleep(10000); - - try { - messages = ku.readMessages(TOPIC, lines.length); - } catch (Exception e) { - LOG.error("Error: Got exception {}", e); - } - - int i = 0; - for (String msg : messages) { - assertTrue("Error: message mismatch", msg.equals(lines[i])); - ++i; - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/hdfs2kafka/src/test/resources/log4j.properties b/examples/hdfs2kafka/src/test/resources/log4j.properties deleted file mode 100644 index 98544e8..0000000 --- a/examples/hdfs2kafka/src/test/resources/log4j.properties +++ /dev/null @@ -1,22 +0,0 @@ -log4j.rootLogger=DEBUG,CONSOLE - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n - -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.RFA.File=/tmp/app.log - -# to enable, add SYSLOG to rootLogger -log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender -log4j.appender.SYSLOG.syslogHost=127.0.0.1 -log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout -log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n -log4j.appender.SYSLOG.Facility=LOCAL1 - -#log4j.logger.org.apache.commons.beanutils=warn -log4j.logger.com.datatorrent=debug -log4j.logger.org.apache.apex=debug -log4j.logger.org=info http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/README.md ---------------------------------------------------------------------- diff --git a/examples/kafka/README.md b/examples/kafka/README.md index 1cecdaa..1a7a9c4 100644 --- a/examples/kafka/README.md +++ b/examples/kafka/README.md @@ -1,6 +1,16 @@ +## Kafka to HDFS example: + This sample application shows how to read lines from a Kafka topic using the new (0.9) Kafka input operator and write them out to HDFS using rolling files with a bounded size. The output files start out with a `.tmp` extension and get renamed when they reach the size bound. Additional operators to perform parsing, aggregation or filtering can be inserted into this pipeline as needed. + +## HDFS to Kafka example: + +This sample application shows how to read lines from files in HDFS and write +them out to a Kafka topic. Each line of the input file is considered a separate +message. The topic name, the name of the directory that is monitored for input +files, and other parameters are configurable in `META-INF/properties-hdfs2kafka.xml`. 
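For local experimentation with the HDFS-to-Kafka path, a minimal driver can mirror the asyncRun() pattern of the ApplicationTest added later in this commit. The sketch below is illustrative only: the LocalRunner class name is hypothetical, a Kafka broker is assumed to be listening on localhost:9092, and the topic and monitored directory come from the defaults in properties-hdfs2kafka.xml.

    package org.apache.apex.examples.kafka.hdfs2kafka;

    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.LocalMode;

    // Hypothetical local-mode driver; mirrors ApplicationTest in this commit.
    public class LocalRunner
    {
      public static void main(String[] args) throws Exception
      {
        // Load the operator settings shipped with the app
        // (topic, monitored directory, producer properties).
        Configuration conf = new Configuration(false);
        conf.addResource(LocalRunner.class.getResourceAsStream(
            "/META-INF/properties-hdfs2kafka.xml"));

        LocalMode lma = LocalMode.newInstance();
        lma.prepareDAG(new Application(), conf);
        LocalMode.Controller lc = lma.getController();
        lc.runAsync();        // run the DAG in the background
        Thread.sleep(30000);  // give it time to pick up files and publish to Kafka
        lc.shutdown();
      }
    }
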
+ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/XmlJavadocCommentsExtractor.xsl ---------------------------------------------------------------------- diff --git a/examples/kafka/XmlJavadocCommentsExtractor.xsl b/examples/kafka/XmlJavadocCommentsExtractor.xsl deleted file mode 100644 index 08075a9..0000000 --- a/examples/kafka/XmlJavadocCommentsExtractor.xsl +++ /dev/null @@ -1,44 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ---> - -<!-- - Document : XmlJavadocCommentsExtractor.xsl - Created on : September 16, 2014, 11:30 AM - Description: - The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet. ---> - -<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0"> - <xsl:output method="xml" standalone="yes"/> - - <!-- copy xml by selecting only the following nodes, attributes and text --> - <xsl:template match="node()|text()|@*"> - <xsl:copy> - <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/> - </xsl:copy> - </xsl:template> - - <!-- Strip off the following paths from the selected xml --> - <xsl:template match="//root/package/interface/interface - |//root/package/interface/method/@qualified - |//root/package/class/interface - |//root/package/class/class - |//root/package/class/method/@qualified - |//root/package/class/field/@qualified" /> - - <xsl:strip-space elements="*"/> -</xsl:stylesheet> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/examples/kafka/pom.xml b/examples/kafka/pom.xml index ce325bf..c21bb20 100644 --- a/examples/kafka/pom.xml +++ b/examples/kafka/pom.xml @@ -2,258 +2,39 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <groupId>com.example</groupId> - <version>1.0-SNAPSHOT</version> - <artifactId>kafka2hdfs</artifactId> - <packaging>jar</packaging> - - <!-- change these to the appropriate values --> - <name>New Kafka Input Operator</name> - <description>Example Use of New Kafka Input Operator</description> - - <properties> - <!-- change this if you desire to use a different version of Apex Core --> - <apex.version>3.5.0</apex.version> - <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> - <malhar.version>3.6.0</malhar.version> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.9</version> - <configuration> - <downloadSources>true</downloadSources> - </configuration> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.3</version> - <configuration> - <encoding>UTF-8</encoding> - 
<source>1.7</source> - <target>1.7</target> - <debug>true</debug> - <optimize>false</optimize> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.8</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>prepare-package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>target/deps</outputDirectory> - <includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>app-package-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-${project.version}-apexapp</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/assemble/appPackage.xml</descriptor> - </descriptors> - <archiverConfig> - <defaultDirectoryMode>0755</defaultDirectoryMode> - </archiverConfig> - <archive> - <manifestEntries> - <Class-Path>${apex.apppackage.classpath}</Class-Path> - <DT-Engine-Version>${apex.version}</DT-Engine-Version> - <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id> - <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> - <DT-App-Package-Version>${project.version}</DT-App-Package-Version> - <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> - <DT-App-Package-Description>${project.description}</DT-App-Package-Description> - </manifestEntries> - </archive> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <phase>package</phase> - <configuration> - <target> - <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" - tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - <execution> - <!-- create resource directory for xml javadoc--> - <id>createJavadocDirectory</id> - <phase>generate-resources</phase> - <configuration> - <tasks> - <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> - <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>attach-artifacts</id> - <phase>package</phase> - <goals> - <goal>attach-artifact</goal> - </goals> - <configuration> - <artifacts> - <artifact> - <file>target/${project.artifactId}-${project.version}.apa</file> - <type>apa</type> - </artifact> - </artifacts> - <skipAttach>false</skipAttach> - </configuration> - </execution> - </executions> - </plugin> + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.8.0-SNAPSHOT</version> + </parent> - <!-- generate javdoc --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <executions> - <!-- generate xml javadoc --> - <execution> - <id>xml-doclet</id> - <phase>generate-resources</phase> - <goals> - <goal>javadoc</goal> - </goals> - 
<configuration> - <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> - <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> - <useStandardDocletOptions>false</useStandardDocletOptions> - <docletArtifact> - <groupId>com.github.markusbernhardt</groupId> - <artifactId>xml-doclet</artifactId> - <version>1.0.4</version> - </docletArtifact> - </configuration> - </execution> - </executions> - </plugin> - <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>xml-maven-plugin</artifactId> - <version>1.0</version> - <executions> - <execution> - <id>transform-xmljavadoc</id> - <phase>generate-resources</phase> - <goals> - <goal>transform</goal> - </goals> - </execution> - </executions> - <configuration> - <transformationSets> - <transformationSet> - <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> - <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> - </transformationSet> - </transformationSets> - </configuration> - </plugin> - <!-- copy xml javadoc to class jar --> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <id>copy-resources</id> - <phase>process-resources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${basedir}/target/classes</outputDirectory> - <resources> - <resource> - <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <filtering>true</filtering> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - - </build> + <artifactId>malhar-examples-kafka</artifactId> + <packaging>jar</packaging> + <name>Apache Apex Malhar Kafka examples</name> + <description> + kafka2hdfs is an example showing how to read lines from a Kafka topic using the new (0.9) + Kafka input operator and write them out to HDFS. + hdfs2kafka is a simple application to transfer data from HDFS to Kafka. + </description> + <dependencies> <!-- add your dependencies here --> <dependency> <groupId>org.apache.apex</groupId> <artifactId>malhar-kafka</artifactId> - <version>${malhar.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - </exclusion> - </exclusions> + <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.apex</groupId> - <artifactId>malhar-library</artifactId> - <version>${malhar.version}</version> - <!-- - If you know that your application does not need transitive dependencies pulled in by malhar-library, - uncomment the following to reduce the size of your app package.
- --> + <artifactId>apex-engine</artifactId> + <version>${apex.core.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>info.batey.kafka</groupId> + <artifactId>kafka-unit</artifactId> + <version>0.4</version> <exclusions> <exclusion> <groupId>*</groupId> @@ -261,37 +42,10 @@ </exclusion> </exclusions> </dependency> - <dependency> <groupId>org.apache.apex</groupId> - <artifactId>apex-common</artifactId> - <version>${apex.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.10</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-engine</artifactId> - <version>${apex.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> - <version>0.9.0.1</version> - </dependency> - - <dependency> - <groupId>info.batey.kafka</groupId> - <artifactId>kafka-unit</artifactId> - <version>0.4</version> + <artifactId>malhar-contrib</artifactId> + <version>${project.version}</version> <exclusions> <exclusion> <groupId>*</groupId> @@ -299,9 +53,12 @@ </exclusion> </exclusions> </dependency> - - - + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.9.0.0</version> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/java/com/example/myapexapp/KafkaApp.java ---------------------------------------------------------------------- diff --git a/examples/kafka/src/main/java/com/example/myapexapp/KafkaApp.java b/examples/kafka/src/main/java/com/example/myapexapp/KafkaApp.java deleted file mode 100644 index 09089eb..0000000 --- a/examples/kafka/src/main/java/com/example/myapexapp/KafkaApp.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.example.myapexapp; - -import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator; -import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; - -@ApplicationAnnotation(name="Kafka2HDFS") -public class KafkaApp implements StreamingApplication -{ - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - KafkaSinglePortInputOperator in - = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator()); - - in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); - LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator()); - - dag.addStream("data", in.outputPort, out.input); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java ---------------------------------------------------------------------- diff --git a/examples/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java b/examples/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java deleted file mode 100644 index 2b184c6..0000000 --- a/examples/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.example.myapexapp; - -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; - -import javax.validation.constraints.NotNull; - -import 
com.datatorrent.lib.io.fs.AbstractFileOutputOperator; - -/** - * Converts each tuple to a string and writes it as a new line to the output file - */ -public class LineOutputOperator extends AbstractFileOutputOperator<byte[]> -{ - private static final String NL = System.lineSeparator(); - private static final Charset CS = StandardCharsets.UTF_8; - - @NotNull - private String baseName; - - @Override - public byte[] getBytesForTuple(byte[] t) { - String result = new String(t, CS) + NL; - return result.getBytes(CS); - } - - @Override - protected String getFileName(byte[] tuple) { - return baseName; - } - - public String getBaseName() { return baseName; } - public void setBaseName(String v) { baseName = v; } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java ---------------------------------------------------------------------- diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java new file mode 100644 index 0000000..646c8e8 --- /dev/null +++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java @@ -0,0 +1,26 @@ +package org.apache.apex.examples.kafka.hdfs2kafka; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator; +import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator; + +@ApplicationAnnotation(name="Hdfs2Kafka") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + LineByLineFileInputOperator in = dag.addOperator("lines", + LineByLineFileInputOperator.class); + + KafkaSinglePortOutputOperator<String,String> out = dag.addOperator("kafkaOutput", new KafkaSinglePortOutputOperator<String,String>()); + + dag.addStream("data", in.output, out.inputPort).setLocality(Locality.CONTAINER_LOCAL); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java ---------------------------------------------------------------------- diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java new file mode 100644 index 0000000..15f0182 --- /dev/null +++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java @@ -0,0 +1,26 @@ +package org.apache.apex.examples.kafka.kafka2hdfs; + +import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator; +import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +@ApplicationAnnotation(name="Kafka2HDFS") +public class KafkaApp implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + KafkaSinglePortInputOperator in + = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator()); + + in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); + 
LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator()); + + dag.addStream("data", in.outputPort, out.input); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java ---------------------------------------------------------------------- diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java new file mode 100644 index 0000000..ef40a69 --- /dev/null +++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java @@ -0,0 +1,34 @@ +package org.apache.apex.examples.kafka.kafka2hdfs; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; + +/** + * Converts each tuple to a string and writes it as a new line to the output file + */ +public class LineOutputOperator extends AbstractFileOutputOperator<byte[]> +{ + private static final String NL = System.lineSeparator(); + private static final Charset CS = StandardCharsets.UTF_8; + + @NotNull + private String baseName; + + @Override + public byte[] getBytesForTuple(byte[] t) { + String result = new String(t, CS) + NL; + return result.getBytes(CS); + } + + @Override + protected String getFileName(byte[] tuple) { + return baseName; + } + + public String getBaseName() { return baseName; } + public void setBaseName(String v) { baseName = v; } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml ---------------------------------------------------------------------- diff --git a/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml b/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml new file mode 100644 index 0000000..7c624ca --- /dev/null +++ b/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml @@ -0,0 +1,16 @@ +<?xml version="1.0"?> +<configuration> + <property> + <name>dt.operator.kafkaOutput.prop.topic</name> + <value>hdfs2kafka</value> + </property> + <property> + <name>dt.operator.lines.prop.directory</name> + <value>/tmp/hdfs2kafka</value> + </property> + <property> + <name>dt.operator.kafkaOutput.prop.producerProperties</name> + <value>serializer.class=kafka.serializer.StringEncoder,producer.type=async,metadata.broker.list=localhost:9092</value> + </property> +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/test/java/com/example/myapexapp/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/kafka/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/kafka/src/test/java/com/example/myapexapp/ApplicationTest.java deleted file mode 100644 index 635d25a..0000000 --- a/examples/kafka/src/test/java/com/example/myapexapp/ApplicationTest.java +++ /dev/null @@ -1,152 +0,0 @@ -/** - * Put your copyright and license info here. 
- */ -package com.example.myapexapp; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; - -import java.util.ArrayList; - -import javax.validation.ConstraintViolationException; - -import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator; -import org.apache.hadoop.conf.Configuration; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import info.batey.kafka.unit.KafkaUnitRule; -import info.batey.kafka.unit.KafkaUnit; - -import kafka.producer.KeyedMessage; - -import com.datatorrent.api.LocalMode; - -import static org.junit.Assert.assertTrue; - -/** - * Test the DAG declaration in local mode. - */ -public class ApplicationTest { - private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class); - private static final String TOPIC = "kafka2hdfs"; - - private static final int zkPort = 2181; - private static final int brokerPort = 9092; - private static final String BROKER = "localhost:" + brokerPort; - private static final String FILE_NAME = "test"; - private static final String FILE_DIR = "/tmp/FromKafka"; - private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0"; // first part - - // test messages - private static String[] lines = - { - "1st line", - "2nd line", - "3rd line", - "4th line", - "5th line", - }; - - // broker port must match properties.xml - @Rule - public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort); - - @Test - public void testApplication() throws Exception { - try { - // delete output file if it exists - File file = new File(FILE_PATH); - file.delete(); - - // write messages to Kafka topic - writeToTopic(); - - // run app asynchronously; terminate after results are checked - LocalMode.Controller lc = asyncRun(); - - // check for presence of output file - chkOutput(); - - // compare output lines to input - compare(); - - lc.shutdown(); - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } - - private void writeToTopic() { - KafkaUnit ku = kafkaUnitRule.getKafkaUnit(); - ku.createTopic(TOPIC); - for (String line : lines) { - KeyedMessage<String, String> kMsg = new KeyedMessage<>(TOPIC, line); - ku.sendMessages(kMsg); - } - LOG.debug("Sent messages to topic {}", TOPIC); - } - - private Configuration getConfig() { - Configuration conf = new Configuration(false); - String pre = "dt.operator.kafkaIn.prop."; - conf.setEnum(pre + "initialOffset", - AbstractKafkaInputOperator.InitialOffset.EARLIEST); - conf.setInt(pre + "initialPartitionCount", 1); - conf.set( pre + "topics", TOPIC); - conf.set( pre + "clusters", BROKER); - - pre = "dt.operator.fileOut.prop."; - conf.set( pre + "filePath", FILE_DIR); - conf.set( pre + "baseName", FILE_NAME); - conf.setInt(pre + "maxLength", 40); - conf.setInt(pre + "rotationWindows", 3); - - return conf; - } - - private LocalMode.Controller asyncRun() throws Exception { - Configuration conf = getConfig(); - LocalMode lma = LocalMode.newInstance(); - lma.prepareDAG(new KafkaApp(), conf); - LocalMode.Controller lc = lma.getController(); - lc.runAsync(); - return lc; - } - - private static void chkOutput() throws Exception { - File file = new File(FILE_PATH); - final int MAX = 60; - for (int i = 0; i < MAX && (! file.exists()); ++i ) { - LOG.debug("Sleeping, i = {}", i); - Thread.sleep(1000); - } - if (! 
file.exists()) { - String msg = String.format("Error: %s not found after %d seconds%n", FILE_PATH, MAX); - throw new RuntimeException(msg); - } - } - - private static void compare() throws Exception { - // read output file - File file = new File(FILE_PATH); - BufferedReader br = new BufferedReader(new FileReader(file)); - ArrayList<String> list = new ArrayList<>(); - String line; - while (null != (line = br.readLine())) { - list.add(line); - } - br.close(); - - // compare - Assert.assertEquals("number of lines", list.size(), lines.length); - for (int i = 0; i < lines.length; ++i) { - assertTrue("line", lines[i].equals(list.get(i))); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java new file mode 100644 index 0000000..aa63ee5 --- /dev/null +++ b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java @@ -0,0 +1,125 @@ +package org.apache.apex.examples.kafka.hdfs2kafka; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +import info.batey.kafka.unit.KafkaUnit; +import info.batey.kafka.unit.KafkaUnitRule; + +import static org.junit.Assert.assertTrue; + +/** + * Test the DAG declaration in local mode. 
+ */ +public class ApplicationTest { + private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class); + private static final String TOPIC = "hdfs2kafka"; + private static final String directory = "target/hdfs2kafka"; + private static final String FILE_NAME = "messages.txt"; + + private static final int zkPort = 2181; + private static final int brokerPort = 9092; + private static final String BROKER = "localhost:" + brokerPort; + //private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0"; // first part + + // test messages + private static String[] lines = + { + "1st line", + "2nd line", + "3rd line", + "4th line", + "5th line", + }; + + // broker port must match properties.xml + @Rule + public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort); + + + @Test + public void testApplication() throws IOException, Exception { + try { + // create file in monitored HDFS directory + createFile(); + + // run app asynchronously; terminate after results are checked + LocalMode.Controller lc = asyncRun(); + + // get messages from Kafka topic and compare with input + chkOutput(); + + lc.shutdown(); + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + + // create a file with content from 'lines' + private void createFile() throws IOException { + // remove old file and create new one + File file = new File(directory, FILE_NAME); + FileUtils.deleteQuietly(file); + try { + String data = StringUtils.join(lines, "\n") + "\n"; // add final newline + FileUtils.writeStringToFile(file, data, "UTF-8"); + } catch (IOException e) { + LOG.error("Error: Failed to create file {} in {}", FILE_NAME, directory); + e.printStackTrace(); + } + LOG.debug("Created file {} with {} lines in {}", + FILE_NAME, lines.length, directory); + } + + private LocalMode.Controller asyncRun() throws Exception { + Configuration conf = getConfig(); + LocalMode lma = LocalMode.newInstance(); + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + return lc; + } + + private Configuration getConfig() { + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-hdfs2kafka.xml")); + conf.set("dt.operator.lines.prop.directory", directory); + return conf; + } + + private void chkOutput() throws Exception { + KafkaUnit ku = kafkaUnitRule.getKafkaUnit(); + List<String> messages = null; + + // wait for messages to appear in kafka + Thread.sleep(10000); + + try { + messages = ku.readMessages(TOPIC, lines.length); + } catch (Exception e) { + LOG.error("Error: Got exception {}", e); + } + + int i = 0; + for (String msg : messages) { + assertTrue("Error: message mismatch", msg.equals(lines[i])); + ++i; + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java new file mode 100644 index 0000000..80d84fa --- /dev/null +++ b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java @@ -0,0 +1,150 @@ +/** + * Put your copyright and license info here. 
+ */ +package org.apache.apex.examples.kafka.kafka2hdfs; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.util.ArrayList; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +import info.batey.kafka.unit.KafkaUnit; +import info.batey.kafka.unit.KafkaUnitRule; +import kafka.producer.KeyedMessage; + +import static org.junit.Assert.assertTrue; + +/** + * Test the DAG declaration in local mode. + */ +public class ApplicationTest { + private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class); + private static final String TOPIC = "kafka2hdfs"; + + private static final int zkPort = 2181; + private static final int brokerPort = 9092; + private static final String BROKER = "localhost:" + brokerPort; + private static final String FILE_NAME = "test"; + private static final String FILE_DIR = "/tmp/FromKafka"; + private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0"; // first part + + // test messages + private static String[] lines = + { + "1st line", + "2nd line", + "3rd line", + "4th line", + "5th line", + }; + + // broker port must match properties.xml + @Rule + public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort); + + @Test + public void testApplication() throws Exception { + try { + // delete output file if it exists + File file = new File(FILE_PATH); + file.delete(); + + // write messages to Kafka topic + writeToTopic(); + + // run app asynchronously; terminate after results are checked + LocalMode.Controller lc = asyncRun(); + + // check for presence of output file + chkOutput(); + + // compare output lines to input + compare(); + + lc.shutdown(); + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + + private void writeToTopic() { + KafkaUnit ku = kafkaUnitRule.getKafkaUnit(); + ku.createTopic(TOPIC); + for (String line : lines) { + KeyedMessage<String, String> kMsg = new KeyedMessage<>(TOPIC, line); + ku.sendMessages(kMsg); + } + LOG.debug("Sent messages to topic {}", TOPIC); + } + + private Configuration getConfig() { + Configuration conf = new Configuration(false); + String pre = "dt.operator.kafkaIn.prop."; + conf.setEnum(pre + "initialOffset", + AbstractKafkaInputOperator.InitialOffset.EARLIEST); + conf.setInt(pre + "initialPartitionCount", 1); + conf.set( pre + "topics", TOPIC); + conf.set( pre + "clusters", BROKER); + + pre = "dt.operator.fileOut.prop."; + conf.set( pre + "filePath", FILE_DIR); + conf.set( pre + "baseName", FILE_NAME); + conf.setInt(pre + "maxLength", 40); + conf.setInt(pre + "rotationWindows", 3); + + return conf; + } + + private LocalMode.Controller asyncRun() throws Exception { + Configuration conf = getConfig(); + LocalMode lma = LocalMode.newInstance(); + lma.prepareDAG(new KafkaApp(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + return lc; + } + + private static void chkOutput() throws Exception { + File file = new File(FILE_PATH); + final int MAX = 60; + for (int i = 0; i < MAX && (! file.exists()); ++i ) { + LOG.debug("Sleeping, i = {}", i); + Thread.sleep(1000); + } + if (! 
file.exists()) { + String msg = String.format("Error: %s not found after %d seconds%n", FILE_PATH, MAX); + throw new RuntimeException(msg); + } + } + + private static void compare() throws Exception { + // read output file + File file = new File(FILE_PATH); + BufferedReader br = new BufferedReader(new FileReader(file)); + ArrayList<String> list = new ArrayList<>(); + String line; + while (null != (line = br.readLine())) { + list.add(line); + } + br.close(); + + // compare + Assert.assertEquals("number of lines", list.size(), lines.length); + for (int i = 0; i < lines.length; ++i) { + assertTrue("line", lines[i].equals(list.get(i))); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index 8412c8f..16cfe26 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -197,6 +197,7 @@ <module>recordReader</module> <module>throttle</module> <module>transform</module> + <module>kafka</module> </modules> <dependencies> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9bccc18..5b531ea 100644 --- a/pom.xml +++ b/pom.xml @@ -196,7 +196,6 @@ <profile> <id>all-modules</id> <modules> - <module>kafka</module> <module>hive</module> <module>stream</module> <module>benchmark</module> @@ -210,6 +209,7 @@ <modules> <module>library</module> <module>contrib</module> + <module>kafka</module> <module>examples</module> </modules>
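
With the examples now grouped under examples/kafka, both applications ship in the single malhar-examples-kafka artifact. To exercise the Kafka-to-HDFS path outside of JUnit, the same local-mode pattern applies; the property keys below follow the dt.operator.<name>.prop.<property> convention used by the kafka2hdfs ApplicationTest above. This is a sketch under stated assumptions: the LocalRunner class name is hypothetical, and an already-running broker on localhost:9092 with a populated "kafka2hdfs" topic is assumed.

    package org.apache.apex.examples.kafka.kafka2hdfs;

    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.LocalMode;

    // Hypothetical local-mode driver; property names mirror the kafka2hdfs test.
    public class LocalRunner
    {
      public static void main(String[] args) throws Exception
      {
        Configuration conf = new Configuration(false);
        // Kafka input operator: topic, broker, partitioning, starting offset
        conf.set("dt.operator.kafkaIn.prop.topics", "kafka2hdfs");
        conf.set("dt.operator.kafkaIn.prop.clusters", "localhost:9092");
        conf.setInt("dt.operator.kafkaIn.prop.initialPartitionCount", 1);
        conf.set("dt.operator.kafkaIn.prop.initialOffset", "EARLIEST");
        // Rolling-file output operator: directory, base file name, size bound
        conf.set("dt.operator.fileOut.prop.filePath", "/tmp/FromKafka");
        conf.set("dt.operator.fileOut.prop.baseName", "test");
        conf.setInt("dt.operator.fileOut.prop.maxLength", 40);

        LocalMode lma = LocalMode.newInstance();
        lma.prepareDAG(new KafkaApp(), conf);
        LocalMode.Controller lc = lma.getController();
        lc.runAsync();        // consume from Kafka and write rolling files
        Thread.sleep(30000);
        lc.shutdown();
      }
    }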
