This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 94864c0 [Feature][Connector] Add clickhouse-file sink support
clickhouse bulk load (#1501)
94864c0 is described below
commit 94864c0772678fc79d1c50c2b22a4459d0e8cc1f
Author: TrickyZerg <[email protected]>
AuthorDate: Thu Mar 24 17:47:24 2022 +0800
[Feature][Connector] Add clickhouse-file sink support clickhouse bulk load
(#1501)
* [Feature#1382][Connector] Add clickhouse-file sink support clickhouse
bulk load
update first piece
---
pom.xml | 14 +-
.../seatunnel-connector-spark-clickhouse/pom.xml | 5 +
.../org.apache.seatunnel.spark.BaseSparkSink | 1 +
.../apache/seatunnel/spark/sink/Clickhouse.scala | 311 ++++++++++-----------
.../seatunnel/spark/sink/ClickhouseFile.scala | 283 +++++++++++++++++++
.../seatunnel/spark/sink/clickhouse/Table.scala | 106 +++++++
.../clickhouse/filetransfer/FileTransfer.scala | 30 ++
.../filetransfer/RsyncFileTransfer.scala | 37 +++
.../clickhouse/filetransfer/ScpFileTransfer.scala | 85 ++++++
.../clickhouse/filetransfer/TransferMethod.scala | 35 +++
seatunnel-dist/release-docs/LICENSE | 4 +
seatunnel-dist/release-docs/NOTICE | 14 +
tools/dependencies/known-dependencies.txt | 5 +
13 files changed, 768 insertions(+), 162 deletions(-)
diff --git a/pom.xml b/pom.xml
index 1ade5e3..7fdb317 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,7 @@
<junit.version>4.13.2</junit.version>
<tispark.version>2.4.1</tispark.version>
<druid.version>0.22.1</druid.version>
+ <sshd.version>2.7.0</sshd.version>
<calcite-druid.version>1.29.0</calcite-druid.version>
<config.version>1.3.3</config.version>
<maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
@@ -468,6 +469,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.sshd</groupId>
+ <artifactId>sshd-scp</artifactId>
+ <version>${sshd.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
@@ -605,7 +612,8 @@
<configuration>
<skip>${skipUT}</skip>
<systemPropertyVariables>
-
<jacoco-agent.destfile>${project.build.directory}/jacoco.exec</jacoco-agent.destfile>
+
<jacoco-agent.destfile>${project.build.directory}/jacoco.exec
+ </jacoco-agent.destfile>
</systemPropertyVariables>
<excludes>
<exclude>**/*IT.java</exclude>
@@ -732,7 +740,9 @@
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
<!--suppress UnresolvedMavenProperty -->
-
<configLocation>${maven.multiModuleProjectDirectory}/tools/checkstyle/scalastyle-config.xml</configLocation>
+ <configLocation>
+
${maven.multiModuleProjectDirectory}/tools/checkstyle/scalastyle-config.xml
+ </configLocation>
<outputFile>${project.build.directory}/target/scalastyle-output.xml</outputFile>
<inputEncoding>UTF-8</inputEncoding>
<outputEncoding>UTF-8</outputEncoding>
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/pom.xml
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/pom.xml
index cf2281d..e5e7c31 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/pom.xml
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/pom.xml
@@ -50,6 +50,11 @@
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.sshd</groupId>
+ <artifactId>sshd-scp</artifactId>
+ </dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
index 7245dc7..8512dea 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
@@ -16,3 +16,4 @@
#
org.apache.seatunnel.spark.sink.Clickhouse
+org.apache.seatunnel.spark.sink.ClickhouseFile
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
index 58da5b6..5e10cc8 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
@@ -18,15 +18,15 @@ package org.apache.seatunnel.spark.sink
import net.jpountz.xxhash.{XXHash64, XXHashFactory}
import org.apache.commons.lang3.StringUtils
+
import java.math.{BigDecimal, BigInteger}
import java.sql.{Date, PreparedStatement, Timestamp}
-
import java.text.SimpleDateFormat
import java.util
import java.util.{Objects, Properties}
import scala.collection.JavaConversions._
import scala.collection.immutable.HashMap
-import scala.util.{Failure, Random, Success, Try}
+import scala.util.{Failure, Success, Try}
import scala.util.matching.Regex
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
@@ -34,16 +34,16 @@ import
org.apache.seatunnel.common.config.TypesafeConfigUtils.{extractSubConfig,
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSink
-import org.apache.seatunnel.spark.sink.Clickhouse.{DistributedEngine, Shard}
+import org.apache.seatunnel.spark.sink.Clickhouse.{Shard,
acceptedClickHouseSchema, distributedEngine, getClickHouseDistributedTable,
getClickHouseSchema, getClickhouseConnection, getClusterShardList,
getDefaultValue, getRowShard}
import org.apache.spark.sql.{Dataset, Row}
-import ru.yandex.clickhouse.{BalancedClickhouseDataSource,
ClickHouseConnectionImpl}
+import ru.yandex.clickhouse.{BalancedClickhouseDataSource,
ClickHouseConnectionImpl, ClickHousePreparedStatementImpl}
import ru.yandex.clickhouse.except.{ClickHouseException,
ClickHouseUnknownException}
import java.nio.ByteBuffer
+import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
-
import scala.annotation.tailrec
class Clickhouse extends SparkBatchSink {
@@ -59,7 +59,7 @@ class Clickhouse extends SparkBatchSink {
// used for split mode
private var splitMode: Boolean = false
- private val random = new Random()
+ private val random = ThreadLocalRandom.current()
private var shardKey: String = ""
private var shardKeyType: String = _
private var shardTable: String = _
@@ -81,7 +81,7 @@ class Clickhouse extends SparkBatchSink {
val statementMap = this.shards.map(s => {
val executorBalanced = new BalancedClickhouseDataSource(s._2.jdbc,
this.properties)
val executorConn =
executorBalanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
- (s._2, executorConn.prepareStatement(this.initSQL))
+ (s._2,
executorConn.prepareStatement(this.initSQL).asInstanceOf[ClickHousePreparedStatementImpl])
}).toMap
// hashInstance cannot be serialized, can only be created in a partition
@@ -89,7 +89,8 @@ class Clickhouse extends SparkBatchSink {
val lengthMap = statementMap.map(s => (s._1, new AtomicLong(0)))
for (item <- iter) {
- val shard = getRowShard(hashInstance, item)
+ val shard = getRowShard(this.splitMode, this.shards, this.shardKey,
this.shardKeyType, this
+ .shardWeightCount, this.random, hashInstance, item)
val statement = statementMap(shard)
renderStatement(fields, item, dfFields, statement)
statement.addBatch()
@@ -124,18 +125,15 @@ class Clickhouse extends SparkBatchSink {
}
val database = config.getString("database")
val host = config.getString("host")
- val globalJdbc = String.format("jdbc:clickhouse://%s/%s", host, database)
- val balanced: BalancedClickhouseDataSource =
- new BalancedClickhouseDataSource(globalJdbc, properties)
- val conn = balanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
+ val conn = getClickhouseConnection(host, database, properties)
val hostAndPort = host.split(":")
this.table = config.getString("table")
- this.tableSchema = getClickHouseSchema(conn, this.table)
+ this.tableSchema = getClickHouseSchema(conn, this.table).toMap
if (splitMode) {
val tableName = config.getString("table")
val localTable = getClickHouseDistributedTable(conn, database,
tableName)
if (Objects.isNull(localTable)) {
- CheckResult.error(s"split mode only support table which engine is
'Distributed' at now")
+ CheckResult.error(s"split mode only support table which engine is
'$distributedEngine' at now")
} else {
this.shardTable = localTable.table
val shardList = getClusterShardList(conn, localTable.clusterName,
localTable.database, hostAndPort(1))
@@ -151,8 +149,7 @@ class Clickhouse extends SparkBatchSink {
}
} else {
// only one connection
- this.shards(0) = new Shard(1, 1, 1, hostAndPort(0),
- hostAndPort(0), hostAndPort(1), database)
+ this.shards(0) = Shard(1, 1, 1, hostAndPort(0), hostAndPort(0),
hostAndPort(1), database)
}
if (StringUtils.isNotEmpty(this.shardKey)) {
@@ -165,7 +162,7 @@ class Clickhouse extends SparkBatchSink {
}
if (this.config.hasPath("fields")) {
this.fields = config.getStringList("fields")
- checkResult = acceptedClickHouseSchema()
+ checkResult = acceptedClickHouseSchema(this.fields.toList,
this.tableSchema, this.table)
}
}
checkResult
@@ -186,78 +183,6 @@ class Clickhouse extends SparkBatchSink {
retryCodes = config.getIntList("retry_codes")
}
- private def getClickHouseSchema(
- conn: ClickHouseConnectionImpl,
- table: String): Map[String, String] = {
- val sql = String.format("desc %s", table)
- val resultSet = conn.createStatement.executeQuery(sql)
- var schema = new HashMap[String, String]()
- while (resultSet.next()) {
- schema += (resultSet.getString(1) -> resultSet.getString(2))
- }
- schema
- }
-
- private def getClusterShardList(conn: ClickHouseConnectionImpl, clusterName:
String,
- database: String, port: String):
ListBuffer[Shard] = {
- val rs = conn.createStatement().executeQuery(
- s"select shard_num,shard_weight,replica_num,host_name,host_address," +
- s"port from system.clusters where cluster = '$clusterName'")
- // TODO The port will be used for data communication of the tcp protocol
in the future
- // port is tcp protocol, need http protocol port at now
- val nodeList = mutable.ListBuffer[Shard]()
- while (rs.next()) {
- nodeList += new Shard(rs.getInt(1), rs.getInt(2),
- rs.getInt(3), rs.getString(4), rs.getString(5),
- port, database)
- }
- nodeList
- }
-
- private def getClickHouseDistributedTable(conn: ClickHouseConnectionImpl,
database: String, table: String):
- DistributedEngine = {
- val rs = conn.createStatement().executeQuery(
- s"select engine_full from system.tables where " +
- s"database = '$database' and name = '$table' and engine =
'Distributed'")
- if (rs.next()) {
- // engineFull field will be like : Distributed(cluster, database,
table[, sharding_key[, policy_name]])
- val engineFull = rs.getString(1)
- val infos = engineFull.substring(12).split(",").map(s =>
s.replaceAll("'", ""))
- new DistributedEngine(infos(0), infos(1).trim,
infos(2).replaceAll("\\)", "").trim)
- } else {
- null
- }
- }
-
- private def getRowShard(hashInstance: XXHash64, row: Row): Shard = {
- if (splitMode) {
- if (StringUtils.isEmpty(this.shardKey) ||
row.schema.fieldNames.indexOf(this.shardKey) == -1) {
- this.shards.lowerEntry(this.random.nextInt(this.shardWeightCount) +
1).getValue
- } else {
- val fieldIndex = row.fieldIndex(this.shardKey)
- if (row.isNullAt(fieldIndex)) {
- this.shards.lowerEntry(this.random.nextInt(this.shardWeightCount) +
1).getValue
- } else {
- var offset = 0
- this.shardKeyType match {
- case Clickhouse.floatPattern(_) =>
- offset = row.getFloat(fieldIndex).toInt % this.shardWeightCount
- case Clickhouse.intPattern(_) | Clickhouse.uintPattern(_) =>
- offset = row.getInt(fieldIndex) % this.shardWeightCount
- case Clickhouse.decimalPattern(_) =>
- offset =
row.getDecimal(fieldIndex).toBigInteger.mod(BigInteger.valueOf(this
- .shardWeightCount)).intValue()
- case _ =>
- offset =
(hashInstance.hash(ByteBuffer.wrap(row.getString(fieldIndex).getBytes), 0) &
Long.MaxValue %
- this.shardWeightCount).toInt
- }
- this.shards.lowerEntry(offset + 1).getValue
- }
- }
- } else {
- this.shards.head._2
- }
- }
private def initPrepareSQL(): String = {
@@ -275,73 +200,11 @@ class Clickhouse extends SparkBatchSink {
sql
}
- private def acceptedClickHouseSchema(): CheckResult = {
- val nonExistsFields = fields
- .map(field => (field, tableSchema.contains(field)))
- .filter { case (_, exist) => !exist }
-
- if (nonExistsFields.nonEmpty) {
- CheckResult.error(
- "field " + nonExistsFields
- .map(option => "[" + option + "]")
- .mkString(", ") + " not exist in table " + this.table)
- } else {
- val nonSupportedType = fields
- .map(field => (tableSchema(field),
Clickhouse.supportOrNot(tableSchema(field))))
- .filter { case (_, exist) => !exist }
- if (nonSupportedType.nonEmpty) {
- CheckResult.error(
- "clickHouse data type " + nonSupportedType
- .map(option => "[" + option + "]")
- .mkString(", ") + " not support in current version.")
- } else {
- CheckResult.success()
- }
- }
- }
-
- @tailrec
private def renderDefaultStatement(
index: Int,
fieldType: String,
- statement: PreparedStatement): Unit = {
- fieldType match {
- case "DateTime" | "Date" | "String" =>
- statement.setString(index + 1,
Clickhouse.renderStringDefault(fieldType))
- case Clickhouse.datetime64Pattern(_) =>
- statement.setString(index + 1,
Clickhouse.renderStringDefault(fieldType))
- case "Int8" | "UInt8" | "Int16" | "Int32" | "UInt32" | "UInt16" =>
- statement.setInt(index + 1, 0)
- case "UInt64" | "Int64" =>
- statement.setLong(index + 1, 0)
- case "Float32" => statement.setFloat(index + 1, 0)
- case "Float64" => statement.setDouble(index + 1, 0)
- case Clickhouse.lowCardinalityPattern(lowCardinalityType) =>
- renderDefaultStatement(index, lowCardinalityType, statement)
- case Clickhouse.arrayPattern(_) => statement.setNull(index,
java.sql.Types.ARRAY)
- case Clickhouse.nullablePattern(nullFieldType) =>
- renderNullStatement(index, nullFieldType, statement)
- case _ => statement.setString(index + 1, "")
- }
- }
-
- private def renderNullStatement(
- index: Int,
- fieldType: String,
- statement: PreparedStatement): Unit = {
- fieldType match {
- case "String" =>
- statement.setNull(index + 1, java.sql.Types.VARCHAR)
- case "DateTime" => statement.setNull(index + 1, java.sql.Types.DATE)
- case Clickhouse.datetime64Pattern(_) => statement.setNull(index + 1,
java.sql.Types.TIMESTAMP)
- case "Date" => statement.setNull(index + 1, java.sql.Types.TIME)
- case "Int8" | "UInt8" | "Int16" | "Int32" | "UInt32" | "UInt16" =>
- statement.setNull(index + 1, java.sql.Types.INTEGER)
- case "UInt64" | "Int64" =>
- statement.setNull(index + 1, java.sql.Types.BIGINT)
- case "Float32" => statement.setNull(index + 1, java.sql.Types.FLOAT)
- case "Float64" => statement.setNull(index + 1, java.sql.Types.DOUBLE)
- }
+ statement:
ClickHousePreparedStatementImpl): Unit = {
+ statement.setObject(index + 1, getDefaultValue(fieldType))
}
private def renderBaseTypeStatement(
@@ -349,7 +212,7 @@ class Clickhouse extends SparkBatchSink {
fieldIndex: Int,
fieldType: String,
item: Row,
- statement: PreparedStatement): Unit = {
+ statement:
ClickHousePreparedStatementImpl): Unit = {
fieldType match {
case "String" =>
statement.setString(index + 1, item.getAs[String](fieldIndex))
@@ -400,7 +263,7 @@ class Clickhouse extends SparkBatchSink {
fields: util.List[String],
item: Row,
dsFields: Array[String],
- statement: PreparedStatement): Unit = {
+ statement: ClickHousePreparedStatementImpl):
Unit = {
for (i <- 0 until fields.size()) {
val field = fields.get(i)
val fieldType = tableSchema(field)
@@ -467,6 +330,7 @@ object Clickhouse {
val floatPattern: Regex = "(Float.*)".r
val decimalPattern: Regex = "(Decimal.*)".r
val datetime64Pattern: Regex = "(DateTime64\\(.*\\))".r
+ val distributedEngine = "Distributed"
/**
* Seatunnel support this clickhouse data type or not.
@@ -491,6 +355,134 @@ object Clickhouse {
}
}
+ def getClusterShardList(conn: ClickHouseConnectionImpl, clusterName: String,
+ database: String, port: String): ListBuffer[Shard] =
{
+ val rs = conn.createStatement().executeQuery(
+ s"select shard_num,shard_weight,replica_num,host_name,host_address," +
+ s"port from system.clusters where cluster = '$clusterName'")
+ // TODO The port will be used for data communication of the tcp protocol
in the future
+ // port is tcp protocol, need http protocol port at now
+ val nodeList = mutable.ListBuffer[Shard]()
+ while (rs.next()) {
+ nodeList += Shard(rs.getInt(1), rs.getInt(2),
+ rs.getInt(3), rs.getString(4), rs.getString(5),
+ port, database)
+ }
+ nodeList
+ }
+
+ def getClickHouseDistributedTable(conn: ClickHouseConnectionImpl, database:
String, table: String):
+ DistributedEngine = {
+ val rs = conn.createStatement().executeQuery(
+ s"select engine_full from system.tables where " +
+ s"database = '$database' and name = '$table' and engine =
'$distributedEngine'")
+ if (rs.next()) {
+ // engineFull field will be like : Distributed(cluster, database,
table[, sharding_key[, policy_name]])
+ val engineFull = rs.getString(1)
+ val infos = engineFull.substring(12).split(",").map(s =>
s.replaceAll("'", ""))
+ DistributedEngine(infos(0), infos(1).trim, infos(2).replaceAll("\\)",
"").trim)
+ } else {
+ null
+ }
+ }
+
+ def getRowShard(splitMode: Boolean, shards: util.TreeMap[Int, Shard],
shardKey: String, shardKeyType: String,
+ shardWeightCount: Int, random: ThreadLocalRandom,
hashInstance: XXHash64,
+ row: Row): Shard = {
+ if (splitMode) {
+ if (StringUtils.isEmpty(shardKey) ||
row.schema.fieldNames.indexOf(shardKey) == -1) {
+ shards.lowerEntry(random.nextInt(shardWeightCount) + 1).getValue
+ } else {
+ val fieldIndex = row.fieldIndex(shardKey)
+ if (row.isNullAt(fieldIndex)) {
+ shards.lowerEntry(random.nextInt(shardWeightCount) + 1).getValue
+ } else {
+ var offset = 0
+ shardKeyType match {
+ case Clickhouse.floatPattern(_) =>
+ offset = row.getFloat(fieldIndex).toInt % shardWeightCount
+ case Clickhouse.intPattern(_) | Clickhouse.uintPattern(_) =>
+ offset = row.getInt(fieldIndex) % shardWeightCount
+ case Clickhouse.decimalPattern(_) =>
+ offset =
row.getDecimal(fieldIndex).toBigInteger.mod(BigInteger.valueOf(shardWeightCount)).intValue()
+ case _ =>
+ offset =
(hashInstance.hash(ByteBuffer.wrap(row.getString(fieldIndex).getBytes), 0) &
Long.MaxValue %
+ shardWeightCount).toInt
+ }
+ shards.lowerEntry(offset + 1).getValue
+ }
+ }
+ } else {
+ shards.head._2
+ }
+ }
+
+
+ def acceptedClickHouseSchema(fields: List[String], tableSchema: Map[String,
String], table: String)
+ : CheckResult = {
+ val nonExistsFields = fields
+ .map(field => (field, tableSchema.contains(field)))
+ .filter { case (_, exist) => !exist }
+
+ if (nonExistsFields.nonEmpty) {
+ CheckResult.error(
+ "field " + nonExistsFields
+ .map(option => "[" + option + "]")
+ .mkString(", ") + " not exist in table " + table)
+ } else {
+ val nonSupportedType = fields
+ .map(field => (tableSchema(field),
Clickhouse.supportOrNot(tableSchema(field))))
+ .filter { case (_, exist) => !exist }
+ if (nonSupportedType.nonEmpty) {
+ CheckResult.error(
+ "clickHouse data type " + nonSupportedType
+ .map(option => "[" + option + "]")
+ .mkString(", ") + " not support in current version.")
+ } else {
+ CheckResult.success()
+ }
+ }
+ }
+
+ def getClickHouseSchema(
+ conn: ClickHouseConnectionImpl,
+ table: String): util.LinkedHashMap[String, String]
= {
+ val sql = String.format("desc %s", table)
+ val resultSet = conn.createStatement.executeQuery(sql)
+ val schema = new util.LinkedHashMap[String, String]()
+ while (resultSet.next()) {
+ schema.put(resultSet.getString(1), resultSet.getString(2))
+ }
+ schema
+ }
+
+ @tailrec
+ def getDefaultValue(fieldType: String): Object = {
+ fieldType match {
+ case "DateTime" | "Date" | "String" =>
+ Clickhouse.renderStringDefault(fieldType)
+ case Clickhouse.datetime64Pattern(_) =>
+ Clickhouse.renderStringDefault(fieldType)
+ case Clickhouse.datetime64Pattern(_) =>
+ Clickhouse.renderStringDefault(fieldType)
+ case "Int8" | "UInt8" | "Int16" | "Int32" | "UInt32" | "UInt16" |
"UInt64" |
+ "Int64" | "Float32" | "Float64" =>
+ new Integer(0)
+ case Clickhouse.lowCardinalityPattern(lowCardinalityType) =>
+ getDefaultValue(lowCardinalityType)
+ case Clickhouse.arrayPattern(_) | Clickhouse.nullablePattern(_) =>
+ null
+ case _ => ""
+ }
+ }
+
+
+ def getClickhouseConnection(host: String, database: String, properties:
Properties): ClickHouseConnectionImpl = {
+ val globalJdbc = String.format("jdbc:clickhouse://%s/%s", host, database)
+ val balanced: BalancedClickhouseDataSource = new
BalancedClickhouseDataSource(globalJdbc, properties)
+ balanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
+ }
+
private[seatunnel] def renderStringDefault(fieldType: String): String = {
fieldType match {
case "DateTime" =>
@@ -507,13 +499,12 @@ object Clickhouse {
}
}
- class DistributedEngine(val clusterName: String, val database: String,
- val table: String) {
+ case class DistributedEngine(clusterName: String, database: String, table:
String) {
}
- class Shard(val shardNum: Int, val shardWeight: Int, val replicaNum: Int,
- val hostname: String, val hostAddress: String, val port: String,
- val database: String) extends Serializable {
+ case class Shard(shardNum: Int, shardWeight: Int, replicaNum: Int,
+ hostname: String, hostAddress: String, port: String,
+ database: String) extends Serializable {
val jdbc = s"jdbc:clickhouse://$hostAddress:$port/$database"
}
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/ClickhouseFile.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/ClickhouseFile.scala
new file mode 100644
index 0000000..8c5a3a5
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/ClickhouseFile.scala
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.sink
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import net.jpountz.xxhash.XXHashFactory
+import org.apache.commons.io.FileUtils
+import org.apache.commons.lang3.StringUtils
+import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.seatunnel.spark.sink.Clickhouse._
+import org.apache.seatunnel.spark.sink.ClickhouseFile.{CLICKHOUSE_FILE_PREFIX,
LOGGER, UUID_LENGTH, getClickhouseTableInfo}
+import org.apache.seatunnel.spark.sink.clickhouse.Table
+import org.apache.seatunnel.spark.sink.clickhouse.filetransfer.{FileTransfer,
ScpFileTransfer}
+import
org.apache.seatunnel.spark.sink.clickhouse.filetransfer.TransferMethod.{RSYNC,
SCP, TransferMethod, getCopyMethod}
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.{Dataset, Encoders, Row}
+import org.slf4j.LoggerFactory
+import ru.yandex.clickhouse.{BalancedClickhouseDataSource,
ClickHouseConnectionImpl}
+
+import java.io.File
+import java.util
+import java.util.concurrent.ThreadLocalRandom
+import java.util.{Objects, Properties, UUID}
+import scala.collection.JavaConversions.collectionAsScalaIterable
+import scala.collection.{JavaConversions, mutable}
+import scala.sys.process._
+import scala.util.{Failure, Success, Try}
+
+
+/**
+ * Clickhouse sink use clickhouse-local program. Details see feature
+ * <a
href="https://github.com/apache/incubator-seatunnel/issues/1382">ST-1382</a> }
+ */
+class ClickhouseFile extends SparkBatchSink {
+
+ private val properties: Properties = new Properties()
+ private var clickhouseLocalPath: String = _
+ private var table: Table = _
+ private var fields: List[String] = _
+ private var nodePass: Map[String, String] = _
+ private val random = ThreadLocalRandom.current()
+ private var freePass: Boolean = false
+ private var copyFileMethod: TransferMethod = SCP
+
+ override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
+
+ if (!config.hasPath("fields")) {
+ this.fields = data.schema.fieldNames.toList
+ }
+
+ val session = env.getSparkSession
+ import session.implicits._
+ val encoder = Encoders.tuple(
+ ExpressionEncoder[Shard],
+ RowEncoder(data.schema))
+ data.map(item => {
+ val hashInstance = XXHashFactory.fastestInstance().hash64()
+ val shard = getRowShard(distributedEngine.equals(this.table.engine),
this.table.shards,
+ this.table.shardKey, this.table.shardKeyType,
this.table.shardWeightCount, this.random,
+ hashInstance, item)
+ (shard, item)
+ })(encoder).groupByKey(si => si._1).mapGroups((shard, rows) => {
+ val paths = generateClickhouseFile(rows)
+ moveFileToServer(shard, paths)
+ attachClickhouseFile(shard, paths)
+ clearLocalFile(paths.head.substring(0, CLICKHOUSE_FILE_PREFIX.length +
UUID_LENGTH + 1))
+ 0
+ }).foreach(_ => {})
+
+ }
+
+ private def generateClickhouseFile(rows: Iterator[(Shard, Row)]):
List[String] = {
+ val data = rows.map(r => {
+ this.fields.map(f => r._2.getAs[Object](f).toString).mkString("\t")
+ }).mkString("\n")
+
+ def getValue(kv: util.Map.Entry[String, String]): String = {
+ if (this.fields.contains(kv.getKey)) {
+ kv.getKey
+ } else {
+ val v = getDefaultValue(kv.getValue)
+ if (v == null) {
+ "NULL"
+ } else if (v.isInstanceOf[Integer]) {
+ "0"
+ } else {
+ s"'${v.toString}'"
+ }
+ }
+ }
+
+ val uuid = UUID.randomUUID().toString.substring(0,
UUID_LENGTH).replaceAll("-", "_")
+ val targetPath = java.lang.String.format("%s/%s", CLICKHOUSE_FILE_PREFIX,
uuid)
+ val target = new File(targetPath)
+ target.mkdirs()
+
+ val exec = mutable.ListBuffer[String]()
+ exec.appendAll(clickhouseLocalPath.trim.split(" "))
+ exec.append("-S")
+ exec.append(fields.map(f => s"$f
${this.table.tableSchema.get(f)}").mkString(","))
+ exec.append("-N")
+ exec.append("temp_table" + uuid)
+ exec.append("-q")
+ exec.append(java.lang.String.format("%s; INSERT INTO TABLE %s SELECT %s
FROM temp_table%s;", this.table.getCreateDDLNoDatabase
+ .replaceAll("`", ""), this.table.getLocalTableName,
+ this.table.tableSchema.entrySet.map(getValue).mkString(","), uuid))
+ exec.append("--path")
+ exec.append(targetPath)
+ // TODO change data stream for echo, change it to local file
+ val command = Process(Seq("echo", data)) #| exec
+ LOGGER.info(command.lineStream.mkString("\n"))
+
+ new File(targetPath + "/data/_local/" +
this.table.getLocalTableName).listFiles().filter(f => f.isDirectory).
+ filterNot(f => f.getName.equals("detached")).map(f =>
f.getAbsolutePath).toList
+ }
+
+ private def moveFileToServer(shard: Shard, paths: List[String]): Unit = {
+
+ var fileTransfer: FileTransfer = null
+ if (this.copyFileMethod == SCP) {
+ var scpFileTransfer: ScpFileTransfer = null
+ if (nodePass.contains(shard.hostAddress)) {
+ scpFileTransfer = new ScpFileTransfer(shard.hostAddress,
nodePass(shard.hostAddress))
+ } else {
+ scpFileTransfer = new ScpFileTransfer(shard.hostAddress)
+ }
+ scpFileTransfer.init()
+ fileTransfer = scpFileTransfer
+ } else if (this.copyFileMethod == RSYNC) {
+ throw new UnsupportedOperationException(s"not support copy file method:
'$copyFileMethod' yet")
+ } else {
+ throw new UnsupportedOperationException(s"unknown copy file method:
'$copyFileMethod', please use " +
+ s"scp/rsync instead")
+ }
+ fileTransfer.transferAndChown(paths,
s"${this.table.getLocalDataPath(shard).head}detached/")
+
+ fileTransfer.close()
+ }
+
+ private def attachClickhouseFile(shard: Shard, paths: List[String]): Unit = {
+ val balanced: BalancedClickhouseDataSource =
+ new BalancedClickhouseDataSource(
+
s"jdbc:clickhouse://${shard.hostAddress}:${shard.port}/${shard.database}",
properties)
+ val conn = balanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
+ paths.map(path => fromPathGetPart(path)).foreach(part => {
+ conn.createStatement().execute(s"ALTER TABLE
${this.table.getLocalTableName} ATTACH PART '$part'")
+ })
+ }
+
+ private def fromPathGetPart(path: String): String = {
+ path.substring(path.lastIndexOf("/") + 1)
+ }
+
+ private def clearLocalFile(path: String): Unit = {
+ val r = Try(FileUtils.deleteDirectory(new File(path)))
+ r match {
+ case Failure(exception) =>
+ LOGGER.warn(s"delete folder failed, path : $path", exception)
+ case Success(_) =>
+ }
+ }
+
+ override def checkConfig(): CheckResult = {
+ var checkResult = checkAllExists(config, "host", "table", "database",
"username", "password",
+ "clickhouse_local_path")
+ if (checkResult.isSuccess) {
+ clickhouseLocalPath = config.getString("clickhouse_local_path")
+ properties.put("user", config.getString("username"))
+ properties.put("password", config.getString("password"))
+ val host = config.getString("host")
+ val database = config.getString("database")
+ val table = config.getString("table")
+ val conn = getClickhouseConnection(host, database, properties)
+
+ if (config.hasPath("copy_method")) {
+ this.copyFileMethod = getCopyMethod(config.getString("copy_method"))
+ }
+
+ val (result, tableInfo) = getClickhouseTableInfo(conn, database, table)
+ if (!Objects.isNull(result)) {
+ checkResult = result
+ } else {
+ this.table = tableInfo
+ tableInfo.initTableInfo(host, conn)
+ tableInfo.initShardDataPath(config.getString("username"),
config.getString("password"))
+ // check config of node password whether completed or not
+ if (config.hasPath("node_free_password") &&
config.getBoolean("node_free_password")) {
+ this.freePass = true
+ } else if (config.hasPath("node_pass")) {
+ val nodePass = config.getObjectList("node_pass")
+ val nodePassMap = mutable.Map[String, String]()
+ nodePass.foreach(np => {
+ val address = np.toConfig.getString("node_address")
+ val password = np.toConfig.getString("password")
+ nodePassMap(address) = password
+ })
+ this.nodePass = nodePassMap.toMap
+ checkResult = checkNodePass(this.nodePass,
tableInfo.shards.values().toList)
+ } else {
+ checkResult = CheckResult.error("if clickhouse node is free password
to spark node, " +
+ "make config 'node_free_password' set true. Otherwise need provide
clickhouse node password for" +
+ " root user, location at node_pass config.")
+ }
+ if (checkResult.isSuccess) {
+ // check sharding method
+ if (config.hasPath("sharding_key") &&
StringUtils.isNotEmpty(config.getString("sharding_key"))) {
+ this.table.shardKey = config.getString("sharding_key")
+ }
+ checkResult = this.table.prepareShardInfo(conn)
+ if (checkResult.isSuccess) {
+ if (this.config.hasPath("fields")) {
+ this.fields = config.getStringList("fields").toList
+ checkResult = acceptedClickHouseSchema(this.fields,
JavaConversions.mapAsScalaMap(this.table
+ .tableSchema).toMap, this.table.name)
+ }
+ }
+ }
+ }
+ }
+ checkResult
+ }
+
+ private def checkNodePass(nodePassMap: Map[String, String], shardInfo:
List[Shard]): CheckResult = {
+ val noPassShard = shardInfo.filter(shard =>
!nodePassMap.contains(shard.hostAddress) &&
+ !nodePassMap.contains(shard.hostname))
+ if (noPassShard.nonEmpty) {
+ CheckResult.error(s"can't find node ${
+ String.join(",", JavaConversions.asJavaIterable(noPassShard.map(s =>
s.hostAddress)))
+ } password in node_address config")
+ } else {
+ CheckResult.success()
+ }
+ }
+
+ override def prepare(prepareEnv: SparkEnvironment): Unit = {
+ }
+}
+
+
+object ClickhouseFile {
+
+ private final val CLICKHOUSE_FILE_PREFIX = "/tmp/clickhouse-local/spark-file"
+ private val LOGGER = LoggerFactory.getLogger(classOf[ClickhouseFile])
+ private val UUID_LENGTH = 10
+ private val OBJECT_MAPPER = new ObjectMapper()
+ OBJECT_MAPPER.registerModule(DefaultScalaModule)
+
+ def getClickhouseTableInfo(conn: ClickHouseConnectionImpl, database: String,
table: String):
+ (CheckResult, Table) = {
+ val sql = s"select engine,create_table_query,engine_full,data_paths from
system.tables where database " +
+ s"= '$database' and name = '$table'"
+ val rs = conn.createStatement().executeQuery(sql)
+ if (rs.next()) {
+ (null, new Table(table, database, rs.getString(1), rs.getString(2),
+ rs.getString(3),
+ OBJECT_MAPPER.readValue(rs.getString(4).replaceAll("'", "\""),
+ classOf[util.List[String]]).toList))
+ } else {
+ (CheckResult.error(s"can't find table '$table' in database '$database',
please check config file"),
+ null)
+ }
+ }
+
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/Table.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/Table.scala
new file mode 100644
index 0000000..e7c7fd8
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/Table.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.sink.clickhouse
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.spark.sink.Clickhouse.{Shard, distributedEngine,
getClickHouseDistributedTable, getClickHouseSchema, getClickhouseConnection,
getClusterShardList}
+import org.apache.seatunnel.spark.sink.ClickhouseFile.getClickhouseTableInfo
+import ru.yandex.clickhouse.ClickHouseConnectionImpl
+
+import java.util
+import java.util.Properties
+import scala.collection.JavaConversions
+
/**
 * Metadata holder for a ClickHouse target table.
 *
 * Several fields are mutable and filled in lazily: callers are expected to
 * invoke initTableInfo / initShardDataPath / prepareShardInfo before reading
 * shards, localDataPaths, tableSchema or shardKeyType.
 *
 * @param name           table name
 * @param database       owning database
 * @param engine         table engine (may be the Distributed engine)
 * @param createTableDDL original CREATE TABLE statement
 * @param engineFull     full engine definition string
 * @param dataPaths      on-disk data paths reported by system.tables
 */
class Table(val name: String, val database: String, val engine: String, val createTableDDL: String,
            val engineFull: String, val dataPaths: List[String]) extends Serializable {

  // Keyed by cumulative shard weight (see initTableInfo), so shard lookup can
  // be done by weight offset — presumably via TreeMap floor/ceiling; confirm at call sites.
  var shards = new util.TreeMap[Int, Shard]()
  // Underlying local table name when this table uses the Distributed engine.
  private var localTable: String = _
  // Sum of all shard weights; 0 until initTableInfo has run.
  var shardWeightCount: Int = 0
  // Optional sharding column name, set externally before prepareShardInfo.
  var shardKey: String = _
  // Per-shard data paths of the local table; populated by initShardDataPath.
  private var localDataPaths: Map[Shard, List[String]] = _
  // Column name -> column type, populated by prepareShardInfo.
  var tableSchema: util.LinkedHashMap[String, String] = new util.LinkedHashMap[String, String]()
  var shardKeyType: String = _
  // DDL of the local table; defaults to this table's own DDL for non-distributed engines.
  var localCreateTableDDL: String = createTableDDL

  /**
   * Resolve shard topology once (idempotent: skipped when shards is non-empty).
   * For a Distributed table this queries the cluster shard list and records each
   * shard under its cumulative weight offset; otherwise the single host given in
   * `host` ("host:port") becomes the only shard.
   */
  def initTableInfo(host: String, conn: ClickHouseConnectionImpl): Unit = {
    if (shards.size() == 0) {
      val hostAndPort = host.split(":")
      if (distributedEngine.equals(this.engine)) {
        val localTable = getClickHouseDistributedTable(conn, database, name)
        this.localTable = localTable.table
        val shardList = getClusterShardList(conn, localTable.clusterName, localTable.database, hostAndPort(1))
        var weight = 0
        for (elem <- shardList) {
          // Key is the running total BEFORE adding this shard's weight, so each
          // shard owns the weight interval [key, key + shardWeight).
          this.shards.put(weight, elem)
          weight += elem.shardWeight
        }
        this.shardWeightCount = weight
        this.localCreateTableDDL = getClickhouseTableInfo(conn, localTable.database,
          localTable.table)._2.createTableDDL
      } else {
        this.shards.put(0, Shard(1, 1, 1, hostAndPort(0), hostAndPort(0), hostAndPort(1), database))
      }
    }
  }

  /** Name of the physical local table (the table itself unless engine is Distributed). */
  def getLocalTableName: String = {
    if (this.engine.equals(distributedEngine)) {
      localTable
    } else {
      name
    }
  }

  /**
   * Data paths to write for the given shard. Requires initShardDataPath to have
   * run first when the engine is Distributed.
   */
  def getLocalDataPath(shard: Shard): List[String] = {
    if (!this.engine.equals(distributedEngine)) {
      dataPaths
    } else {
      localDataPaths(shard)
    }
  }

  /**
   * Connect to every shard and record the local table's data paths there.
   * NOTE(review): connections opened here are not explicitly closed — confirm
   * whether the driver/connection pool handles their lifetime.
   */
  def initShardDataPath(username: String, password: String): Unit = {
    val properties: Properties = new Properties()
    properties.put("user", username)
    properties.put("password", password)
    this.localDataPaths = JavaConversions.collectionAsScalaIterable(this.shards.values).map(s => {
      val conn = getClickhouseConnection(s.hostAddress + ":" + s.port, s.database, properties)
      (s, getClickhouseTableInfo(conn, s.database, getLocalTableName)._2.dataPaths)
    }).toMap
  }

  /** Local-table DDL with the "database." qualifier stripped from identifiers. */
  def getCreateDDLNoDatabase: String = {
    this.localCreateTableDDL.replace(this.database + ".", "")
  }

  /**
   * Load the table schema and, when a shard key is configured, verify it exists
   * as a column and capture its type.
   *
   * @return CheckResult.error when shardKey names a column the table lacks,
   *         CheckResult.success otherwise.
   */
  def prepareShardInfo(conn: ClickHouseConnectionImpl): CheckResult = {
    this.tableSchema = getClickHouseSchema(conn, name)
    if (StringUtils.isNotEmpty(this.shardKey)) {
      if (!this.tableSchema.containsKey(this.shardKey)) {
        CheckResult.error(
          s"not find field '${this.shardKey}' in table '${this.name}' as sharding key")
      } else {
        this.shardKeyType = this.tableSchema.get(this.shardKey)
        CheckResult.success()
      }
    } else {
      CheckResult.success()
    }
  }
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/FileTransfer.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/FileTransfer.scala
new file mode 100644
index 0000000..b2443fa
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/FileTransfer.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.sink.clickhouse.filetransfer
+
/**
 * Abstraction for copying generated clickhouse-local data files to a remote
 * ClickHouse server and fixing their ownership so ATTACH can use them.
 * Lifecycle: init() once, one or more transferAndChown calls, then close().
 */
abstract class FileTransfer {

  /** Establish whatever connection/session the transfer mechanism needs. */
  def init(): Unit

  /** Copy a single local path to targetPath on the remote host and adjust ownership. */
  def transferAndChown(sourcePath: String, targetPath: String): Unit

  /** Copy each listed local path to targetPath on the remote host and adjust ownership. */
  def transferAndChown(sourcePath: List[String], targetPath: String): Unit

  /** Release any resources acquired by init(). */
  def close(): Unit

}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/RsyncFileTransfer.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/RsyncFileTransfer.scala
new file mode 100644
index 0000000..c92917b
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/RsyncFileTransfer.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.sink.clickhouse.filetransfer
+
/**
 * Placeholder FileTransfer for the rsync method: every operation fails with
 * UnsupportedOperationException until rsync support is implemented.
 */
object RsyncFileTransfer extends FileTransfer {

  // Single source for the not-implemented failure shared by all operations.
  private def notImplemented(): Nothing =
    throw new UnsupportedOperationException("not support rsync file transfer yet")

  override def init(): Unit = notImplemented()

  override def transferAndChown(sourcePath: String, targetPath: String): Unit = notImplemented()

  override def transferAndChown(sourcePath: List[String], targetPath: String): Unit = notImplemented()

  override def close(): Unit = notImplemented()
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/ScpFileTransfer.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/ScpFileTransfer.scala
new file mode 100644
index 0000000..37ea3ef
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/ScpFileTransfer.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.sink.clickhouse.filetransfer
+
+import org.apache.sshd.client.SshClient
+import org.apache.sshd.client.session.ClientSession
+import org.apache.sshd.scp.client.{ScpClient, ScpClientCreator}
+
/**
 * FileTransfer implementation backed by Apache MINA SSHD's SCP client.
 * Connects as root (required so chown on the uploaded files succeeds);
 * authentication uses the optional password, otherwise the default SSH
 * identities picked up by SshClient.
 */
class ScpFileTransfer(host: String) extends FileTransfer {

  var password: String = _

  def this(host: String, password: String) {
    this(host)
    this.password = password
  }

  private var scpClient: ScpClient = _
  private var session: ClientSession = _
  private var client: SshClient = _

  /**
   * Upload sourcePath recursively into targetPath, then chown the uploaded
   * tree to the user owning targetPath's parent directory (the server-side
   * clickhouse user), because ATTACH only works on files that user owns.
   */
  override def transferAndChown(sourcePath: String, targetPath: String): Unit = {

    // TODO override Scp to support zero copy
    scpClient.upload(sourcePath, targetPath, ScpClient.Option.Recursive, ScpClient.Option
      .TargetIsDirectory, ScpClient.Option.PreserveAttributes)

    // Remote command: read the owner of the parent directory's last entry and
    // chown the uploaded target to that owner. Only files owned by the server's
    // clickhouse user can be ATTACHed.
    try {
      session.executeRemoteCommand("ls -l " + targetPath.substring(0,
        targetPath.stripSuffix("/").lastIndexOf("/")) +
        "/ | tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} " + targetPath)
    } catch {
      case _: Exception =>
        // Deliberately swallowed: xargs -t echoes the command to stderr, which
        // executeRemoteCommand reports as a failure even when chown succeeded.
    }
  }

  /** Upload each path in turn via the single-path overload. */
  override def transferAndChown(sourcePath: List[String], targetPath: String): Unit = {
    sourcePath.foreach(s => {
      transferAndChown(s, targetPath)
    })
  }

  /**
   * Start the SSH client, open and authenticate a session to `host`, and build
   * the SCP client.
   *
   * @throws IllegalArgumentException when authentication fails; the half-open
   *                                  session and client are released first.
   */
  override def init(): Unit = {
    client = SshClient.setUpDefaultClient()
    client.start()
    session = client.connect("root", this.host, 22).verify().getSession
    if (password != null) {
      session.addPasswordIdentity(this.password)
    }
    val isSuccess = session.auth.verify.isSuccess
    if (!isSuccess) {
      // BUGFIX: previously the connected session and started client leaked when
      // auth failed; release them before surfacing the error.
      close()
      throw new IllegalArgumentException(s"ssh host '$host' verify failed, please check your config")
    }

    scpClient = ScpClientCreator.instance.createScpClient(session)

  }

  /** Close the session and stop/close the client; safe to call after a failed init. */
  override def close(): Unit = {
    if (session != null && session.isOpen) {
      session.close()
    }
    if (client != null && client.isOpen) {
      client.stop()
      client.close()
    }
  }
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/TransferMethod.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/TransferMethod.scala
new file mode 100644
index 0000000..ab7fe2f
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/clickhouse/filetransfer/TransferMethod.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.sink.clickhouse.filetransfer
+
/** Supported file-copy mechanisms for shipping clickhouse-local output to servers. */
object TransferMethod extends Enumeration {
  type TransferMethod = Value
  val SCP, RSYNC = Value

  /**
   * Parse a case-insensitive method name ("scp" / "rsync") into its enum value.
   *
   * @throws IllegalArgumentException for any other value.
   */
  def getCopyMethod(method: String): TransferMethod =
    method.toLowerCase match {
      case "scp" => SCP
      case "rsync" => RSYNC
      case _ =>
        throw new IllegalArgumentException(s"not supported copy method '$method'")
    }

}
diff --git a/seatunnel-dist/release-docs/LICENSE
b/seatunnel-dist/release-docs/LICENSE
index 8b362ba..01d1e40 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -261,6 +261,9 @@ The text of each license is the standard Apache 2.0 license.
(Apache 2) org.roaringbitmap:shims (org.roaringbitmap:shims:0.9.22 -
https://github.com/RoaringBitmap/RoaringBitmap)
(Apache 2) scalaj-http (org.scalaj:scalaj-http_2.11:2.3.0 -
http://github.com/scalaj/scalaj-http)
(Apache 2) univocity-parsers (com.univocity:univocity-parsers:2.7.3 -
http://github.com/univocity/univocity-parsers)
+ (Apache 2.0 License) Apache Mina SSHD :: Common support utilities
(org.apache.sshd:sshd-common:2.7.0 - https://www.apache.org/sshd/sshd-common/)
+ (Apache 2.0 License) Apache Mina SSHD :: Core
(org.apache.sshd:sshd-core:2.7.0 - https://www.apache.org/sshd/sshd-core/)
+ (Apache 2.0 License) Apache Mina SSHD :: SCP
(org.apache.sshd:sshd-scp:2.7.0 - https://www.apache.org/sshd/sshd-scp/)
(Apache 2.0 License) Spark Integration for Kafka 0.10
(org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0 -
http://spark.apache.org/)
(Apache 2.0 License) Spark Project Catalyst
(org.apache.spark:spark-catalyst_2.11:2.4.0 - http://spark.apache.org/)
(Apache 2.0 License) Spark Project Core
(org.apache.spark:spark-core_2.11:2.4.0 - http://spark.apache.org/)
@@ -442,6 +445,7 @@ The text of each license is the standard Apache 2.0 license.
(Apache License, Version 2.0) Hibernate Validator Engine
(org.hibernate:hibernate-validator:5.2.5.Final -
http://hibernate.org/validator/hibernate-validator)
(Apache License, Version 2.0) Hive Storage API
(org.apache.hive:hive-storage-api:2.6.0 -
https://www.apache.org/hive-storage-api/)
(Apache License, Version 2.0) JCIP Annotations under Apache License
(com.github.stephenc.jcip:jcip-annotations:1.0-1 -
http://stephenc.github.com/jcip-annotations)
+ (Apache License, Version 2.0) JCL 1.2 implemented over SLF4J
(org.slf4j:jcl-over-slf4j:1.7.30 - http://www.slf4j.org)
(Apache License, Version 2.0) JMES Path Query library
(com.amazonaws:jmespath-java:1.12.37 - https://aws.amazon.com/sdkforjava)
(Apache License, Version 2.0) Jettison
(org.codehaus.jettison:jettison:1.1 - https://github.com/jettison-json/jettison)
(Apache License, Version 2.0) Jettison
(org.codehaus.jettison:jettison:1.3.8 -
https://github.com/jettison-json/jettison)
diff --git a/seatunnel-dist/release-docs/NOTICE
b/seatunnel-dist/release-docs/NOTICE
index ba9163c..ca2aa9b 100644
--- a/seatunnel-dist/release-docs/NOTICE
+++ b/seatunnel-dist/release-docs/NOTICE
@@ -4151,5 +4151,19 @@ the following copyright notice:
=========================================================================
+Apache SSHD NOTICE
+
+=========================================================================
+
+Apache MINA SSHD
+Copyright 2008-2020 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
+=========================================================================
+
+
=========================================================================
\ No newline at end of file
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index 9e0b589..cf1eb33 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -346,6 +346,8 @@ jaxb-impl-2.2.3-1.jar
jboss-logging-3.2.1.Final.jar
jcip-annotations-1.0-1.jar
jcl-over-slf4j-1.7.12.jar
+jcl-over-slf4j-1.7.16.jar
+jcl-over-slf4j-1.7.30.jar
jcodings-1.0.18.jar
jcodings-1.0.43.jar
jcommander-1.81.jar
@@ -608,6 +610,9 @@ spark-unsafe_2.11-2.4.0.jar
spoiwo_2.11-1.8.0.jar
spymemcached-2.12.3.jar
sqlline-1.2.0.jar
+sshd-common-2.7.0.jar
+sshd-core-2.7.0.jar
+sshd-scp-2.7.0.jar
stax-api-1.0-2.jar
stax2-api-3.1.4.jar
stream-2.7.0.jar