This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new cf39e29dad [Improve] Speed up ClickhouseFile Local generate a mmap  object (#5822)
cf39e29dad is described below

commit cf39e29dad56e053052f51bd77faf8807f5ac281
Author: luo <[email protected]>
AuthorDate: Thu Nov 16 10:58:59 2023 +0800

    [Improve] Speed up ClickhouseFile Local generate a mmap  object (#5822)
---
 .../sink/file/ClickhouseFileSinkWriter.java        | 34 +++++++++++++++++-----
 1 file changed, 27 insertions(+), 7 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
index 8bfa9fcb34..ac7f5d1401 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -73,6 +73,8 @@ public class ClickhouseFileSinkWriter
     private final ClickhouseTable clickhouseTable;
     private final Map<Shard, List<String>> shardLocalDataPaths;
     private final Map<Shard, FileChannel> rowCache;
+    private final Map<Shard, MappedByteBuffer> bufferCache;
+    private final Integer bufferSize = 1024 * 128;
 
     private final Map<Shard, String> shardTempFile;
 
@@ -91,6 +93,7 @@ public class ClickhouseFileSinkWriter
                         this.readerOption.getShardMetadata().getDatabase(),
                         this.readerOption.getShardMetadata().getTable());
         rowCache = new HashMap<>(Common.COLLECTION_SIZE);
+        bufferCache = new HashMap<>(Common.COLLECTION_SIZE);
         shardTempFile = new HashMap<>();
         nodePasswordCheck();
 
@@ -141,7 +144,7 @@ public class ClickhouseFileSinkWriter
                                         e);
                             }
                         });
-        saveDataToFile(channel, element);
+        saveDataToFile(channel, element, shard);
     }
 
     private void nodePasswordCheck() {
@@ -209,7 +212,8 @@ public class ClickhouseFileSinkWriter
         }
     }
 
-    private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row) 
throws IOException {
+    private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row, 
Shard shard)
+            throws IOException {
         String data =
                 this.readerOption.getFields().stream()
                                 .map(
@@ -227,12 +231,28 @@ public class ClickhouseFileSinkWriter
                                         })
                                 
.collect(Collectors.joining(readerOption.getFileFieldsDelimiter()))
                         + "\n";
+
         MappedByteBuffer buffer =
-                fileChannel.map(
-                        FileChannel.MapMode.READ_WRITE,
-                        fileChannel.size(),
-                        data.getBytes(StandardCharsets.UTF_8).length);
-        buffer.put(data.getBytes(StandardCharsets.UTF_8));
+                bufferCache.computeIfAbsent(
+                        shard,
+                        k -> {
+                            try {
+                                return fileChannel.map(
+                                        FileChannel.MapMode.READ_WRITE, 0, 
bufferSize);
+                            } catch (IOException e) {
+                                throw new ClickhouseConnectorException(
+                                        
CommonErrorCodeDeprecated.FILE_OPERATION_FAILED,
+                                        "data_local file write failed",
+                                        e);
+                            }
+                        });
+        byte[] byteData = data.getBytes(StandardCharsets.UTF_8);
+        if (buffer.position() + byteData.length > buffer.capacity()) {
+            buffer =
+                    fileChannel.map(FileChannel.MapMode.READ_WRITE, 
fileChannel.size(), bufferSize);
+            bufferCache.put(shard, buffer);
+        }
+        buffer.put(byteData);
     }
 
     private List<String> generateClickhouseLocalFiles(String 
clickhouseLocalFileTmpFile)

Reply via email to