This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d993619  [Feature][Connector] Change ClickhouseFile read temp data method to local file (#1567)
d993619 is described below

commit d993619056f88c28b3e805944827bbf49d426c26
Author: TrickyZerg <[email protected]>
AuthorDate: Wed Mar 30 15:46:03 2022 +0800

    [Feature][Connector] Change ClickhouseFile read temp data method to local file (#1567)
    
    * change ClickhouseFile read temp data method to local file.
    
    * modify document
---
 docs/en/connector/sink/ClickhouseFile.md           | 39 +++++++++++---------
 .../spark/clickhouse/sink/Clickhouse.scala         |  6 ++--
 .../spark/clickhouse/sink/ClickhouseFile.scala     | 42 +++++++++++++++++++---
 3 files changed, 62 insertions(+), 25 deletions(-)

diff --git a/docs/en/connector/sink/ClickhouseFile.md b/docs/en/connector/sink/ClickhouseFile.md
index 211ee2d..6fa7017 100644
--- a/docs/en/connector/sink/ClickhouseFile.md
+++ b/docs/en/connector/sink/ClickhouseFile.md
@@ -16,22 +16,23 @@ Engine Supported and plugin name
 
 ## Options
 
-| name                   | type    | required | default value |
-|------------------------|---------|----------|---------------|
-| database               | string  | yes      | -             |
-| fields                 | array   | no       | -             |
-| host                   | string  | yes      | -             |
-| password               | string  | no       | -             |
-| table                  | string  | yes      | -             |
-| username               | string  | no       | -             |
-| sharding_key           | string  | no       | -             |
-| clickhouse_local_path  | string  | yes      | -             |
-| copy_method            | string  | no       | scp           |
-| node_free_password     | boolean | no       | false         |
-| node_pass              | list    | no       | -             |
-| node_pass.node_address | string  | no       | -             |
-| node_pass.password     | string  | no       | -             |
-| common-options         | string  | no       | -             |
+| name                   | type     | required | default value |
+|------------------------|----------|----------|---------------|
+| database               | string   | yes      | -             |
+| fields                 | array    | no       | -             |
+| host                   | string   | yes      | -             |
+| password               | string   | no       | -             |
+| table                  | string   | yes      | -             |
+| username               | string   | no       | -             |
+| sharding_key           | string   | no       | -             |
+| clickhouse_local_path  | string   | yes      | -             |
+| tmp_batch_cache_line   | int      | no       | 100000        |
+| copy_method            | string   | no       | scp           |
+| node_free_password     | boolean  | no       | false         |
+| node_pass              | list     | no       | -             |
+| node_pass.node_address | string   | no       | -             |
+| node_pass.password     | string   | no       | -             |
+| common-options         | string   | no       | -             |
 
 ### database [string]
 
@@ -68,6 +69,12 @@ worked when 'split_mode' is true.
 The address of the clickhouse-local program on the spark node. Since each task needs to be called, 
 clickhouse-local should be located in the same path of each spark node.
 
+### tmp_batch_cache_line [int]
+
+SeaTunnel will use memory map technology to write temporary data to the file to cache the data that the 
+user needs to write to clickhouse. This parameter is used to configure the number of data pieces written 
+to the file each time. Most of the time you don't need to modify it.
+
 ### copy_method [string]
 
 Specifies the method used to transfer files, the default is scp, optional scp and rsync
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
index 1bd9741..12e5895 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
@@ -37,7 +37,7 @@ import org.apache.seatunnel.spark.batch.SparkBatchSink
 import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse.{Shard, acceptedClickHouseSchema, distributedEngine, getClickHouseDistributedTable, getClickHouseSchema, getClickhouseConnection, getClusterShardList, getDefaultValue, getRowShard}
 import org.apache.spark.sql.{Dataset, Row}
 import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl, ClickHousePreparedStatementImpl}
-import ru.yandex.clickhouse.except.{ClickHouseException, ClickHouseUnknownException}
+import ru.yandex.clickhouse.except.ClickHouseException
 
 import java.nio.ByteBuffer
 import java.util.concurrent.ThreadLocalRandom
@@ -311,10 +311,8 @@ class Clickhouse extends SparkBatchSink {
         } else {
           throw e
         }
-      case Failure(e: ClickHouseUnknownException) =>
-        statement.close()
-        throw e
       case Failure(e: Exception) =>
+        statement.close()
         throw e
     }
   }
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
index 86e6d0b..7deb045 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
@@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory
 import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl}
 
 import java.io.File
