Repository: apex-malhar
Updated Branches:
  refs/heads/master 3a3629841 -> c3f86f237
APEXMALHAR-2463 FTP input operator sample app, and documentation

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c3f86f23
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c3f86f23
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c3f86f23

Branch: refs/heads/master
Commit: c3f86f237f6c1253964065a2a309947585a206e4
Parents: 3a36298
Author: francisf <[email protected]>
Authored: Thu Mar 30 09:57:00 2017 +0530
Committer: francisf <[email protected]>
Committed: Tue May 16 11:48:39 2017 +0530

----------------------------------------------------------------------
 docs/operators/ftpInputOperator.md              | 72 +++++++++++++++++++
 .../images/ftpInputOperator/classdiagram.png    | Bin 0 -> 27557 bytes
 examples/ftp/README.md                          | 37 ++++++++++
 examples/ftp/pom.xml                            | 37 ++++++++++
 examples/ftp/src/assemble/appPackage.xml        | 59 +++++++++++++++
 .../apache/apex/examples/ftp/Application.java   | 58 +++++++++++++++
 .../apache/apex/examples/ftp/package-info.java  | 19 +++++
 .../src/main/resources/META-INF/properties.xml  | 48 +++++++++++++
 .../ftp/src/test/resources/log4j.properties     | 43 +++++++++++
 examples/pom.xml                                |  1 +
 mkdocs.yml                                      |  1 +
 11 files changed, 375 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/docs/operators/ftpInputOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/ftpInputOperator.md b/docs/operators/ftpInputOperator.md
new file mode 100644
index 0000000..e6f39f7
--- /dev/null
+++ b/docs/operators/ftpInputOperator.md
@@ -0,0 +1,72 @@
+FTP Input Operator
+=============
+
+## Operator Objective
+This operator (`AbstractFTPInputOperator`) is designed to scan a directory on an FTP server for files, read
+and split file content into tuples such as lines or blocks of bytes, and finally
+emit them on the output port for
further processing by downstream operators.
+The operator extends the `AbstractFileInputOperator`. It overrides the
+`getFSInstance()` method and returns an instance of the FTPFileSystem
+(`org.apache.hadoop.fs.ftp.FTPFileSystem`).
+
+## Class Diagram
+
+
+## Operator Information
+1. Operator location : **_malhar-lib_**
+2. Available since : **_2.0.0_**
+3. Java Package : [com.datatorrent.lib.io](https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java)
+
+### Ports
+Because this is an input operator, there are no input ports.
+
+
+| Port | Description | Type | Mandatory |
+| ------- | -------- | ----- | ------- |
+| *output* | output port on which data is emitted | String | Yes |
+
+
+### Configuration
+
+| Property | Description | Type | Mandatory | Default Value |
+| ------- | ------- | ------- | ------- | ------- |
+| *host* | the hostname of the FTP server | String | Yes | N/A |
+| *directory* | the directory path from which to scan and read files | String | Yes | N/A |
+| *userName* | the username for authenticating against the FTP server. This is an optional property and can be skipped when anonymous FTP is enabled | String | No | N/A |
+| *password* | the password to be used in conjunction with the above username | String | No | N/A |
+
+## Partitioning
+#### Static Partitioning
+Configure parameter `partitionCount` to define the desired number of initial partitions
+(4 in this example).
+
+```xml
+<property>
+  <name>dt.operator.{OperatorName}.prop.partitionCount</name>
+  <value>4</value>
+</property>
+```
+Alternatively, this can be changed in the application code by setting the operator property `partitionCount` to the desired number of partitions:
+
+```java
+FTPStringInputOperator reader = dag.addOperator("Reader", new FTPStringInputOperator());
+reader.setPartitionCount(4);
+```
+
+#### Dynamic Partitioning
+Dynamic partitioning -- changing the number of partitions of one or more operators
+in a running application -- can be achieved in multiple ways:
+- Use the command line tool `apex` or the UI console to change the value of the
+  `partitionCount` property of the running operator. This change is detected in
+  `processStats()` (which is invoked periodically by the platform) where, if the
+  current partition count (`currentPartitions`) and the desired partition count
+  (`partitionCount`) differ, the `repartitionRequired` flag in the response is set.
+  This causes the platform to invoke `definePartitions()` to create a new set of
+  partitions with the desired count.
+- Override `processStats()` and within it, based on the statistics in the
+  incoming parameter or any other factors, define a new desired value of
+  `partitionCount` and finally, if this value differs from the current partition
+  count, set the `repartitionRequired` flag in the response.
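The check described in the first bullet can be sketched in isolation. This is a minimal, self-contained illustration and not the operator's actual source: the class `RepartitionSketch`, its nested `Response` type and its constructor are hypothetical stand-ins for the operator and for the platform's response object. Only the names `processStats`, `currentPartitions`, `partitionCount` and `repartitionRequired` are taken from the description above, and the real `processStats()` receives a statistics parameter that is omitted here.

```java
// Hypothetical stand-in classes; not the Apex API. They only mirror the
// comparison described in the documentation text.
final class RepartitionSketch
{
  /** Stand-in for the response returned to the platform. */
  static final class Response
  {
    boolean repartitionRequired;
  }

  private final int currentPartitions; // partitions currently deployed
  private int partitionCount;          // desired number of partitions

  RepartitionSketch(int currentPartitions, int partitionCount)
  {
    this.currentPartitions = currentPartitions;
    this.partitionCount = partitionCount;
  }

  /** Models changing the property from the command line tool or the UI console. */
  void setPartitionCount(int partitionCount)
  {
    this.partitionCount = partitionCount;
  }

  /** Requests a repartition only when the desired and current counts differ. */
  Response processStats()
  {
    Response res = new Response();
    res.repartitionRequired = currentPartitions != partitionCount;
    return res;
  }

  public static void main(String[] args)
  {
    RepartitionSketch op = new RepartitionSketch(2, 2);
    System.out.println(op.processStats().repartitionRequired); // prints "false"
    op.setPartitionCount(4);
    System.out.println(op.processStats().repartitionRequired); // prints "true"
  }
}
```

An override following the second bullet would compute a new `partitionCount` from the incoming statistics before making the same comparison.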
+
+### Example application
+An example application for the FTP input operator can be found at [https://github.com/apache/apex-malhar/tree/master/examples/ftp](https://github.com/apache/apex-malhar/tree/master/examples/ftp)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/docs/operators/images/ftpInputOperator/classdiagram.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/ftpInputOperator/classdiagram.png b/docs/operators/images/ftpInputOperator/classdiagram.png
new file mode 100644
index 0000000..8e173a8
Binary files /dev/null and b/docs/operators/images/ftpInputOperator/classdiagram.png differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/README.md
----------------------------------------------------------------------
diff --git a/examples/ftp/README.md b/examples/ftp/README.md
new file mode 100644
index 0000000..d4aca6a
--- /dev/null
+++ b/examples/ftp/README.md
@@ -0,0 +1,37 @@
+## FTP Input Operator Example
+
+This example shows how to use `FTPStringInputOperator`, which extends `AbstractFTPInputOperator`. The `FTPStringInputOperator` scans a directory on an FTP server for files, reads lines from the files and
+emits them on the output port for further processing. The tuples emitted by the `FTPStringInputOperator` are processed by the downstream operator `StringFileOutputOperator`, which writes them to HDFS.
+
+The properties file `META-INF/properties.xml` shows how to configure the respective operators.
+
+
+Users can choose the application and an additional configuration file at launch time. In this example, we use the file mentioned above to configure the operator properties.
+
+
+#### **Update Properties from properties.xml - This is needed to run the example:**
+
+- Update these common properties in the file `/src/main/resources/META-INF/properties.xml`:
+
+| Property Name | Description |
+| ------------- | ----------- |
+| dt.application.FTPInputExample.operator.Reader.prop.host | hostname or address of the FTP server |
+| dt.application.FTPInputExample.operator.Reader.prop.userName | user for the FTP server if anonymous FTP is disabled |
+| dt.application.FTPInputExample.operator.Reader.prop.password | password associated with the above user |
+| dt.application.FTPInputExample.operator.Writer.prop.filePath | output directory path to which the records are written |
+| dt.application.FTPInputExample.operator.Writer.prop.outputFileName | name of the output file to which the records are written |
+
+### How to compile
+`shell> mvn clean package`
+
+This generates the application package `malhar-examples-ftp-3.8.0-SNAPSHOT.apa` inside the `target` directory.
+
+### How to run
+Use the application package generated above to launch the application from the UI console (if available) or the apex command line interface.
+
+`apex> launch target/malhar-examples-ftp-3.8.0-SNAPSHOT.apa`
+
+
+
+
+In case you have issues configuring the operator or running the application, please send an email to [email protected].

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/pom.xml
----------------------------------------------------------------------
diff --git a/examples/ftp/pom.xml b/examples/ftp/pom.xml
new file mode 100644
index 0000000..a54ec33
--- /dev/null
+++ b/examples/ftp/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.
The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>malhar-examples-ftp</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar FTP Example</name>
+  <description>Apex example application that reads from an FTP server and writes to HDFS.</description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.8.0-SNAPSHOT</version>
+  </parent>
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/ftp/src/assemble/appPackage.xml b/examples/ftp/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/ftp/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.
You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/src/main/java/org/apache/apex/examples/ftp/Application.java
----------------------------------------------------------------------
diff --git a/examples/ftp/src/main/java/org/apache/apex/examples/ftp/Application.java b/examples/ftp/src/main/java/org/apache/apex/examples/ftp/Application.java
new file
mode 100644
index 0000000..5488970
--- /dev/null
+++ b/examples/ftp/src/main/java/org/apache/apex/examples/ftp/Application.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.ftp;
+
+import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringFileOutputOperator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.AbstractFTPInputOperator.FTPStringInputOperator;
+
+/**
+ * This application demonstrates the FTPStringInputOperator. It uses the FTPStringInputOperator which reads
+ * data from a directory on an FTP server, and then writes it to a file using the StringFileOutputOperator.
+ */
+@ApplicationAnnotation(name = "FTPInputExample")
+public class Application implements StreamingApplication
+{
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // ftp read operator. Configuration through resources/META-INF/properties.xml
+    FTPStringInputOperator reader = dag.addOperator("Reader", new FTPStringInputOperator());
+    //Set properties for the FTP input operator
+    reader.setHost("localhost");
+    reader.setUserName("ftp");
+    reader.setDirectory("sourceDir");
+    reader.setPartitionCount(2);
+
+    // writer that writes strings to a file on hdfs
+    StringFileOutputOperator writer = dag.addOperator("Writer", new StringFileOutputOperator());
+    //Set properties for the output operator
+    writer.setFilePath("malhar_examples/ftp");
+    writer.setOutputFileName("destination");
+
+    //Connect reader output to writer
+    dag.addStream("data", reader.output, writer.input);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/src/main/java/org/apache/apex/examples/ftp/package-info.java
----------------------------------------------------------------------
diff --git a/examples/ftp/src/main/java/org/apache/apex/examples/ftp/package-info.java b/examples/ftp/src/main/java/org/apache/apex/examples/ftp/package-info.java
new file mode 100644
index 0000000..3a2339c
--- /dev/null
+++ b/examples/ftp/src/main/java/org/apache/apex/examples/ftp/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.ftp;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/ftp/src/main/resources/META-INF/properties.xml b/examples/ftp/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..8b3ac89
--- /dev/null
+++ b/examples/ftp/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- FTP Input Example -->
+  <property>
+    <name>dt.application.FTPInputExample.operator.Reader.prop.partitionCount</name>
+    <value>2</value>
+  </property>
+  <property>
+    <name>dt.application.FTPInputExample.operator.Reader.prop.host</name>
+    <value>localhost</value>
+  </property>
+  <property>
+    <name>dt.application.FTPInputExample.operator.Reader.prop.userName</name>
+    <value>ftp</value>
+  </property>
+  <property>
+    <name>dt.application.FTPInputExample.operator.Reader.prop.directory</name>
+    <value>sourceDir</value>
+  </property>
+  <property>
+    <name>dt.application.FTPInputExample.operator.Writer.prop.filePath</name>
+    <value>malhar_examples/ftp</value>
+  </property>
+  <property>
+    <name>dt.application.FTPInputExample.operator.Writer.prop.outputFileName</name>
+    <value>destination</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/ftp/src/test/resources/log4j.properties b/examples/ftp/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/ftp/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.
See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 16cfe26..cff85a7 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -198,6 +198,7 @@
     <module>throttle</module>
     <module>transform</module>
     <module>kafka</module>
+    <module>ftp</module>
   </modules>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/mkdocs.yml
----------------------------------------------------------------------
diff --git a/mkdocs.yml b/mkdocs.yml
index e23abe6..75a862a 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -16,6 +16,7 @@ pages:
     - File Splitter: operators/file_splitter.md
     - Filter: operators/filter.md
     - Fixed Width Parser: operators/fixedWidthParserOperator.md
+    - FTP Input Operator: operators/ftpInputOperator.md
     - Jdbc Output Operator: operators/AbstractJdbcTransactionableOutputOperator.md
     - JDBC Poller Input: operators/jdbcPollInputOperator.md
     - JMS Input: operators/jmsInputOperator.md
