This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 5b9a8abc [ISSUE #366] Support for sftp connector(sinlk & source) (#382)
5b9a8abc is described below
commit 5b9a8abcf5106475099205fcac7edec8460485b8
Author: fireround <[email protected]>
AuthorDate: Wed Nov 30 14:48:33 2022 +0800
[ISSUE #366] Support for sftp connector(sinlk & source) (#382)
* Support for sftp connector(sinlk & source)
* update codestyle and copyright
* substitute jsch with sshj
* correct doc
* correct doc
---
connectors/rocketmq-connect-sftp/README.md | 112 ++++++++++++
...ketMQ Connect SFTP \345\210\206\344\272\253.md" | 50 ++++++
...ketMQ Connect SFTP \345\256\236\346\210\230.md" | 145 +++++++++++++++
...75\277\347\224\250\345\234\272\346\231\257.png" | Bin 0 -> 91912 bytes
connectors/rocketmq-connect-sftp/pom.xml | 197 +++++++++++++++++++++
.../rocketmq/connect/http/sink/SftpClient.java | 74 ++++++++
.../rocketmq/connect/http/sink/SftpConstant.java | 42 +++++
.../connect/http/sink/SftpSinkConnector.java | 62 +++++++
.../rocketmq/connect/http/sink/SftpSinkTask.java | 100 +++++++++++
.../connect/http/sink/SftpSourceConnector.java | 58 ++++++
.../rocketmq/connect/http/sink/SftpSourceTask.java | 155 ++++++++++++++++
11 files changed, 995 insertions(+)
diff --git a/connectors/rocketmq-connect-sftp/README.md b/connectors/rocketmq-connect-sftp/README.md
new file mode 100644
index 00000000..0cb61dee
--- /dev/null
+++ b/connectors/rocketmq-connect-sftp/README.md
@@ -0,0 +1,112 @@
+# rocketmq-connect-sftp
+
+Plugin for RocketMQ Connect. Transfers files over SFTP.
+
+# How to use
+
+* start rocketmq nameserver
+
+```shell
+cd ${ROCKETMQ_HOME}
+nohup ./bin/mqnamesrv &
+```
+
+* start rocketmq broker
+
+```shell
+nohup ./bin/mqbroker -n localhost:9876 &
+```
+
+* build plugin
+
+```shell
+cd connectors/rocketmq-connect-sftp
+mvn clean install -Dmaven.test.skip=true
+```
+
+* create a config file path/to/connect-standalone.conf based on distribution/conf/connect-standalone.conf
+* set pluginPaths=path/to/rocketmq-connect-sftp-0.0.1-SNAPSHOT-jar-with-dependencies
+* start org.apache.rocketmq.connect.runtime.StandaloneConnectStartup
+
+```shell
+cd rocketmq-connect-runtime
+mvn clean install -Dmaven.test.skip=true
+```
+
+* start source connector
+
+```http request
+POST /connectors/SftpSourceConnector HTTP/1.1
+Host: localhost:8082
+Content-Type: application/json
+
+{
+ "connector.class": "org.apache.rocketmq.connect.http.sink.SftpSourceConnector",
+ "host": "127.0.0.1",
+ "port": 22,
+ "username": "wencheng",
+ "password": "",
+ "filePath": "/Users/wencheng/Documents/source.txt",
+ "connect.topicname": "sftpTopic",
+ "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+ "fieldSeparator": "\\|",
+ "fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit"
+}
+```
+
+`warning:` make sure a file named "source.txt" exists on the SFTP server.
+
+* start sink connector
+
+```http request
+POST /connectors/SftpSinkConnector HTTP/1.1
+Host: localhost:8082
+Content-Type: application/json
+
+{
+ "connector.class": "org.apache.rocketmq.connect.http.sink.SftpSinkConnector",
+ "host": "127.0.0.1",
+ "port": 22,
+ "username": "wencheng",
+ "password": "",
+ "filePath": "/Users/wencheng/Documents/sink.txt",
+ "connect.topicnames": "sftpTopic"
+}
+```
+
+## What we expect to see
+
+A file named sink.txt will be created, and the content of "source.txt" will appear in it.
+
+## Appendix: Connector Configuration
+
+### sftp-source-connector configuration
+
+| KEY               | TYPE   | REQUIRED | DESCRIPTION                                      | EXAMPLE             |
+| ----------------- | ------ | -------- | ------------------------------------------------ | ------------------- |
+| host              | String | Y        | SFTP host                                        | localhost           |
+| port              | int    | Y        | SFTP port                                        | 22                  |
+| username          | String | Y        | SFTP username                                    | wencheng            |
+| password          | String | Y        | SFTP password                                    |                     |
+| filePath          | String | Y        | The path of the file to be transferred           | /path/to/source.txt |
+| fieldSchema       | String | Y        | The data schema of each line                     |                     |
+| fieldSeparator    | String | Y        | Symbol that separates each field                 |                     |
+| connect.topicname | String | Y        | The message queue topic the data will be sent to |                     |
+
+### sftp-sink-connector configuration
+
+| KEY                | TYPE   | REQUIRED | DESCRIPTION                                          | EXAMPLE           |
+| ------------------ | ------ | -------- | ---------------------------------------------------- | ----------------- |
+| host               | String | Y        | SFTP host                                            | localhost         |
+| port               | int    | Y        | SFTP port                                            | 22                |
+| username           | String | Y        | SFTP username                                        | wencheng          |
+| password           | String | Y        | SFTP password                                        |                   |
+| filePath           | String | Y        | The path of the file to be transferred               | /path/to/sink.txt |
+| connect.topicnames | String | Y        | The message queue topic the data will be pulled from |                   |
+| fieldSchema        | String | Y        | The data schema of each line                         |                   |
+| fieldSeparator     | String | Y        | Symbol that separates each field                     |                   |
+
+
+
+
+
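The `fieldSchema` and `fieldSeparator` options in the tables above describe how one line of the transferred file maps onto named fields. How `SftpSourceTask` actually performs this mapping is not shown in this part of the diff, so the following is only a minimal sketch under stated assumptions: `SftpLineParser` and its `parse` method are hypothetical names, the separator is treated as a literal string (as `SftpSinkTask` does with `Pattern.quote`), and the sample line uses made-up values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the commit: maps one delimited line
// onto the field names listed in fieldSchema.
final class SftpLineParser {

    static Map<String, String> parse(String line, String fieldSchema, String fieldSeparator) {
        // Pattern.quote makes the separator literal, so "|" is not read as regex alternation.
        String sep = Pattern.quote(fieldSeparator);
        String[] names = fieldSchema.split(sep);
        // A negative limit keeps trailing empty fields instead of dropping them.
        String[] values = line.split(sep, -1);
        if (values.length != names.length) {
            throw new IllegalArgumentException(
                "expected " + names.length + " fields but found " + values.length);
        }
        // LinkedHashMap preserves the schema order of the fields.
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            record.put(names[i], values[i]);
        }
        return record;
    }

    public static void main(String[] args) {
        String schema = "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit";
        // Made-up sample line in the shape the schema describes.
        String line = "alice|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00";
        System.out.println(parse(line, schema, "|"));
    }
}
```

With a literal separator, the configuration value would be `|` rather than an escaped regex such as `\\|`; which form the source task expects should be checked against its code.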
diff --git "a/connectors/rocketmq-connect-sftp/doc/RocketMQ Connect SFTP \345\210\206\344\272\253.md" "b/connectors/rocketmq-connect-sftp/doc/RocketMQ Connect SFTP \345\210\206\344\272\253.md"
new file mode 100644
index 00000000..a9f753e7
--- /dev/null
+++ "b/connectors/rocketmq-connect-sftp/doc/RocketMQ Connect SFTP \345\210\206\344\272\253.md"
@@ -0,0 +1,50 @@
+<!-- TOC -->
+
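+- [Introduction to connect](#introduction-to-connect)
+- [Introduction to SFTP](#introduction-to-sftp)
+- [connect-sftp use cases](#connect-sftp-use-cases)
+ - [Syncing reconciliation files with sftp-source-connector](#syncing-reconciliation-files-with-sftp-source-connector)
+ - [Generating reconciliation files with sftp-sink-connector](#generating-reconciliation-files-with-sftp-sink-connector)
+ - [Syncing SFTP files with sftp-sink-connector and sftp-source-connector](#syncing-sftp-files-with-sftp-sink-connector-and-sftp-source-connector)
+- [connect-sftp demo](#connect-sftp-demo)
+- [References](#references)
+
+<!-- TOC -->
+
+## Introduction to connect
+
+* Built on the OpenMessaging Connect standard; implements Connector management (REST API, shell commands)
+* Implements scheduling of tasks
+* A plugin mechanism and inheritance-based extension make it fairly lightweight and easy to extend connect to support different application protocols.
+
+## Introduction to SFTP
+
+SFTP stands for SSH File Transfer Protocol. It runs on top of the SSH protocol (and has nothing in common with FTP) and is used to transfer files.
+
+## connect-sftp use cases
+
+### Syncing reconciliation files with sftp-source-connector
+
+Use sftp-source-connector to sync reconciliation files to MQ. Business systems connect to MQ and consume messages from the corresponding topic for business processing.
+
+
+Advantages:
+
+* With MQ, business systems acting as consumers can easily achieve load balancing
+* Business systems need not care about how the data is read and converted. connect guarantees the reliability of that process.
+
+Disadvantages:
+
+* A separate connect service must be deployed, maintained, and configured.
+
+### Generating reconciliation files with sftp-sink-connector
+
+### Syncing SFTP files with sftp-sink-connector and sftp-source-connector
+
+## connect-sftp demo
+
+[RocketMQ Connect SFTP 实战.md](RocketMQ Connect SFTP 实战.md)
+
+## References
+
+[sftp.net](https://www.sftp.net/)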
\ No newline at end of file
diff --git "a/connectors/rocketmq-connect-sftp/doc/RocketMQ Connect SFTP \345\256\236\346\210\230.md" "b/connectors/rocketmq-connect-sftp/doc/RocketMQ Connect SFTP \345\256\236\346\210\230.md"
new file mode 100644
index 00000000..e00e08ee
--- /dev/null
+++ "b/connectors/rocketmq-connect-sftp/doc/RocketMQ Connect SFTP \345\256\236\346\210\230.md"
@@ -0,0 +1,145 @@
+# RocketMQ Connect SFTP in Practice
+
+## Preparation
+
+### Start RocketMQ
+
+1. Linux/Unix/Mac
+2. 64bit JDK 1.8+;
+3. Maven 3.2.x or later;
+4. Start [RocketMQ](https://rocketmq.apache.org/docs/quick-start/);
+
+
+
+**Tip**: location of ${ROCKETMQ_HOME}
+
+>bin-release.zip version: /rocketmq-all-4.9.4-bin-release
+>
+>source-release.zip version: /rocketmq-all-4.9.4-source-release/distribution
+
+
+### Start Connect
+
+
+#### Build the connector plugin
+
+RocketMQ Connector SFTP
+```
+$ cd rocketmq-connect/connectors/rocketmq-connect-sftp/
+$ mvn clean package -Dmaven.test.skip=true
+```
+
+Copy the built RocketMQ Connector SFTP package into the directory from which the runtime loads plugins:
+```
+mkdir -p /usr/local/connector-plugins
+cp target/rocketmq-connect-sftp-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
+```
+
+#### Start the Connect runtime
+
+```
+cd rocketmq-connect
+
+mvn -Prelease-connect -DskipTests clean install -U
+
+```
+
+Edit `connect-standalone.conf`; the key settings are shown below
+```
+$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
+$ vim conf/connect-standalone.conf
+```
+
+```
+workerId=standalone-worker
+storePathRootDir=/tmp/storeRoot
+
+## Http port for user to access REST API
+httpPort=8082
+
+# Rocketmq namesrvAddr
+namesrvAddr=localhost:9876
+
+# RocketMQ acl
+aclEnable=false
+accessKey=rocketmq
+secretKey=12345678
+
+autoCreateGroupEnable=false
+clusterName="DefaultCluster"
+
+# Key setting: point this at the plugin directory that holds the package built earlier;
+# Source or sink connector jar file dir, The default value is rocketmq-connect-sample
+pluginPaths=/usr/local/connector-plugins
+```
+
+
+```
+cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
+
+sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
+
+```
+
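+### Set up the SFTP server
+
+Use the SFTP server built into macOS
+
+[Allow a remote computer to access your Mac](https://support.apple.com/zh-cn/guide/mac-help/mchlp1066/mac)
+
+### Test data
+
+Log in to the SFTP server and place a source.txt file with the following content in the user directory, for example /path/to/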
+
+```text
+张三|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00
+李四|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00
+赵五|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00
+```
+
+## Start the connectors
+
+### Start the SFTP source connector
+
+Syncs the SFTP file source.txt.
+Purpose: logs in to the SFTP server, parses the file, wraps its contents in generic ConnectRecord objects, and sends them to the RocketMQ topic
+
+```shell
+curl -X POST --location "http://localhost:8082/connectors/SftpSourceConnector" --http1.1 \
+ -H "Host: localhost:8082" \
+ -H "Content-Type: application/json" \
+ -d "{
+ \"connector.class\": \"org.apache.rocketmq.connect.http.sink.SftpSourceConnector\",
+ \"host\": \"127.0.0.1\",
+ \"port\": 22,
+ \"username\": \"wencheng\",
+ \"password\": \"1617\",
+ \"filePath\": \"/Users/wencheng/Documents/source.txt\",
+ \"connect.topicname\": \"sftpTopic\",
+ \"fieldSeparator\": \"|\",
+ \"fieldSchema\": \"username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit\"
+ }"
+```
+
+### Start the SFTP sink connector
+
+Purpose: consumes the data in the topic and writes it to the target file over SFTP
+
+```shell
+curl -X POST --location "http://localhost:8082/connectors/SftpSinkConnector" --http1.1 \
+ -H "Host: localhost:8082" \
+ -H "Content-Type: application/json" \
+ -d "{
+ \"connector.class\": \"org.apache.rocketmq.connect.http.sink.SftpSinkConnector\",
+ \"host\": \"127.0.0.1\",
+ \"port\": 22,
+ \"username\": \"wencheng\",
+ \"password\": \"1617\",
+ \"filePath\": \"/Users/wencheng/Documents/sink.txt\",
+ \"connect.topicnames\": \"sftpTopic\",
+ \"fieldSeparator\": \"|\",
+ \"fieldSchema\": \"username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit\"
+ }"
+```
+
+****
\ No newline at end of file
diff --git "a/connectors/rocketmq-connect-sftp/doc/\344\275\277\347\224\250\345\234\272\346\231\257.png" "b/connectors/rocketmq-connect-sftp/doc/\344\275\277\347\224\250\345\234\272\346\231\257.png"
new file mode 100644
index 00000000..0c27d747
Binary files /dev/null and "b/connectors/rocketmq-connect-sftp/doc/\344\275\277\347\224\250\345\234\272\346\231\257.png" differ
diff --git a/connectors/rocketmq-connect-sftp/pom.xml b/connectors/rocketmq-connect-sftp/pom.xml
new file mode 100644
index 00000000..06d0094d
--- /dev/null
+++ b/connectors/rocketmq-connect-sftp/pom.xml
@@ -0,0 +1,197 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-connect-sftp</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+
+ <name>rocketmq-connect-sftp</name>
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <slf4j.version>1.7.7</slf4j.version>
+ <logback.version>1.2.9</logback.version>
+ <openmessaging-connector.version>0.1.4</openmessaging-connector.version>
+ <commons-lang3.version>3.12.0</commons-lang3.version>
+ <fastjson.version>1.2.83</fastjson.version>
+ <sshj.version>0.32.0</sshj.version>
+ <junit.version>4.13.1</junit.version>
+ <assertj.version>2.6.0</assertj.version>
+ <mockito.version>2.6.3</mockito.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ <excludeTransitive>false</excludeTransitive>
+ <stripVersion>true</stripVersion>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <version>${openmessaging-connector.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>${fastjson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons-lang3.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.hierynomus</groupId>
+ <artifactId>sshj</artifactId>
+ <version>${sshj.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>${assertj.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpClient.java b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpClient.java
new file mode 100644
index 00000000..03785683
--- /dev/null
+++ b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpClient.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.http.sink;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
+import net.schmizz.sshj.SSHClient;
+import net.schmizz.sshj.sftp.OpenMode;
+import net.schmizz.sshj.sftp.RemoteFile;
+import net.schmizz.sshj.sftp.SFTPClient;
+import net.schmizz.sshj.transport.TransportException;
+import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
+import net.schmizz.sshj.userauth.UserAuthException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SftpClient implements Closeable {
+
+ private static final Logger log = LoggerFactory.getLogger(SftpClient.class);
+
+ private SSHClient sshClient;
+
+ private SFTPClient internalSFTPClient;
+
+ public SftpClient(String host, int port, String username, String password) {
+ sshClient = new SSHClient();
+ sshClient.addHostKeyVerifier(new PromiscuousVerifier());
+ try {
+ sshClient.connect(host, port);
+ sshClient.authPassword(username, password);
+ internalSFTPClient = sshClient.newSFTPClient();
+ } catch (UserAuthException e) {
+ log.error(e.getMessage(), e);
+ } catch (TransportException e) {
+ log.error(e.getMessage(), e);
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+
+ public RemoteFile open(String filename) throws IOException {
+ return internalSFTPClient.getSFTPEngine().open(filename);
+ }
+
+ public RemoteFile open(String filename, Set<OpenMode> modes) throws IOException {
+ return internalSFTPClient.getSFTPEngine().open(filename, modes);
+ }
+
+ @Override
+ public void close() {
+ try {
+ internalSFTPClient.close();
+ sshClient.close();
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+}
\ No newline at end of file
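A note on the constructor above: it logs connection and authentication failures and then returns normally, so `internalSFTPClient` can stay `null`, and a later `open` or `close` call would then fail with a `NullPointerException` rather than with the original cause. The sketch below shows one possible fail-fast alternative. It is not the commit's code: `Session` is a hypothetical stand-in for the sshj session so the example compiles with the JDK alone, and `FailFastClient` is likewise an illustrative name.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the underlying SSH/SFTP session; not an sshj type.
interface Session extends Closeable {
    void connect() throws IOException;
}

// Hypothetical wrapper: construction either yields a usable client or throws.
final class FailFastClient implements Closeable {

    private final Session session;

    FailFastClient(Session session) {
        try {
            session.connect();
        } catch (IOException e) {
            // Release anything half-opened, then surface the failure to the caller.
            try {
                session.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw new UncheckedIOException("SFTP connection failed", e);
        }
        this.session = session;
    }

    @Override
    public void close() throws IOException {
        // session is never null here because the constructor throws on failure.
        session.close();
    }

    public static void main(String[] args) throws IOException {
        Session ok = new Session() {
            @Override public void connect() { System.out.println("connected"); }
            @Override public void close() { System.out.println("closed"); }
        };
        try (FailFastClient client = new FailFastClient(ok)) {
            System.out.println("client ready");
        }
    }
}
```

Throwing from the constructor lets a task's `start` method fail visibly at startup. Whether the Connect runtime prefers an exception or a logged error at that point is an assumption to verify.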
diff --git a/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpConstant.java b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpConstant.java
new file mode 100644
index 00000000..eb47cf50
--- /dev/null
+++ b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpConstant.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.http.sink;
+
+public class SftpConstant {
+
+ public static final String LOGGER_NAME = "SftpConnector";
+
+ public static final String SFTP_HOST_KEY = "host";
+
+ public static final String SFTP_PORT_KEY = "port";
+
+ public static final String SFTP_USERNAME_KEY = "username";
+
+ public static final String SFTP_PASSWORD_KEY = "password";
+
+ public static final String SFTP_PATH_KEY = "filePath";
+
+ public static final String SFTP_FIELD_SEPARATOR = "fieldSeparator";
+
+ public static final String SFTP_FIELD_SCHEMA = "fieldSchema";
+
+ public static final String RECORD_PARTITION_STORAGE_KEY = "partition";
+
+ public static final String RECORD_OFFSET_STORAGE_KEY = "offset";
+
+}
diff --git a/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSinkConnector.java b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSinkConnector.java
new file mode 100644
index 00000000..24f6e5a8
--- /dev/null
+++ b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSinkConnector.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.http.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import java.util.ArrayList;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SftpSinkConnector extends SinkConnector {
+
+ private Logger log = LoggerFactory.getLogger(SftpConstant.LOGGER_NAME);
+
+ private KeyValue config;
+
+ @Override
+ public List<KeyValue> taskConfigs(int maxTasks) {
+ List<KeyValue> taskConfigs = new ArrayList<>();
+ if (config != null) {
+ taskConfigs.add(config);
+ }
+ return taskConfigs;
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return SftpSinkTask.class;
+ }
+
+ @Override
+ public void validate(KeyValue config) {
+ }
+
+ @Override
+ public void start(KeyValue config) {
+ log.info("Sftp connector started");
+ this.config = config;
+ }
+
+ @Override
+ public void stop() {
+ log.info("Sftp connector stopped");
+ }
+}
diff --git a/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSinkTask.java b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSinkTask.java
new file mode 100644
index 00000000..78bbab31
--- /dev/null
+++ b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSinkTask.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.http.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.regex.Pattern;
+import net.schmizz.sshj.sftp.OpenMode;
+import net.schmizz.sshj.sftp.RemoteFile;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_FIELD_SCHEMA;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_FIELD_SEPARATOR;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_HOST_KEY;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_PASSWORD_KEY;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_PATH_KEY;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_PORT_KEY;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_USERNAME_KEY;
+
+public class SftpSinkTask extends SinkTask {
+ private static final Logger log = LoggerFactory.getLogger(SftpSinkTask.class);
+
+ private SftpClient sftpClient;
+
+ private String filePath;
+
+ private String fieldSeparator;
+
+ private String[] fieldSchema;
+
+ @Override public void validate(KeyValue config) {
+ if (StringUtils.isBlank(config.getString(SFTP_HOST_KEY))
+ || StringUtils.isBlank(config.getString(SFTP_PORT_KEY))
+ || StringUtils.isBlank(config.getString(SFTP_USERNAME_KEY))
+ || StringUtils.isBlank(config.getString(SFTP_PASSWORD_KEY))
+ || StringUtils.isBlank(config.getString(SFTP_PATH_KEY))) {
+ throw new RuntimeException("missing required config");
+ }
+ }
+
+ @Override public void start(KeyValue config) {
+ String host = config.getString(SFTP_HOST_KEY);
+ int port = config.getInt(SFTP_PORT_KEY);
+ String username = config.getString(SFTP_USERNAME_KEY);
+ String password = config.getString(SFTP_PASSWORD_KEY);
+ this.filePath = config.getString(SFTP_PATH_KEY);
+ this.sftpClient = new SftpClient(host, port, username, password);
+ fieldSeparator = config.getString(SFTP_FIELD_SEPARATOR);
+ String fieldSchemaStr = config.getString(SFTP_FIELD_SCHEMA);
+ fieldSchema = fieldSchemaStr.split(Pattern.quote(fieldSeparator));
+ }
+
+ @Override public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+ try (RemoteFile remoteFile = sftpClient.open(filePath, EnumSet.of(OpenMode.READ, OpenMode.CREAT, OpenMode.WRITE, OpenMode.APPEND));
+ OutputStream outputStream = remoteFile.new RemoteFileOutputStream()) {
+ for (ConnectRecord connectRecord : sinkRecords) {
+ String str = (String) connectRecord.getData();
+ JSONObject jsonObject = JSON.parseObject(str);
+ StringBuilder lineBuilder = new StringBuilder();
+ for (int i = 0; i < fieldSchema.length; i++) {
+ lineBuilder.append(jsonObject.getString(fieldSchema[i])).append(fieldSeparator);
+ }
+ lineBuilder.append(System.lineSeparator());
+ byte[] line = lineBuilder.toString().getBytes();
+ outputStream.write(line, 0, line.length);
+ }
+ } catch (IOException e) {
+ log.error("sink task ioexception", e);
+ }
+ }
+
+ @Override public void stop() {
+ sftpClient.close();
+ }
+}
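Two details of `put` above are worth noting. The loop appends `fieldSeparator` after every field, so each written line ends with a trailing separator and will not be byte-identical to the source line. And `getBytes()` without an argument uses the platform default charset. The following sketch shows one way to format a line without the trailing separator and with an explicit charset. It is an illustration under stated assumptions, not the commit's code: `SinkLineFormatter` is a hypothetical name, a plain `Map` stands in for the fastjson `JSONObject`, missing fields are written as empty strings (the original would append the text `null`), and `"\n"` replaces `System.lineSeparator()`.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of the commit: builds one output line per record.
final class SinkLineFormatter {

    static String formatLine(Map<String, String> record, String[] fieldSchema, String fieldSeparator) {
        // StringJoiner places the separator between fields only, never after the last one.
        StringJoiner joiner = new StringJoiner(fieldSeparator);
        for (String field : fieldSchema) {
            String value = record.get(field);
            // Assumption: a missing field becomes an empty column.
            joiner.add(value == null ? "" : value);
        }
        return joiner.toString();
    }

    static byte[] encodeLine(String line) {
        // Assumption: UTF-8 and "\n" so the file does not depend on the worker's platform.
        return (line + "\n").getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, String> record = new LinkedHashMap<>();
        record.put("username", "alice");
        record.put("orderNo", "20221129001");
        String line = formatLine(record, new String[] {"username", "orderNo"}, "|");
        System.out.println(line);
        System.out.println(encodeLine(line).length + " bytes");
    }
}
```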
diff --git a/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSourceConnector.java b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSourceConnector.java
new file mode 100644
index 00000000..82f1ed77
--- /dev/null
+++ b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSourceConnector.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.http.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SftpSourceConnector extends SourceConnector {
+
+ private Logger log = LoggerFactory.getLogger(SftpConstant.LOGGER_NAME);
+
+ private KeyValue config;
+
+ @Override
+ public List<KeyValue> taskConfigs(int i) {
+ List<KeyValue> taskConfigs = new ArrayList<>();
+ if (config != null) {
+ taskConfigs.add(config);
+ }
+ return taskConfigs;
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return SftpSourceTask.class;
+ }
+
+ @Override
+ public void start(KeyValue config) {
+ log.info("Sftp connector started");
+ this.config = config;
+ }
+
+ @Override
+ public void stop() {
+ log.info("Sftp connector stopped");
+ }
+}
diff --git a/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSourceTask.java b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSourceTask.java
new file mode 100644
index 00000000..466e5a0b
--- /dev/null
+++ b/connectors/rocketmq-connect-sftp/src/main/java/org/apache/rocketmq/connect/http/sink/SftpSourceTask.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.http.sink;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.file.FileSystemException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import net.schmizz.sshj.sftp.RemoteFile;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.RECORD_OFFSET_STORAGE_KEY;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.RECORD_PARTITION_STORAGE_KEY;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_FIELD_SCHEMA;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_FIELD_SEPARATOR;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_HOST_KEY;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_PASSWORD_KEY;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_PATH_KEY;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_PORT_KEY;
+import static org.apache.rocketmq.connect.http.sink.SftpConstant.SFTP_USERNAME_KEY;
+
+public class SftpSourceTask extends SourceTask {
+
+ private final Logger log = LoggerFactory.getLogger(SftpConstant.LOGGER_NAME);
+
+ private SftpClient sftpClient;
+
+ private String filePath;
+
+ private String fieldSeparator;
+
+ private String[] fieldSchema;
+
+ private static final int MAX_NUMBER_SEND_CONNECT_RECORD_EACH_TIME = 2000;
+
+ @Override public void init(SourceTaskContext sourceTaskContext) {
+ super.init(sourceTaskContext);
+ }
+
+ @Override public void validate(KeyValue config) {
+ if (StringUtils.isBlank(config.getString(SFTP_HOST_KEY))
+ || StringUtils.isBlank(config.getString(SFTP_PORT_KEY))
+ || StringUtils.isBlank(config.getString(SFTP_USERNAME_KEY))
+ || StringUtils.isBlank(config.getString(SFTP_PASSWORD_KEY))
+ || StringUtils.isBlank(config.getString(SFTP_PATH_KEY))) {
+ throw new RuntimeException("missing required config");
+ }
+ }
+
+ @Override public void start(KeyValue config) {
+ String host = config.getString(SFTP_HOST_KEY);
+ int port = config.getInt(SFTP_PORT_KEY);
+ String username = config.getString(SFTP_USERNAME_KEY);
+ String password = config.getString(SFTP_PASSWORD_KEY);
+ this.filePath = config.getString(SFTP_PATH_KEY);
+ this.sftpClient = new SftpClient(host, port, username, password);
+ fieldSeparator = config.getString(SFTP_FIELD_SEPARATOR);
+ String fieldSchemaStr = config.getString(SFTP_FIELD_SCHEMA);
+ fieldSchema = fieldSchemaStr.split(Pattern.quote(fieldSeparator));
+ }
+
+ @Override public void stop() {
+ sftpClient.close();
+ }
+
+ @Override public List<ConnectRecord> poll() {
+ int offset = readRecordOffset();
+ try (RemoteFile remoteFile = sftpClient.open(filePath);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(remoteFile.new RemoteFileInputStream(offset)))) {
+ List<ConnectRecord> records = new ArrayList<>();
+ String line;
+ ConnectRecord connectRecord;
+
+ while ((line = reader.readLine()) != null) {
+ offset = offset + line.getBytes().length + 1;
+
+ // do not send empty string to mq
+ if (!StringUtils.isEmpty(line)) {
+ String[] data = line.split(Pattern.quote(fieldSeparator));
+ JSONObject jsonObject = new JSONObject();
+ for (int i = 0; i < fieldSchema.length && i < data.length; i++) {
+ jsonObject.put(fieldSchema[i], data[i]);
+ }
+ connectRecord = new ConnectRecord(buildRecordPartition(filePath), buildRecordOffset(offset), System.currentTimeMillis());
+
+ connectRecord.setData(jsonObject.toString());
+ records.add(connectRecord);
+ if (records.size() >= MAX_NUMBER_SEND_CONNECT_RECORD_EACH_TIME) {
+ break;
+ }
+ }
+ }
+ return records;
+ } catch (FileSystemException e) {
+ log.error("File system error", e);
+ } catch (IOException e) {
+ log.error("SFTP IOException", e);
+ }
+ return null;
+ }
+
+ private RecordOffset buildRecordOffset(int offset) {
+ Map<String, Integer> offsetMap = new HashMap<>();
+ offsetMap.put(RECORD_OFFSET_STORAGE_KEY, offset);
+ return new RecordOffset(offsetMap);
+ }
+
+ private RecordPartition buildRecordPartition(String partitionValue) {
+ Map<String, String> partitionMap = new HashMap<>();
+ partitionMap.put(RECORD_PARTITION_STORAGE_KEY, partitionValue);
+ return new RecordPartition(partitionMap);
+ }
+
+ private int readRecordOffset() {
+ RecordOffset positionInfo = this.sourceTaskContext.offsetStorageReader().readOffset(buildRecordPartition(filePath));
+ if (positionInfo == null) {
+ return 0;
+ }
+ Object offset = positionInfo.getOffset().get(RECORD_OFFSET_STORAGE_KEY);
+ if (offset == null) {
+ return 0;
+ } else {
+ return ((Number) offset).intValue();
+ }
+ }
+}