+import java.nio.channels.FileChannel
+import java.nio.file.{Paths, StandardOpenOption}
 import java.util
 import java.util.concurrent.ThreadLocalRandom
 import java.util.{Objects, Properties, UUID}
@@ -60,6 +62,7 @@ class ClickhouseFile extends SparkBatchSink {
   private val random = ThreadLocalRandom.current()
   private var freePass: Boolean = false
   private var copyFileMethod: TransferMethod = SCP
+  private var tmpBatchCacheLine = 100000
 
   override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
 
@@ -89,9 +92,6 @@ class ClickhouseFile extends SparkBatchSink {
   }
 
   private def generateClickhouseFile(rows: Iterator[(Shard, Row)]): List[String] = {
-    val data = rows.map(r => {
-      this.fields.map(f => r._2.getAs[Object](f).toString).mkString("\t")
-    }).mkString("\n")
 
     def getValue(kv: util.Map.Entry[String, String]): String = {
       if (this.fields.contains(kv.getKey)) {
@@ -112,6 +112,9 @@ class ClickhouseFile extends SparkBatchSink {
     val targetPath = java.lang.String.format("%s/%s", CLICKHOUSE_FILE_PREFIX, uuid)
     val target = new File(targetPath)
     target.mkdirs()
+    val tmpDataPath = targetPath + "/local_data.log"
+
+    mmapSaveDataSafely(tmpDataPath, rows.map(r => r._2))
 
     val exec = mutable.ListBuffer[String]()
     exec.appendAll(clickhouseLocalPath.trim.split(" "))
@@ -125,14 +128,39 @@ class ClickhouseFile extends SparkBatchSink {
       this.table.tableSchema.entrySet.map(getValue).mkString(","), uuid))
     exec.append("--path")
     exec.append(targetPath)
-    // TODO change data stream for echo, change it to local file
-    val command = Process(Seq("echo", data)) #| exec
+    val command = Process(Seq("less", tmpDataPath)) #| exec
     LOGGER.info(command.lineStream.mkString("\n"))
 
     new File(targetPath + "/data/_local/" + this.table.getLocalTableName).listFiles().filter(f => f.isDirectory).
       filterNot(f => f.getName.equals("detached")).map(f => f.getAbsolutePath).toList
   }
 
+  private def mmapSaveDataSafely(path: String, rows: Iterator[Row]): Unit = {
+
+    val outputChannel = FileChannel.open(Paths.get(path), StandardOpenOption.WRITE, StandardOpenOption.READ,
+      StandardOpenOption.CREATE_NEW)
+    val cache = mutable.ListBuffer[Row]()
+    while (rows.hasNext) {
+      cache.append(rows.next())
+      if (cache.length >= tmpBatchCacheLine) {
+        mmapSaveData(outputChannel, cache.toList)
+        cache.clear()
+      }
+    }
+    if (cache.nonEmpty) {
+      mmapSaveData(outputChannel, cache.toList)
+    }
+    outputChannel.close()
+  }
+
+  private def mmapSaveData(outputChannel: FileChannel, rows: List[Row]): Unit = {
+    val data = rows.map(r => {
+      this.fields.map(f => r.getAs[Object](f).toString).mkString("\t") + "\n"
+    }).mkString
+    val buffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, outputChannel.size(), data.getBytes.length)
+    buffer.put(data.getBytes)
+  }
+
   private def moveFileToServer(shard: Shard, paths: List[String]): Unit = {
 
     var fileTransfer: FileTransfer = null
@@ -195,6 +223,10 @@ class ClickhouseFile extends SparkBatchSink {
         this.copyFileMethod = getCopyMethod(config.getString("copy_method"))
       }
 
+      if (config.hasPath("tmp_batch_cache_line")) {
+        this.tmpBatchCacheLine = config.getInt("tmp_batch_cache_line")
+      }
+
       val (result, tableInfo) = getClickhouseTableInfo(conn, database, table)
       if (!Objects.isNull(result)) {
         checkResult = result

Reply via email to