Z1Wu commented on code in PR #3416:
URL: https://github.com/apache/incubator-seatunnel/pull/3416#discussion_r1042311747


##########
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java:
##########
@@ -105,56 +124,55 @@ private void nodePasswordCheck() {
     }
 
     @Override
-    public Optional<CKCommitInfo> prepareCommit() throws IOException {
-        return Optional.empty();
+    public Optional<CKFileCommitInfo> prepareCommit() throws IOException {
+        for (FileChannel channel : rowCache.values()) {
+            channel.close();
+        }
+        Map<Shard, List<String>> detachedFiles = new HashMap<>();
+        shardTempFile.forEach((shard, path) -> {
+            try {
+                List<String> clickhouseLocalFiles = generateClickhouseLocalFiles(path);
+                // move file to server
+                moveClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
+                detachedFiles.put(shard, clickhouseLocalFiles);
+                // clear local file
+                clearLocalFileDirectory(clickhouseLocalFiles);
+            } catch (Exception e) {
+                throw new ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data into clickhouse file error", e);
+            }
+        });
+        rowCache.clear();
+        shardTempFile.clear();
+        return Optional.of(new CKFileCommitInfo(detachedFiles));
     }
 
     @Override
     public void abortPrepare() {
-
     }
 
     @Override
     public void close() throws IOException {
-        rowCache.forEach(this::flush);
+        for (FileChannel channel : rowCache.values()) {
+            channel.close();
+        }
     }
 
-    private void flush(Shard shard, List<SeaTunnelRow> rows) {
-        try {
-            // generate clickhouse local file
-            // TODO generate file by sub rows to save memory
-            List<String> clickhouseLocalFiles = generateClickhouseLocalFiles(rows);
-            // move file to server
-            attachClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
-            // clear local file
-            clearLocalFileDirectory(clickhouseLocalFiles);
-        } catch (Exception e) {
-            throw new ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data into clickhouse file error", e);
-        }
+    private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row) throws IOException {
+        String data = this.readerOption.getFields().stream().map(field -> row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())

Review Comment:
   Simply using `toString` to serialize the `SeaTunnelRow` object into the file, which eventually calls the underlying objects' `toString` methods, will generate some data field strings that can't be recognized by the `clickhouse local` program.
   E.g. a Datetime object's `toString` output, or a `String` containing a `\t` character (which will mess up the resulting TSV file).
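   The concern above could be addressed with a small serializer helper. A minimal sketch (class and method names are hypothetical, not part of this PR), assuming ClickHouse's TabSeparated format, where backslash, tab and newline must be backslash-escaped and DateTime values use the `yyyy-MM-dd HH:mm:ss` pattern:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TsvEscapeSketch {
    // Escape a field value per ClickHouse TabSeparated rules:
    // backslash, tab and newline are written as \\, \t and \n.
    static String escapeTsv(String value) {
        return value
            .replace("\\", "\\\\")
            .replace("\t", "\\t")
            .replace("\n", "\\n");
    }

    // Format a datetime the way `clickhouse local` expects ("yyyy-MM-dd HH:mm:ss"),
    // instead of relying on LocalDateTime.toString(), which emits "yyyy-MM-ddTHH:mm:ss".
    static String formatDateTime(LocalDateTime dt) {
        return dt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    }
}
```

   A type-aware serializer along these lines, dispatching on the SeaTunnel column type instead of calling `toString` blindly, would keep the generated TSV parseable.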
   



##########
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java:
##########
@@ -58,24 +55,33 @@
 import java.util.stream.Collectors;
 
 @Slf4j
-public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, 
CKCommitInfo, ClickhouseSinkState> {
-    private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = 
"/tmp/clickhouse-local/seatunnel-file";
+public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, 
CKFileCommitInfo, ClickhouseSinkState> {
+    private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = 
"/tmp/seatunnel/clickhouse-local/seatunnel-file";

Review Comment:
   Maybe we can make `CLICKHOUSE_LOCAL_FILE_PREFIX` a field that can be set via the configuration file. In some cases, `/tmp` may not be big enough to hold all the temporary files.
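   As a sketch of this suggestion, the prefix could be resolved from the sink options with the current constant as the fallback. Note that `clickhouse_local_file_prefix` is a hypothetical option key, not an existing SeaTunnel one:

```java
import java.util.Map;

public class TempPathConfigSketch {
    static final String DEFAULT_LOCAL_FILE_PREFIX =
        "/tmp/seatunnel/clickhouse-local/seatunnel-file";

    // Resolve the temp-file prefix from a user-supplied config map,
    // falling back to the hard-coded default when the key is absent.
    static String resolveLocalFilePrefix(Map<String, String> config) {
        return config.getOrDefault("clickhouse_local_file_prefix", DEFAULT_LOCAL_FILE_PREFIX);
    }
}
```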



##########
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java:
##########
@@ -105,56 +124,55 @@ private void nodePasswordCheck() {
     }
 
     @Override
-    public Optional<CKCommitInfo> prepareCommit() throws IOException {
-        return Optional.empty();
+    public Optional<CKFileCommitInfo> prepareCommit() throws IOException {
+        for (FileChannel channel : rowCache.values()) {
+            channel.close();
+        }
+        Map<Shard, List<String>> detachedFiles = new HashMap<>();
+        shardTempFile.forEach((shard, path) -> {
+            try {
+                List<String> clickhouseLocalFiles = generateClickhouseLocalFiles(path);
+                // move file to server
+                moveClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
+                detachedFiles.put(shard, clickhouseLocalFiles);
+                // clear local file
+                clearLocalFileDirectory(clickhouseLocalFiles);
+            } catch (Exception e) {
+                throw new ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data into clickhouse file error", e);
+            }
+        });
+        rowCache.clear();
+        shardTempFile.clear();
+        return Optional.of(new CKFileCommitInfo(detachedFiles));
     }
 
     @Override
     public void abortPrepare() {
-
     }
 
     @Override
     public void close() throws IOException {
-        rowCache.forEach(this::flush);
+        for (FileChannel channel : rowCache.values()) {
+            channel.close();
+        }
     }
 
-    private void flush(Shard shard, List<SeaTunnelRow> rows) {
-        try {
-            // generate clickhouse local file
-            // TODO generate file by sub rows to save memory
-            List<String> clickhouseLocalFiles = generateClickhouseLocalFiles(rows);
-            // move file to server
-            attachClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
-            // clear local file
-            clearLocalFileDirectory(clickhouseLocalFiles);
-        } catch (Exception e) {
-            throw new ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data into clickhouse file error", e);
-        }
+    private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row) throws IOException {
+        String data = this.readerOption.getFields().stream().map(field -> row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())
+            .collect(Collectors.joining("\t")) + "\n";
+        MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(),
+            data.getBytes(StandardCharsets.UTF_8).length);

Review Comment:
   Putting every row into a `buffer` inside an mmap-ed region of the file channel may cause the executor to consume too much RAM and then crash with an OOM. In my opinion, the ClickHouse File connector is designed for huge data-loading scenarios where every executor handles multiple GB of data, and it is hard to keep all of that data in memory.
   Maybe we can make a tradeoff between speed and memory capacity: dump rows to disk once they accumulate to a certain threshold.
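   The threshold idea could look roughly like the following. This is a sketch, not the connector's actual writer (the class name and threshold parameter are hypothetical): rows accumulate in a small heap buffer and are appended with a plain `write()` once a byte threshold is reached, avoiding a new mmap region per row:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

public class ThresholdFlushSketch {
    private final StringBuilder pending = new StringBuilder();
    private final FileChannel channel;
    private final int flushThresholdBytes;

    ThresholdFlushSketch(FileChannel channel, int flushThresholdBytes) {
        this.channel = channel;
        this.flushThresholdBytes = flushThresholdBytes;
    }

    // Accumulate serialized rows in a bounded in-heap buffer; once the
    // threshold is reached, append the batch to the file in one write.
    void append(String row) throws IOException {
        pending.append(row);
        if (pending.length() >= flushThresholdBytes) {
            flush();
        }
    }

    // Drain whatever is buffered to disk (also called before commit/close).
    void flush() throws IOException {
        if (pending.length() > 0) {
            channel.write(ByteBuffer.wrap(pending.toString().getBytes(StandardCharsets.UTF_8)));
            pending.setLength(0);
        }
    }
}
```

   This keeps peak memory bounded by the threshold regardless of how many GB pass through the writer, at the cost of one syscall per batch instead of per-row mmap mappings.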



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
