This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d993619 [Feature][Connector] Change ClickhouseFile read temp data method to local file (#1567)
d993619 is described below
commit d993619056f88c28b3e805944827bbf49d426c26
Author: TrickyZerg <[email protected]>
AuthorDate: Wed Mar 30 15:46:03 2022 +0800
[Feature][Connector] Change ClickhouseFile read temp data method to local file (#1567)
* change ClickhouseFile read temp data method to local file.
* modify document
---
docs/en/connector/sink/ClickhouseFile.md | 39 +++++++++++---------
.../spark/clickhouse/sink/Clickhouse.scala | 6 ++--
.../spark/clickhouse/sink/ClickhouseFile.scala | 42 +++++++++++++++++++---
3 files changed, 62 insertions(+), 25 deletions(-)
diff --git a/docs/en/connector/sink/ClickhouseFile.md b/docs/en/connector/sink/ClickhouseFile.md
index 211ee2d..6fa7017 100644
--- a/docs/en/connector/sink/ClickhouseFile.md
+++ b/docs/en/connector/sink/ClickhouseFile.md
@@ -16,22 +16,23 @@ Engine Supported and plugin name
## Options
-| name | type | required | default value |
-|------------------------|---------|----------|---------------|
-| database | string | yes | - |
-| fields | array | no | - |
-| host | string | yes | - |
-| password | string | no | - |
-| table | string | yes | - |
-| username | string | no | - |
-| sharding_key | string | no | - |
-| clickhouse_local_path | string | yes | - |
-| copy_method | string | no | scp |
-| node_free_password | boolean | no | false |
-| node_pass | list | no | - |
-| node_pass.node_address | string | no | - |
-| node_pass.password | string | no | - |
-| common-options | string | no | - |
+| name | type | required | default value |
+|------------------------|----------|----------|---------------|
+| database | string | yes | - |
+| fields | array | no | - |
+| host | string | yes | - |
+| password | string | no | - |
+| table | string | yes | - |
+| username | string | no | - |
+| sharding_key | string | no | - |
+| clickhouse_local_path | string | yes | - |
+| tmp_batch_cache_line | int | no | 100000 |
+| copy_method | string | no | scp |
+| node_free_password | boolean | no | false |
+| node_pass | list | no | - |
+| node_pass.node_address | string | no | - |
+| node_pass.password | string | no | - |
+| common-options | string | no | - |
### database [string]
@@ -68,6 +69,12 @@ worked when 'split_mode' is true.
The address of the clickhouse-local program on the spark node. Since each task needs to be called,
clickhouse-local should be located in the same path of each spark node.
+### tmp_batch_cache_line [int]
+
+SeaTunnel will use memory map technology to write temporary data to the file to cache the data that the
+user needs to write to clickhouse. This parameter is used to configure the number of data pieces written
+to the file each time. Most of the time you don't need to modify it.
+
### copy_method [string]
Specifies the method used to transfer files, the default is scp, optional scp and rsync
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
index 1bd9741..12e5895 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
@@ -37,7 +37,7 @@ import org.apache.seatunnel.spark.batch.SparkBatchSink
import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse.{Shard, acceptedClickHouseSchema, distributedEngine, getClickHouseDistributedTable, getClickHouseSchema, getClickhouseConnection, getClusterShardList, getDefaultValue, getRowShard}
import org.apache.spark.sql.{Dataset, Row}
import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl, ClickHousePreparedStatementImpl}
-import ru.yandex.clickhouse.except.{ClickHouseException, ClickHouseUnknownException}
+import ru.yandex.clickhouse.except.ClickHouseException
import java.nio.ByteBuffer
import java.util.concurrent.ThreadLocalRandom
@@ -311,10 +311,8 @@ class Clickhouse extends SparkBatchSink {
} else {
throw e
}
- case Failure(e: ClickHouseUnknownException) =>
- statement.close()
- throw e
case Failure(e: Exception) =>
+ statement.close()
throw e
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
index 86e6d0b..7deb045 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
@@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory
import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl}
import java.io.File
+import java.nio.channels.FileChannel
+import java.nio.file.{Paths, StandardOpenOption}
import java.util
import java.util.concurrent.ThreadLocalRandom
import java.util.{Objects, Properties, UUID}
@@ -60,6 +62,7 @@ class ClickhouseFile extends SparkBatchSink {
private val random = ThreadLocalRandom.current()
private var freePass: Boolean = false
private var copyFileMethod: TransferMethod = SCP
+ private var tmpBatchCacheLine = 100000
override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
@@ -89,9 +92,6 @@ class ClickhouseFile extends SparkBatchSink {
}
private def generateClickhouseFile(rows: Iterator[(Shard, Row)]): List[String] = {
- val data = rows.map(r => {
- this.fields.map(f => r._2.getAs[Object](f).toString).mkString("\t")
- }).mkString("\n")
def getValue(kv: util.Map.Entry[String, String]): String = {
if (this.fields.contains(kv.getKey)) {
@@ -112,6 +112,9 @@ class ClickhouseFile extends SparkBatchSink {
val targetPath = java.lang.String.format("%s/%s", CLICKHOUSE_FILE_PREFIX, uuid)
val target = new File(targetPath)
target.mkdirs()
+ val tmpDataPath = targetPath + "/local_data.log"
+
+ mmapSaveDataSafely(tmpDataPath, rows.map(r => r._2))
val exec = mutable.ListBuffer[String]()
exec.appendAll(clickhouseLocalPath.trim.split(" "))
@@ -125,14 +128,39 @@ class ClickhouseFile extends SparkBatchSink {
this.table.tableSchema.entrySet.map(getValue).mkString(","), uuid))
exec.append("--path")
exec.append(targetPath)
- // TODO change data stream for echo, change it to local file
- val command = Process(Seq("echo", data)) #| exec
+ val command = Process(Seq("less", tmpDataPath)) #| exec
LOGGER.info(command.lineStream.mkString("\n"))
new File(targetPath + "/data/_local/" + this.table.getLocalTableName).listFiles().filter(f => f.isDirectory).
filterNot(f => f.getName.equals("detached")).map(f => f.getAbsolutePath).toList
}
+ private def mmapSaveDataSafely(path: String, rows: Iterator[Row]): Unit = {
+
+ val outputChannel = FileChannel.open(Paths.get(path), StandardOpenOption.WRITE, StandardOpenOption.READ,
+ StandardOpenOption.CREATE_NEW)
+ val cache = mutable.ListBuffer[Row]()
+ while (rows.hasNext) {
+ cache.append(rows.next())
+ if (cache.length >= tmpBatchCacheLine) {
+ mmapSaveData(outputChannel, cache.toList)
+ cache.clear()
+ }
+ }
+ if (cache.nonEmpty) {
+ mmapSaveData(outputChannel, cache.toList)
+ }
+ outputChannel.close()
+ }
+
+ private def mmapSaveData(outputChannel: FileChannel, rows: List[Row]): Unit = {
+ val data = rows.map(r => {
+ this.fields.map(f => r.getAs[Object](f).toString).mkString("\t") + "\n"
+ }).mkString
+ val buffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, outputChannel.size(), data.getBytes.length)
+ buffer.put(data.getBytes)
+ }
+
private def moveFileToServer(shard: Shard, paths: List[String]): Unit = {
var fileTransfer: FileTransfer = null
@@ -195,6 +223,10 @@ class ClickhouseFile extends SparkBatchSink {
this.copyFileMethod = getCopyMethod(config.getString("copy_method"))
}
+ if (config.hasPath("tmp_batch_cache_line")) {
+ this.tmpBatchCacheLine = config.getInt("tmp_batch_cache_line")
+ }
+
val (result, tableInfo) = getClickhouseTableInfo(conn, database, table)
if (!Objects.isNull(result)) {
checkResult = result