This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 94864c0  [Feature][Connector] Add clickhouse-file sink support 
clickhouse bulk load (#1501)
94864c0 is described below

commit 94864c0772678fc79d1c50c2b22a4459d0e8cc1f
Author: TrickyZerg <[email protected]>
AuthorDate: Thu Mar 24 17:47:24 2022 +0800

    [Feature][Connector] Add clickhouse-file sink support clickhouse bulk load 
(#1501)
    
    * [Feature#1382][Connector]  Add clickhouse-file sink support clickhouse 
bulk load
    update first piece
---
 pom.xml                                            |  14 +-
 .../seatunnel-connector-spark-clickhouse/pom.xml   |   5 +
 .../org.apache.seatunnel.spark.BaseSparkSink       |   1 +
 .../apache/seatunnel/spark/sink/Clickhouse.scala   | 311 ++++++++++-----------
 .../seatunnel/spark/sink/ClickhouseFile.scala      | 283 +++++++++++++++++++
 .../seatunnel/spark/sink/clickhouse/Table.scala    | 106 +++++++
 .../clickhouse/filetransfer/FileTransfer.scala     |  30 ++
 .../filetransfer/RsyncFileTransfer.scala           |  37 +++
 .../clickhouse/filetransfer/ScpFileTransfer.scala  |  85 ++++++
 .../clickhouse/filetransfer/TransferMethod.scala   |  35 +++
 seatunnel-dist/release-docs/LICENSE                |   4 +
 seatunnel-dist/release-docs/NOTICE                 |  14 +
 tools/dependencies/known-dependencies.txt          |   5 +
 13 files changed, 768 insertions(+), 162 deletions(-)

diff --git a/pom.xml b/pom.xml
index 1ade5e3..7fdb317 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,7 @@
         <junit.version>4.13.2</junit.version>
         <tispark.version>2.4.1</tispark.version>
         <druid.version>0.22.1</druid.version>
+        <sshd.version>2.7.0</sshd.version>
         <calcite-druid.version>1.29.0</calcite-druid.version>
         <config.version>1.3.3</config.version>
         <maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
@@ -468,6 +469,12 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.sshd</groupId>
+                <artifactId>sshd-scp</artifactId>
+                <version>${sshd.version}</version>
+            </dependency>
+
+            <dependency>
                 <groupId>junit</groupId>
                 <artifactId>junit</artifactId>
                 <version>${junit.version}</version>
@@ -605,7 +612,8 @@
                     <configuration>
                         <skip>${skipUT}</skip>
                         <systemPropertyVariables>
-                            
<jacoco-agent.destfile>${project.build.directory}/jacoco.exec</jacoco-agent.destfile>
+                            
<jacoco-agent.destfile>${project.build.directory}/jacoco.exec
+                            </jacoco-agent.destfile>
                         </systemPropertyVariables>
                         <excludes>
                             <exclude>**/*IT.java</exclude>
@@ -732,7 +740,9 @@
                         
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
                         
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
                         <!--suppress UnresolvedMavenProperty -->
-                        
<configLocation>${maven.multiModuleProjectDirectory}/tools/checkstyle/scalastyle-config.xml</configLocation>
+                        <configLocation>
+                            
${maven.multiModuleProjectDirectory}/tools/checkstyle/scalastyle-config.xml
+                        </configLocation>
                         
<outputFile>${project.build.directory}/target/scalastyle-output.xml</outputFile>
                         <inputEncoding>UTF-8</inputEncoding>
                         <outputEncoding>UTF-8</outputEncoding>
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/pom.xml
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/pom.xml
index cf2281d..e5e7c31 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/pom.xml
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/pom.xml
@@ -50,6 +50,11 @@
             <artifactId>spark-sql_${scala.binary.version}</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.sshd</groupId>
+            <artifactId>sshd-scp</artifactId>
+        </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
index 7245dc7..8512dea 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
@@ -16,3 +16,4 @@
 #
 
 org.apache.seatunnel.spark.sink.Clickhouse
+org.apache.seatunnel.spark.sink.ClickhouseFile
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
index 58da5b6..5e10cc8 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
@@ -18,15 +18,15 @@ package org.apache.seatunnel.spark.sink
 
 import net.jpountz.xxhash.{XXHash64, XXHashFactory}
 import org.apache.commons.lang3.StringUtils
+
 import java.math.{BigDecimal, BigInteger}
 import java.sql.{Date, PreparedStatement, Timestamp}
-
 import java.text.SimpleDateFormat
 import java.util
 import java.util.{Objects, Properties}
 import scala.collection.JavaConversions._
 import scala.collection.immutable.HashMap
-import scala.util.{Failure, Random, Success, Try}
+import scala.util.{Failure, Success, Try}
 import scala.util.matching.Regex
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
@@ -34,16 +34,16 @@ import 
org.apache.seatunnel.common.config.TypesafeConfigUtils.{extractSubConfig,
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
-import org.apache.seatunnel.spark.sink.Clickhouse.{DistributedEngine, Shard}
+import org.apache.seatunnel.spark.sink.Clickhouse.{Shard, 
acceptedClickHouseSchema, distributedEngine, getClickHouseDistributedTable, 
getClickHouseSchema, getClickhouseConnection, getClusterShardList, 
getDefaultValue, getRowShard}
 import org.apache.spark.sql.{Dataset, Row}
-import ru.yandex.clickhouse.{BalancedClickhouseDataSource, 
ClickHouseConnectionImpl}
+import ru.yandex.clickhouse.{BalancedClickhouseDataSource, 
ClickHouseConnectionImpl, ClickHousePreparedStatementImpl}
 import ru.yandex.clickhouse.except.{ClickHouseException, 
ClickHouseUnknownException}
 
 import java.nio.ByteBuffer
+import java.util.concurrent.ThreadLocalRandom
 import java.util.concurrent.atomic.AtomicLong
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
-
 import scala.annotation.tailrec
 
 class Clickhouse extends SparkBatchSink {
@@ -59,7 +59,7 @@ class Clickhouse extends SparkBatchSink {
 
   // used for split mode
   private var splitMode: Boolean = false
-  private val random = new Random()
+  private val random = ThreadLocalRandom.current()
   private var shardKey: String = ""
   private var shardKeyType: String = _
   private var shardTable: String = _
@@ -81,7 +81,7 @@ class Clickhouse extends SparkBatchSink {
       val statementMap = this.shards.map(s => {
         val executorBalanced = new BalancedClickhouseDataSource(s._2.jdbc, 
this.properties)
         val executorConn = 
executorBalanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
-        (s._2, executorConn.prepareStatement(this.initSQL))
+        (s._2, 
executorConn.prepareStatement(this.initSQL).asInstanceOf[ClickHousePreparedStatementImpl])
       }).toMap
 
       // hashInstance cannot be serialized, can only be created in a partition
@@ -89,7 +89,8 @@ class Clickhouse extends SparkBatchSink {
 
       val lengthMap = statementMap.map(s => (s._1, new AtomicLong(0)))
       for (item <- iter) {
-        val shard = getRowShard(hashInstance, item)
+        val shard = getRowShard(this.splitMode, this.shards, this.shardKey, 
this.shardKeyType, this
+          .shardWeightCount, this.random, hashInstance, item)
         val statement = statementMap(shard)
         renderStatement(fields, item, dfFields, statement)
         statement.addBatch()
@@ -124,18 +125,15 @@ class Clickhouse extends SparkBatchSink {
       }
       val database = config.getString("database")
       val host = config.getString("host")
-      val globalJdbc = String.format("jdbc:clickhouse://%s/%s", host, database)
-      val balanced: BalancedClickhouseDataSource =
-        new BalancedClickhouseDataSource(globalJdbc, properties)
-      val conn = balanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
+      val conn = getClickhouseConnection(host, database, properties)
       val hostAndPort = host.split(":")
       this.table = config.getString("table")
-      this.tableSchema = getClickHouseSchema(conn, this.table)
+      this.tableSchema = getClickHouseSchema(conn, this.table).toMap
       if (splitMode) {
         val tableName = config.getString("table")
         val localTable = getClickHouseDistributedTable(conn, database, 
tableName)
         if (Objects.isNull(localTable)) {
-          CheckResult.error(s"split mode only support table which engine is 
'Distributed' at now")
+          CheckResult.error(s"split mode only support table which engine is 
'$distributedEngine' at now")
         } else {
           this.shardTable = localTable.table
           val shardList = getClusterShardList(conn, localTable.clusterName, 
localTable.database, hostAndPort(1))
@@ -151,8 +149,7 @@ class Clickhouse extends SparkBatchSink {
         }
       } else {
         // only one connection
-        this.shards(0) = new Shard(1, 1, 1, hostAndPort(0),
-          hostAndPort(0), hostAndPort(1), database)
+        this.shards(0) = Shard(1, 1, 1, hostAndPort(0), hostAndPort(0), 
hostAndPort(1), database)
       }
 
       if (StringUtils.isNotEmpty(this.shardKey)) {
@@ -165,7 +162,7 @@ class Clickhouse extends SparkBatchSink {
       }
       if (this.config.hasPath("fields")) {
         this.fields = config.getStringList("fields")
-        checkResult = acceptedClickHouseSchema()
+        checkResult = acceptedClickHouseSchema(this.fields.toList, 
this.tableSchema, this.table)
       }
     }
     checkResult
@@ -186,78 +183,6 @@ class Clickhouse extends SparkBatchSink {
     retryCodes = config.getIntList("retry_codes")
   }
 
-  private def getClickHouseSchema(
-                                   conn: ClickHouseConnectionImpl,
-                                   table: String): Map[String, String] = {
-    val sql = String.format("desc %s", table)
-    val resultSet = conn.createStatement.executeQuery(sql)
-    var schema = new HashMap[String, String]()
-    while (resultSet.next()) {
-      schema += (resultSet.getString(1) -> resultSet.getString(2))
-    }
-    schema
-  }
-
-  private def getClusterShardList(conn: ClickHouseConnectionImpl, clusterName: 
String,
-                                  database: String, port: String): 
ListBuffer[Shard] = {
-    val rs = conn.createStatement().executeQuery(
-      s"select shard_num,shard_weight,replica_num,host_name,host_address," +
-        s"port from system.clusters where cluster = '$clusterName'")
-    // TODO The port will be used for data communication of the tcp protocol 
in the future
-    // port is tcp protocol, need http protocol port at now
-    val nodeList = mutable.ListBuffer[Shard]()
-    while (rs.next()) {
-      nodeList += new Shard(rs.getInt(1), rs.getInt(2),
-        rs.getInt(3), rs.getString(4), rs.getString(5),
-        port, database)
-    }
-    nodeList
-  }
-
-  private def getClickHouseDistributedTable(conn: ClickHouseConnectionImpl, 
database: String, table: String):
-  DistributedEngine = {
-    val rs = conn.createStatement().executeQuery(
-      s"select engine_full from system.tables where " +
-        s"database = '$database' and name = '$table' and engine = 
'Distributed'")
-    if (rs.next()) {
-      // engineFull field will be like : Distributed(cluster, database, 
table[, sharding_key[, policy_name]])
-      val engineFull = rs.getString(1)
-      val infos = engineFull.substring(12).split(",").map(s => 
s.replaceAll("'", ""))
-      new DistributedEngine(infos(0), infos(1).trim, 
infos(2).replaceAll("\\)", "").trim)
-    } else {
-      null
-    }
-  }
-
-  private def getRowShard(hashInstance: XXHash64, row: Row): Shard = {
-    if (splitMode) {
-      if (StringUtils.isEmpty(this.shardKey) || 
row.schema.fieldNames.indexOf(this.shardKey) == -1) {
-        this.shards.lowerEntry(this.random.nextInt(this.shardWeightCount) + 
1).getValue
-      } else {
-        val fieldIndex = row.fieldIndex(this.shardKey)
-        if (row.isNullAt(fieldIndex)) {
-          this.shards.lowerEntry(this.random.nextInt(this.shardWeightCount) + 
1).getValue
-        } else {
-          var offset = 0
-          this.shardKeyType match {
-            case Clickhouse.floatPattern(_) =>
-              offset = row.getFloat(fieldIndex).toInt % this.shardWeightCount
-            case Clickhouse.intPattern(_) | Clickhouse.uintPattern(_) =>
-              offset = row.getInt(fieldIndex) % this.shardWeightCount
-            case Clickhouse.decimalPattern(_) =>
-              offset = 
row.getDecimal(fieldIndex).toBigInteger.mod(BigInteger.valueOf(this
-                .shardWeightCount)).intValue()
-            case _ =>
-              offset = 
(hashInstance.hash(ByteBuffer.wrap(row.getString(fieldIndex).getBytes), 0) & 
Long.MaxValue %
-                this.shardWeightCount).toInt
-          }
-          this.shards.lowerEntry(offset + 1).getValue
-        }
-      }
-    } else {
-      this.shards.head._2
-    }
-  }
 
   private def initPrepareSQL(): String = {
 
@@ -275,73 +200,11 @@ class Clickhouse extends SparkBatchSink {
     sql
   }
 
-  private def acceptedClickHouseSchema(): CheckResult = {
-    val nonExistsFields = fields
-      .map(field => (field, tableSchema.contains(field)))
-      .filter { case (_, exist) => !exist }
-
-    if (nonExistsFields.nonEmpty) {
-      CheckResult.error(
-        "field " + nonExistsFields
-          .map(option => "[" + option + "]")
-          .mkString(", ") + " not exist in table " + this.table)
-    } else {
-      val nonSupportedType = fields
-        .map(field => (tableSchema(field), 
Clickhouse.supportOrNot(tableSchema(field))))
-        .filter { case (_, exist) => !exist }
-      if (nonSupportedType.nonEmpty) {
-        CheckResult.error(
-          "clickHouse data type " + nonSupportedType
-            .map(option => "[" + option + "]")
-            .mkString(", ") + " not support in current version.")
-      } else {
-        CheckResult.success()
-      }
-    }
-  }
-
-  @tailrec
   private def renderDefaultStatement(
                                       index: Int,
                                       fieldType: String,
-                                      statement: PreparedStatement): Unit = {
-    fieldType match {
-      case "DateTime" | "Date" | "String" =>
-        statement.setString(index + 1, 
Clickhouse.renderStringDefault(fieldType))
-      case Clickhouse.datetime64Pattern(_) =>
-        statement.setString(index + 1, 
Clickhouse.renderStringDefault(fieldType))
-      case "Int8" | "UInt8" | "Int16" | "Int32" | "UInt32" | "UInt16" =>
-        statement.setInt(index + 1, 0)
-      case "UInt64" | "Int64" =>
-        statement.setLong(index + 1, 0)
-      case "Float32" => statement.setFloat(index + 1, 0)
-      case "Float64" => statement.setDouble(index + 1, 0)
-      case Clickhouse.lowCardinalityPattern(lowCardinalityType) =>
-        renderDefaultStatement(index, lowCardinalityType, statement)
-      case Clickhouse.arrayPattern(_) => statement.setNull(index, 
java.sql.Types.ARRAY)
-      case Clickhouse.nullablePattern(nullFieldType) =>
-        renderNullStatement(index, nullFieldType, statement)
-      case _ => statement.setString(index + 1, "")
-    }
-  }
-
-  private def renderNullStatement(
-                                   index: Int,
-                                   fieldType: String,
-                                   statement: PreparedStatement): Unit = {
-    fieldType match {
-      case "String" =>
-        statement.setNull(index + 1, java.sql.Types.VARCHAR)
-      case "DateTime" => statement.setNull(index + 1, java.sql.Types.DATE)
-      case Clickhouse.datetime64Pattern(_) => statement.setNull(index + 1, 
java.sql.Types.TIMESTAMP)
-      case "Date" => statement.setNull(index + 1, java.sql.Types.TIME)
-      case "Int8" | "UInt8" | "Int16" | "Int32" | "UInt32" | "UInt16" =>
-        statement.setNull(index + 1, java.sql.Types.INTEGER)
-      case "UInt64" | "Int64" =>
-        statement.setNull(index + 1, java.sql.Types.BIGINT)
-      case "Float32" => statement.setNull(index + 1, java.sql.Types.FLOAT)
-      case "Float64" => statement.setNull(index + 1, java.sql.Types.DOUBLE)
-    }
+                                      statement: 
ClickHousePreparedStatementImpl): Unit = {
+    statement.setObject(index + 1, getDefaultValue(fieldType))
   }
 
   private def renderBaseTypeStatement(
@@ -349,7 +212,7 @@ class Clickhouse extends SparkBatchSink {
                                        fieldIndex: Int,
                                        fieldType: String,
                                        item: Row,
-                                       statement: PreparedStatement): Unit = {
+                                       statement: 
ClickHousePreparedStatementImpl): Unit = {
     fieldType match {
       case "String" =>
         statement.setString(index + 1, item.getAs[String](fieldIndex))
@@ -400,7 +263,7 @@ class Clickhouse extends SparkBatchSink {
                                fields: util.List[String],
                                item: Row,
                                dsFields: Array[String],
-                               statement: PreparedStatement): Unit = {
+                               statement: ClickHousePreparedStatementImpl): 
Unit = {
     for (i <- 0 until fields.size()) {
       val field = fields.get(i)
       val fieldType = tableSchema(field)
@@ -467,6 +330,7 @@ object Clickhouse {
   val floatPattern: Regex = "(Float.*)".r
   val decimalPattern: Regex = "(Decimal.*)".r
   val datetime64Pattern: Regex = "(DateTime64\\(.*\\))".r
+  val distributedEngine = "Distributed"
 
   /**
    * Seatunnel support this clickhouse data type or not.
@@ -491,6 +355,134 @@ object Clickhouse {
     }
   }
 
+  def getClusterShardList(conn: ClickHouseConnectionImpl, clusterName: String,
+                          database: String, port: String): ListBuffer[Shard] = 
{
+    val rs = conn.createStatement().executeQuery(
+      s"select shard_num,shard_weight,replica_num,host_name,host_address," +
+        s"port from system.clusters where cluster = '$clusterName'")
+    // TODO The port will be used for data communication of the tcp protocol 
in the future
+    // port is tcp protocol, need http protocol port at now
+    val nodeList = mutable.ListBuffer[Shard]()
+    while (rs.next()) {
+      nodeList += Shard(rs.getInt(1), rs.getInt(2),
+        rs.getInt(3), rs.getString(4), rs.getString(5),
+        port, database)
+    }
+    nodeList
+  }
+
+  def getClickHouseDistributedTable(conn: ClickHouseConnectionImpl, database: 
String, table: String):
+  DistributedEngine = {
+    val rs = conn.createStatement().executeQuery(
+      s"select engine_full from system.tables where " +
+        s"database = '$database' and name = '$table' and engine = 
'$distributedEngine'")
+    if (rs.next()) {
+      // engineFull field will be like : Distributed(cluster, database, 
table[, sharding_key[, policy_name]])
+      val engineFull = rs.getString(1)
+      val infos = engineFull.substring(12).split(",").map(s => 
s.replaceAll("'", ""))
+      DistributedEngine(infos(0), infos(1).trim, infos(2).replaceAll("\\)", 
"").trim)
+    } else {
+      null
+    }
+  }
+
+  def getRowShard(splitMode: Boolean, shards: util.TreeMap[Int, Shard], 
shardKey: String, shardKeyType: String,
+                  shardWeightCount: Int, random: ThreadLocalRandom, 
hashInstance: XXHash64,
+                  row: Row): Shard = {
+    if (splitMode) {
+      if (StringUtils.isEmpty(shardKey) || 
row.schema.fieldNames.indexOf(shardKey) == -1) {
+        shards.lowerEntry(random.nextInt(shardWeightCount) + 1).getValue
+      } else {
+        val fieldIndex = row.fieldIndex(shardKey)
+        if (row.isNullAt(fieldIndex)) {
+          shards.lowerEntry(random.nextInt(shardWeightCount) + 1).getValue
+        } else {
+          var offset = 0
+          shardKeyType match {
+            case Clickhouse.floatPattern(_) =>
+              offset = row.getFloat(fieldIndex).toInt % shardWeightCount
+            case Clickhouse.intPattern(_) | Clickhouse.uintPattern(_) =>
+              offset = row.getInt(fieldIndex) % shardWeightCount
+            case Clickhouse.decimalPattern(_) =>
+              offset = 
row.getDecimal(fieldIndex).toBigInteger.mod(BigInteger.valueOf(shardWeightCount)).intValue()
+            case _ =>
+              offset = 
(hashInstance.hash(ByteBuffer.wrap(row.getString(fieldIndex).getBytes), 0) & 
Long.MaxValue %
+                shardWeightCount).toInt
+          }
+          shards.lowerEntry(offset + 1).getValue
+        }
+      }
+    } else {
+      shards.head._2
+    }
+  }
+
+
+  def acceptedClickHouseSchema(fields: List[String], tableSchema: Map[String, 
String], table: String)
+  : CheckResult = {
+    val nonExistsFields = fields
+      .map(field => (field, tableSchema.contains(field)))
+      .filter { case (_, exist) => !exist }
+
+    if (nonExistsFields.nonEmpty) {
+      CheckResult.error(
+        "field " + nonExistsFields
+          .map(option => "[" + option + "]")
+          .mkString(", ") + " not exist in table " + table)
+    } else {
+      val nonSupportedType = fields
+        .map(field => (tableSchema(field), 
Clickhouse.supportOrNot(tableSchema(field))))
+        .filter { case (_, exist) => !exist }
+      if (nonSupportedType.nonEmpty) {
+        CheckResult.error(
+          "clickHouse data type " + nonSupportedType
+            .map(option => "[" + option + "]")
+            .mkString(", ") + " not support in current version.")
+      } else {
+        CheckResult.success()
+      }
+    }
+  }
+
+  def getClickHouseSchema(
+                           conn: ClickHouseConnectionImpl,
+                           table: String): util.LinkedHashMap[String, String] 
= {
+    val sql = String.format("desc %s", table)
+    val resultSet = conn.createStatement.executeQuery(sql)
+    val schema = new util.LinkedHashMap[String, String]()
+    while (resultSet.next()) {
+      schema.put(resultSet.getString(1), resultSet.getString(2))
+    }
+    schema
+  }
+
+  @tailrec
+  def getDefaultValue(fieldType: String): Object = {
+    fieldType match {
+      case "DateTime" | "Date" | "String" =>
+        Clickhouse.renderStringDefault(fieldType)
+      case Clickhouse.datetime64Pattern(_) =>
+        Clickhouse.renderStringDefault(fieldType)
+      case Clickhouse.datetime64Pattern(_) =>
+        Clickhouse.renderStringDefault(fieldType)
+      case "Int8" | "UInt8" | "Int16" | "Int32" | "UInt32" | "UInt16" | 
"UInt64" |
+           "Int64" | "Float32" | "Float64" =>
+        new Integer(0)
+      case Clickhouse.lowCardinalityPattern(lowCardinalityType) =>
+        getDefaultValue(lowCardinalityType)
+      case Clickhouse.arrayPattern(_) | Clickhouse.nullablePattern(_) =>
+        null
+      case _ => ""
+    }
+  }
+
+
+  def getClickhouseConnection(host: String, database: String, properties: 
Properties): ClickHouseConnectionImpl = {
+    val globalJdbc = String.format("jdbc:clickhouse://%s/%s", host, database)
+    val balanced: BalancedClickhouseDataSource = new 
BalancedClickhouseDataSource(globalJdbc, properties)
+    balanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
+  }
+
   private[seatunnel] def renderStringDefault(fieldType: String): String = {
     fieldType match {
       case "DateTime" =>
@@ -507,13 +499,12 @@ object Clickhouse {
     }
   }
 
-  class DistributedEngine(val clusterName: String, val database: String,
-                          val table: String) {
+  case class DistributedEngine(clusterName: String, database: String, table: 
String) {
   }
 
-  class Shard(val shardNum: Int, val shardWeight: Int, val replicaNum: Int,
-              val hostname: String, val hostAddress: String, val port: String,
-              val database: String) extends Serializable {
+  case class Shard(shardNum: Int, shardWeight: Int, replicaNum: Int,
+                   hostname: String, hostAddress: String, port: String,
+                   database: String) extends Serializable {
     val jdbc = s"jdbc:clickhouse://$hostAddress:$port/$database"
   }
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/ClickhouseFile.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/ClickhouseFile.scala
new file mode 100644
index 0000000..8c5a3a5
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/ClickhouseFile.scala
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.sink
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import net.jpountz.xxhash.XXHashFactory
+import org.apache.commons.io.FileUtils
+import org.apache.commons.lang3.StringUtils
+import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.seatunnel.spark.sink.Clickhouse._
+import org.apache.seatunnel.spark.sink.ClickhouseFile.{CLICKHOUSE_FILE_PREFIX, 
LOGGER, UUID_LENGTH, getClickhouseTableInfo}
+import org.apache.seatunnel.spark.sink.clickhouse.Table
+import org.apache.seatunnel.spark.sink.clickhouse.filetransfer.{FileTransfer, 
ScpFileTransfer}
+import 
org.apache.seatunnel.spark.sink.clickhouse.filetransfer.TransferMethod.{RSYNC, 
SCP, TransferMethod, getCopyMethod}
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.{Dataset, Encoders, Row}
+import org.slf4j.LoggerFactory
+import ru.yandex.clickhouse.{BalancedClickhouseDataSource, 
ClickHouseConnectionImpl}
+
+import java.io.File
+import java.util
+import java.util.concurrent.ThreadLocalRandom
+import java.util.{Objects, Properties, UUID}
+import scala.collection.JavaConversions.collectionAsScalaIterable
+import scala.collection.{JavaConversions, mutable}
+import scala.sys.process._
+import scala.util.{Failure, Success, Try}
+
+
/**
 * Clickhouse sink use clickhouse-local program. Details see feature
 * <a href="https://github.com/apache/incubator-seatunnel/issues/1382">ST-1382</a>
 */
/**
 * Clickhouse bulk-load sink based on the `clickhouse-local` program.
 * Rows are grouped by destination shard, rendered into native part
 * directories on the executor, copied into the shard host's "detached"
 * directory, and attached with `ALTER TABLE ... ATTACH PART`.
 */
class ClickhouseFile extends SparkBatchSink {

  // JDBC user/password, filled from config in checkConfig().
  private val properties: Properties = new Properties()
  // Command used to invoke clickhouse-local (may contain arguments; split on spaces).
  private var clickhouseLocalPath: String = _
  // Target table metadata resolved from system.tables in checkConfig().
  private var table: Table = _
  // Columns to write: config "fields" when present, otherwise the dataset schema.
  private var fields: List[String] = _
  // host address -> ssh password for the scp transfer.
  // Fix: default to an empty map. The original left this null unless the
  // "node_pass" option was configured, so the node_free_password path crashed
  // with a NullPointerException in moveFileToServer.
  private var nodePass: Map[String, String] = Map.empty
  private val random = ThreadLocalRandom.current()
  // True when clickhouse hosts allow password-free ssh from the spark nodes.
  private var freePass: Boolean = false
  private var copyFileMethod: TransferMethod = SCP

  override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {

    if (!config.hasPath("fields")) {
      this.fields = data.schema.fieldNames.toList
    }

    val session = env.getSparkSession
    import session.implicits._
    val encoder = Encoders.tuple(
      ExpressionEncoder[Shard],
      RowEncoder(data.schema))
    // Tag every row with its destination shard, then handle each shard's rows
    // as one group: generate parts locally, ship them, attach them, clean up.
    data.map(item => {
      val hashInstance = XXHashFactory.fastestInstance().hash64()
      val shard = getRowShard(distributedEngine.equals(this.table.engine), this.table.shards,
        this.table.shardKey, this.table.shardKeyType, this.table.shardWeightCount, this.random,
        hashInstance, item)
      (shard, item)
    })(encoder).groupByKey(si => si._1).mapGroups((shard, rows) => {
      val paths = generateClickhouseFile(rows)
      moveFileToServer(shard, paths)
      attachClickhouseFile(shard, paths)
      // Delete the whole staging directory (prefix + "/" + uuid) for this group.
      clearLocalFile(paths.head.substring(0, CLICKHOUSE_FILE_PREFIX.length + UUID_LENGTH + 1))
      0
    }).foreach(_ => {}) // foreach forces evaluation of the lazy dataset

  }

  /**
   * Run clickhouse-local over this group's rows and return the generated part
   * directories (everything under data/_local/&lt;table&gt; except "detached").
   */
  private def generateClickhouseFile(rows: Iterator[(Shard, Row)]): List[String] = {
    // Render rows as tab-separated lines fed to clickhouse-local via stdin.
    val data = rows.map(r => {
      this.fields.map(f => r._2.getAs[Object](f).toString).mkString("\t")
    }).mkString("\n")

    // SELECT expression for one table column: the column itself when supplied
    // by the job, otherwise a literal default value for its declared type.
    def getValue(kv: util.Map.Entry[String, String]): String = {
      if (this.fields.contains(kv.getKey)) {
        kv.getKey
      } else {
        val v = getDefaultValue(kv.getValue)
        if (v == null) {
          "NULL"
        } else if (v.isInstanceOf[Integer]) {
          "0"
        } else {
          s"'${v.toString}'"
        }
      }
    }

    val uuid = UUID.randomUUID().toString.substring(0, UUID_LENGTH).replaceAll("-", "_")
    val targetPath = java.lang.String.format("%s/%s", CLICKHOUSE_FILE_PREFIX, uuid)
    val target = new File(targetPath)
    target.mkdirs()

    // Assemble the clickhouse-local command line: schema of the input stream
    // (-S), temp table name (-N), DDL + INSERT query (-q), and staging path.
    val exec = mutable.ListBuffer[String]()
    exec.appendAll(clickhouseLocalPath.trim.split(" "))
    exec.append("-S")
    exec.append(fields.map(f => s"$f ${this.table.tableSchema.get(f)}").mkString(","))
    exec.append("-N")
    exec.append("temp_table" + uuid)
    exec.append("-q")
    exec.append(java.lang.String.format("%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;", this.table.getCreateDDLNoDatabase
      .replaceAll("`", ""), this.table.getLocalTableName,
      this.table.tableSchema.entrySet.map(getValue).mkString(","), uuid))
    exec.append("--path")
    exec.append(targetPath)
    // TODO change data stream for echo, change it to local file
    val command = Process(Seq("echo", data)) #| exec
    LOGGER.info(command.lineStream.mkString("\n"))

    // The generated parts are the sub-directories of the local table's data
    // dir, excluding the (empty) "detached" directory clickhouse creates.
    new File(targetPath + "/data/_local/" + this.table.getLocalTableName).listFiles().filter(f => f.isDirectory).
      filterNot(f => f.getName.equals("detached")).map(f => f.getAbsolutePath).toList
  }

  /**
   * Copy the generated part directories into the shard's "detached" directory
   * using the configured transfer method (only scp is implemented).
   */
  private def moveFileToServer(shard: Shard, paths: List[String]): Unit = {

    var fileTransfer: FileTransfer = null
    if (this.copyFileMethod == SCP) {
      var scpFileTransfer: ScpFileTransfer = null
      if (nodePass.contains(shard.hostAddress)) {
        scpFileTransfer = new ScpFileTransfer(shard.hostAddress, nodePass(shard.hostAddress))
      } else {
        // No password entry: rely on key-based (password-free) ssh login.
        scpFileTransfer = new ScpFileTransfer(shard.hostAddress)
      }
      scpFileTransfer.init()
      fileTransfer = scpFileTransfer
    } else if (this.copyFileMethod == RSYNC) {
      throw new UnsupportedOperationException(s"not support copy file method: '$copyFileMethod' yet")
    } else {
      throw new UnsupportedOperationException(s"unknown copy file method: '$copyFileMethod', please use " +
        s"scp/rsync instead")
    }
    fileTransfer.transferAndChown(paths, s"${this.table.getLocalDataPath(shard).head}detached/")

    fileTransfer.close()
  }

  /** Attach every copied part on the shard via ALTER TABLE ... ATTACH PART. */
  private def attachClickhouseFile(shard: Shard, paths: List[String]): Unit = {
    val balanced: BalancedClickhouseDataSource =
      new BalancedClickhouseDataSource(
        s"jdbc:clickhouse://${shard.hostAddress}:${shard.port}/${shard.database}", properties)
    val conn = balanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
    // Fix: close the connection when done; the original leaked one JDBC
    // connection per shard group.
    try {
      paths.map(path => fromPathGetPart(path)).foreach(part => {
        conn.createStatement().execute(s"ALTER TABLE ${this.table.getLocalTableName} ATTACH PART '$part'")
      })
    } finally {
      conn.close()
    }
  }

  /** Part name = last path component of a generated part directory. */
  private def fromPathGetPart(path: String): String = {
    path.substring(path.lastIndexOf("/") + 1)
  }

  /** Best-effort removal of the local staging directory; failures are only logged. */
  private def clearLocalFile(path: String): Unit = {
    val r = Try(FileUtils.deleteDirectory(new File(path)))
    r match {
      case Failure(exception) =>
        LOGGER.warn(s"delete folder failed, path : $path", exception)
      case Success(_) =>
    }
  }

  /**
   * Validate configuration, resolve table metadata and shard topology, and
   * cache everything the write path needs. Returns the first failing
   * CheckResult, or success.
   */
  override def checkConfig(): CheckResult = {
    var checkResult = checkAllExists(config, "host", "table", "database", "username", "password",
      "clickhouse_local_path")
    if (checkResult.isSuccess) {
      clickhouseLocalPath = config.getString("clickhouse_local_path")
      properties.put("user", config.getString("username"))
      properties.put("password", config.getString("password"))
      val host = config.getString("host")
      val database = config.getString("database")
      val table = config.getString("table")
      // NOTE(review): this connection is never closed explicitly.
      val conn = getClickhouseConnection(host, database, properties)

      if (config.hasPath("copy_method")) {
        this.copyFileMethod = getCopyMethod(config.getString("copy_method"))
      }

      val (result, tableInfo) = getClickhouseTableInfo(conn, database, table)
      if (!Objects.isNull(result)) {
        // A non-null result means the table lookup failed.
        checkResult = result
      } else {
        this.table = tableInfo
        tableInfo.initTableInfo(host, conn)
        tableInfo.initShardDataPath(config.getString("username"), config.getString("password"))
        // check config of node password whether completed or not
        if (config.hasPath("node_free_password") && config.getBoolean("node_free_password")) {
          this.freePass = true
        } else if (config.hasPath("node_pass")) {
          val nodePass = config.getObjectList("node_pass")
          val nodePassMap = mutable.Map[String, String]()
          nodePass.foreach(np => {
            val address = np.toConfig.getString("node_address")
            val password = np.toConfig.getString("password")
            nodePassMap(address) = password
          })
          this.nodePass = nodePassMap.toMap
          checkResult = checkNodePass(this.nodePass, tableInfo.shards.values().toList)
        } else {
          checkResult = CheckResult.error("if clickhouse node is free password to spark node, " +
            "make config 'node_free_password' set true. Otherwise need provide clickhouse node password for" +
            " root user, location at node_pass config.")
        }
        if (checkResult.isSuccess) {
          // check sharding method
          if (config.hasPath("sharding_key") && StringUtils.isNotEmpty(config.getString("sharding_key"))) {
            this.table.shardKey = config.getString("sharding_key")
          }
          checkResult = this.table.prepareShardInfo(conn)
          if (checkResult.isSuccess) {
            if (this.config.hasPath("fields")) {
              this.fields = config.getStringList("fields").toList
              checkResult = acceptedClickHouseSchema(this.fields, JavaConversions.mapAsScalaMap(this.table
                .tableSchema).toMap, this.table.name)
            }
          }
        }
      }
    }
    checkResult
  }

  /** Every shard must have a password entry (by address or hostname) in node_pass. */
  private def checkNodePass(nodePassMap: Map[String, String], shardInfo: List[Shard]): CheckResult = {
    val noPassShard = shardInfo.filter(shard => !nodePassMap.contains(shard.hostAddress) &&
      !nodePassMap.contains(shard.hostname))
    if (noPassShard.nonEmpty) {
      CheckResult.error(s"can't find node ${
        String.join(",", JavaConversions.asJavaIterable(noPassShard.map(s => s.hostAddress)))
      } password in node_address config")
    } else {
      CheckResult.success()
    }
  }

  override def prepare(prepareEnv: SparkEnvironment): Unit = {
  }
}
+
+
object ClickhouseFile {

  // Local staging directory on each executor for clickhouse-local output.
  private final val CLICKHOUSE_FILE_PREFIX = "/tmp/clickhouse-local/spark-file"
  private val LOGGER = LoggerFactory.getLogger(classOf[ClickhouseFile])
  // Number of leading UUID characters used to name a staging sub-directory.
  private val UUID_LENGTH = 10
  // Shared mapper used to parse the data_paths column of system.tables.
  private val OBJECT_MAPPER = new ObjectMapper()
  OBJECT_MAPPER.registerModule(DefaultScalaModule)

  /**
   * Look up a table's metadata (engine, create DDL, full engine definition,
   * data paths) from ClickHouse's system.tables.
   *
   * Returns (null, table) on success, or (error CheckResult, null) when the
   * table does not exist in the given database.
   * NOTE(review): the statement/result set are never closed explicitly.
   */
  def getClickhouseTableInfo(conn: ClickHouseConnectionImpl, database: String, table: String):
  (CheckResult, Table) = {
    val sql = s"select engine,create_table_query,engine_full,data_paths from system.tables where database " +
      s"= '$database' and name = '$table'"
    val rs = conn.createStatement().executeQuery(sql)
    if (rs.next()) {
      // data_paths is rendered with single quotes; swap them for double
      // quotes so the value parses as a JSON array of strings.
      (null, new Table(table, database, rs.getString(1), rs.getString(2),
        rs.getString(3),
        OBJECT_MAPPER.readValue(rs.getString(4).replaceAll("'", "\""),
          classOf[util.List[String]]).toList))
    } else {
      (CheckResult.error(s"can't find table '$table' in database '$database', please check config file"),
        null)
    }
  }

}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/Table.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/Table.scala
new file mode 100644
index 0000000..e7c7fd8
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/Table.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.sink.clickhouse
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.spark.sink.Clickhouse.{Shard, distributedEngine, 
getClickHouseDistributedTable, getClickHouseSchema, getClickhouseConnection, 
getClusterShardList}
+import org.apache.seatunnel.spark.sink.ClickhouseFile.getClickhouseTableInfo
+import ru.yandex.clickhouse.ClickHouseConnectionImpl
+
+import java.util
+import java.util.Properties
+import scala.collection.JavaConversions
+
/**
 * Metadata of one ClickHouse table, as read from system.tables, plus the
 * shard and local-table information needed by the clickhouse-file sink.
 *
 * @param name           table name
 * @param database       database the table lives in
 * @param engine         engine name (e.g. "Distributed")
 * @param createTableDDL the table's CREATE TABLE statement
 * @param engineFull     full engine definition including parameters
 * @param dataPaths      on-disk data directories of this table
 */
class Table(val name: String, val database: String, val engine: String, val createTableDDL: String,
            val engineFull: String, val dataPaths: List[String]) extends Serializable {

  // Shards keyed by the running sum of shard weights (see initTableInfo).
  var shards = new util.TreeMap[Int, Shard]()
  // Underlying local table name when engine is Distributed.
  private var localTable: String = _
  // Total of all shard weights; 0 until initTableInfo runs.
  var shardWeightCount: Int = 0
  // Optional sharding column, set from the "sharding_key" config.
  var shardKey: String = _
  // Per-shard data directories of the local table, filled by initShardDataPath.
  private var localDataPaths: Map[Shard, List[String]] = _
  // column name -> clickhouse type, preserving declaration order.
  var tableSchema: util.LinkedHashMap[String, String] = new util.LinkedHashMap[String, String]()
  // ClickHouse type of shardKey, resolved in prepareShardInfo.
  var shardKeyType: String = _
  // CREATE TABLE DDL of the local table (same as createTableDDL unless Distributed).
  var localCreateTableDDL: String = createTableDDL

  /**
   * Resolve shard topology once (no-op when already populated). For a
   * Distributed table this queries the cluster's shard list; otherwise the
   * single "host:port" from config becomes the only shard.
   */
  def initTableInfo(host: String, conn: ClickHouseConnectionImpl): Unit = {
    if (shards.size() == 0) {
      val hostAndPort = host.split(":")
      if (distributedEngine.equals(this.engine)) {
        val localTable = getClickHouseDistributedTable(conn, database, name)
        this.localTable = localTable.table
        val shardList = getClusterShardList(conn, localTable.clusterName, localTable.database, hostAndPort(1))
        // Key each shard by the cumulative weight so weighted selection works.
        var weight = 0
        for (elem <- shardList) {
          this.shards.put(weight, elem)
          weight += elem.shardWeight
        }
        this.shardWeightCount = weight
        // NOTE(review): ._2 is null when the local-table lookup fails; this would NPE — confirm.
        this.localCreateTableDDL = getClickhouseTableInfo(conn, localTable.database, localTable.table)._2.createTableDDL
      } else {
        this.shards.put(0, Shard(1, 1, 1, hostAndPort(0), hostAndPort(0), hostAndPort(1), database))
      }
    }
  }

  /** Physical table holding data: the local table for Distributed engines, else this table. */
  def getLocalTableName: String = {
    if (this.engine.equals(distributedEngine)) {
      localTable
    } else {
      name
    }
  }

  /** Data directories of the physical table on the given shard. */
  def getLocalDataPath(shard: Shard): List[String] = {
    if (!this.engine.equals(distributedEngine)) {
      dataPaths
    } else {
      localDataPaths(shard)
    }
  }

  /**
   * Query every shard for the data paths of its local table.
   * NOTE(review): opens one JDBC connection per shard and never closes them.
   */
  def initShardDataPath(username: String, password: String): Unit = {
    val properties: Properties = new Properties()
    properties.put("user", username)
    properties.put("password", password)
    this.localDataPaths = JavaConversions.collectionAsScalaIterable(this.shards.values).map(s => {
      val conn = getClickhouseConnection(s.hostAddress + ":" + s.port, s.database, properties)
      (s, getClickhouseTableInfo(conn, s.database, getLocalTableName)._2.dataPaths)
    }).toMap
  }

  /** Local-table DDL with the "database." qualifier stripped, for clickhouse-local. */
  def getCreateDDLNoDatabase: String = {
    this.localCreateTableDDL.replace(this.database + ".", "")
  }

  /**
   * Load the table schema and validate that the configured sharding key
   * exists, caching its declared type for shard routing.
   */
  def prepareShardInfo(conn: ClickHouseConnectionImpl): CheckResult = {
    this.tableSchema = getClickHouseSchema(conn, name)
    if (StringUtils.isNotEmpty(this.shardKey)) {
      if (!this.tableSchema.containsKey(this.shardKey)) {
        CheckResult.error(
          s"not find field '${this.shardKey}' in table '${this.name}' as sharding key")
      } else {
        this.shardKeyType = this.tableSchema.get(this.shardKey)
        CheckResult.success()
      }
    } else {
      CheckResult.success()
    }
  }
}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/FileTransfer.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/FileTransfer.scala
new file mode 100644
index 0000000..b2443fa
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/FileTransfer.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.sink.clickhouse.filetransfer
+
/**
 * Strategy interface for shipping locally generated ClickHouse part
 * directories to a remote shard host and fixing their ownership so the
 * server can ATTACH them.
 */
abstract class FileTransfer {

  /** Open any underlying resources (e.g. an ssh session); call before transferring. */
  def init(): Unit

  /** Copy one local path to targetPath on the remote host and adjust ownership. */
  def transferAndChown(sourcePath: String, targetPath: String): Unit

  /** Copy several local paths to targetPath on the remote host and adjust ownership. */
  def transferAndChown(sourcePath: List[String], targetPath: String): Unit

  /** Release any resources opened by init(). */
  def close(): Unit

}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/RsyncFileTransfer.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/RsyncFileTransfer.scala
new file mode 100644
index 0000000..c92917b
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/RsyncFileTransfer.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.sink.clickhouse.filetransfer
+
/**
 * Placeholder rsync-based transfer: every operation fails with
 * UnsupportedOperationException until an implementation is provided.
 */
object RsyncFileTransfer extends FileTransfer {

  // Single failure point so all four operations share the exact same message.
  private def unsupported(): Nothing =
    throw new UnsupportedOperationException("not support rsync file transfer yet")

  override def init(): Unit = unsupported()

  override def transferAndChown(sourcePath: String, targetPath: String): Unit = unsupported()

  override def transferAndChown(sourcePath: List[String], targetPath: String): Unit = unsupported()

  override def close(): Unit = unsupported()
}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/ScpFileTransfer.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/ScpFileTransfer.scala
new file mode 100644
index 0000000..37ea3ef
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/ScpFileTransfer.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.sink.clickhouse.filetransfer
+
+import org.apache.sshd.client.SshClient
+import org.apache.sshd.client.session.ClientSession
+import org.apache.sshd.scp.client.{ScpClient, ScpClientCreator}
+
/**
 * FileTransfer implementation backed by Apache MINA SSHD's SCP client.
 * NOTE(review): connects as user "root" on port 22 (both hard-coded);
 * authentication uses the optional password, otherwise whatever identities
 * the default ssh client provides.
 */
class ScpFileTransfer(host: String) extends FileTransfer {

  // Optional ssh password; null means key-based authentication only.
  var password: String = _

  def this(host: String, password: String) {
    this(host)
    this.password = password
  }

  private var scpClient: ScpClient = _
  private var session: ClientSession = _
  private var client: SshClient = _

  /**
   * Recursively upload sourcePath into targetPath, then chown the uploaded
   * tree to the owner of the last entry listed in targetPath's parent
   * directory (taken to be the server's clickhouse user).
   */
  override def transferAndChown(sourcePath: String, targetPath: String): Unit = {

    // TODO override Scp to support zero copy
    scpClient.upload(sourcePath, targetPath, ScpClient.Option.Recursive, ScpClient.Option
      .TargetIsDirectory, ScpClient.Option.PreserveAttributes)

    // remote exec command to change file owner. Only file owner equal with server's clickhouse user can
    // make ATTACH command work.
    try {
      session.executeRemoteCommand("ls -l " + targetPath.substring(0, targetPath.stripSuffix("/").lastIndexOf("/")) +
        "/ | tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} " + targetPath)
    } catch {
      case e: Exception =>
      // always return error cause xargs return shell command result
      // NOTE(review): deliberate best-effort — failures here are silently ignored.
    }
  }

  /** Upload and chown each path in turn. */
  override def transferAndChown(sourcePath: List[String], targetPath: String): Unit = {
    sourcePath.foreach(s => {
      transferAndChown(s, targetPath)
    })
  }

  /**
   * Start the ssh client and open an authenticated session to root@host:22.
   * @throws IllegalArgumentException when authentication fails
   */
  override def init(): Unit = {
    client = SshClient.setUpDefaultClient()
    client.start()
    session = client.connect("root", this.host, 22).verify().getSession
    if (password != null) {
      session.addPasswordIdentity(this.password)
    }
    val isSuccess = session.auth.verify.isSuccess
    if (!isSuccess) {
      throw new IllegalArgumentException(s"ssh host '$host' verify failed, please check your config")
    }

    scpClient = ScpClientCreator.instance.createScpClient(session)

  }

  /** Close the session and stop the client if they were opened. */
  override def close(): Unit = {
    if (session != null && session.isOpen) {
      session.close()
    }
    if (client != null && client.isOpen) {
      client.stop()
      client.close()
    }
  }
}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/TransferMethod.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/TransferMethod.scala
new file mode 100644
index 0000000..ab7fe2f
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/TransferMethod.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.sink.clickhouse.filetransfer
+
/** Supported mechanisms for copying generated part files to clickhouse hosts. */
object TransferMethod extends Enumeration {
  type TransferMethod = Value
  val SCP, RSYNC = Value

  /**
   * Resolve a user-supplied (case-insensitive) method name to an enum value.
   * @throws IllegalArgumentException for anything other than scp/rsync
   */
  def getCopyMethod(method: String): TransferMethod =
    method.toLowerCase match {
      case "scp" => SCP
      case "rsync" => RSYNC
      case _ => throw new IllegalArgumentException(s"not supported copy method '$method'")
    }

}
diff --git a/seatunnel-dist/release-docs/LICENSE 
b/seatunnel-dist/release-docs/LICENSE
index 8b362ba..01d1e40 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -261,6 +261,9 @@ The text of each license is the standard Apache 2.0 license.
      (Apache 2) org.roaringbitmap:shims (org.roaringbitmap:shims:0.9.22 - 
https://github.com/RoaringBitmap/RoaringBitmap)
      (Apache 2) scalaj-http (org.scalaj:scalaj-http_2.11:2.3.0 - 
http://github.com/scalaj/scalaj-http)
      (Apache 2) univocity-parsers (com.univocity:univocity-parsers:2.7.3 - 
http://github.com/univocity/univocity-parsers)
+     (Apache 2.0 License) Apache Mina SSHD :: Common support utilities 
(org.apache.sshd:sshd-common:2.7.0 - https://www.apache.org/sshd/sshd-common/)
+     (Apache 2.0 License) Apache Mina SSHD :: Core 
(org.apache.sshd:sshd-core:2.7.0 - https://www.apache.org/sshd/sshd-core/)
+     (Apache 2.0 License) Apache Mina SSHD :: SCP 
(org.apache.sshd:sshd-scp:2.7.0 - https://www.apache.org/sshd/sshd-scp/)
      (Apache 2.0 License) Spark Integration for Kafka 0.10 
(org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0 - 
http://spark.apache.org/)
      (Apache 2.0 License) Spark Project Catalyst 
(org.apache.spark:spark-catalyst_2.11:2.4.0 - http://spark.apache.org/)
      (Apache 2.0 License) Spark Project Core 
(org.apache.spark:spark-core_2.11:2.4.0 - http://spark.apache.org/)
@@ -442,6 +445,7 @@ The text of each license is the standard Apache 2.0 license.
      (Apache License, Version 2.0) Hibernate Validator Engine 
(org.hibernate:hibernate-validator:5.2.5.Final - 
http://hibernate.org/validator/hibernate-validator)
      (Apache License, Version 2.0) Hive Storage API 
(org.apache.hive:hive-storage-api:2.6.0 - 
https://www.apache.org/hive-storage-api/)
      (Apache License, Version 2.0) JCIP Annotations under Apache License 
(com.github.stephenc.jcip:jcip-annotations:1.0-1 - 
http://stephenc.github.com/jcip-annotations)
+     (Apache License, Version 2.0) JCL 1.2 implemented over SLF4J 
(org.slf4j:jcl-over-slf4j:1.7.30 - http://www.slf4j.org)
      (Apache License, Version 2.0) JMES Path Query library 
(com.amazonaws:jmespath-java:1.12.37 - https://aws.amazon.com/sdkforjava)
      (Apache License, Version 2.0) Jettison 
(org.codehaus.jettison:jettison:1.1 - https://github.com/jettison-json/jettison)
      (Apache License, Version 2.0) Jettison 
(org.codehaus.jettison:jettison:1.3.8 - 
https://github.com/jettison-json/jettison)
diff --git a/seatunnel-dist/release-docs/NOTICE 
b/seatunnel-dist/release-docs/NOTICE
index ba9163c..ca2aa9b 100644
--- a/seatunnel-dist/release-docs/NOTICE
+++ b/seatunnel-dist/release-docs/NOTICE
@@ -4151,5 +4151,19 @@ the following copyright notice:
 =========================================================================
 
 
+Apache SSHD NOTICE
+
+=========================================================================
+
+Apache MINA SSHD
+Copyright 2008-2020 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
+=========================================================================
+
+
 
 =========================================================================
\ No newline at end of file
diff --git a/tools/dependencies/known-dependencies.txt 
b/tools/dependencies/known-dependencies.txt
index 9e0b589..cf1eb33 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -346,6 +346,8 @@ jaxb-impl-2.2.3-1.jar
 jboss-logging-3.2.1.Final.jar
 jcip-annotations-1.0-1.jar
 jcl-over-slf4j-1.7.12.jar
+jcl-over-slf4j-1.7.16.jar
+jcl-over-slf4j-1.7.30.jar
 jcodings-1.0.18.jar
 jcodings-1.0.43.jar
 jcommander-1.81.jar
@@ -608,6 +610,9 @@ spark-unsafe_2.11-2.4.0.jar
 spoiwo_2.11-1.8.0.jar
 spymemcached-2.12.3.jar
 sqlline-1.2.0.jar
+sshd-common-2.7.0.jar
+sshd-core-2.7.0.jar
+sshd-scp-2.7.0.jar
 stax-api-1.0-2.jar
 stax2-api-3.1.4.jar
 stream-2.7.0.jar

Reply via email to