This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cf39e29dad [Improve] Speed up ClickhouseFile Local generate a mmap object (#5822)
cf39e29dad is described below
commit cf39e29dad56e053052f51bd77faf8807f5ac281
Author: luo <[email protected]>
AuthorDate: Thu Nov 16 10:58:59 2023 +0800
[Improve] Speed up ClickhouseFile Local generate a mmap object (#5822)
---
.../sink/file/ClickhouseFileSinkWriter.java | 34 +++++++++++++++++-----
1 file changed, 27 insertions(+), 7 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
index 8bfa9fcb34..ac7f5d1401 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -73,6 +73,8 @@ public class ClickhouseFileSinkWriter
private final ClickhouseTable clickhouseTable;
private final Map<Shard, List<String>> shardLocalDataPaths;
private final Map<Shard, FileChannel> rowCache;
+ private final Map<Shard, MappedByteBuffer> bufferCache;
+ private final Integer bufferSize = 1024 * 128;
private final Map<Shard, String> shardTempFile;
@@ -91,6 +93,7 @@ public class ClickhouseFileSinkWriter
this.readerOption.getShardMetadata().getDatabase(),
this.readerOption.getShardMetadata().getTable());
rowCache = new HashMap<>(Common.COLLECTION_SIZE);
+ bufferCache = new HashMap<>(Common.COLLECTION_SIZE);
shardTempFile = new HashMap<>();
nodePasswordCheck();
@@ -141,7 +144,7 @@ public class ClickhouseFileSinkWriter
e);
}
});
- saveDataToFile(channel, element);
+ saveDataToFile(channel, element, shard);
}
private void nodePasswordCheck() {
@@ -209,7 +212,8 @@ public class ClickhouseFileSinkWriter
}
}
- private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row) throws IOException {
+ private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row, Shard shard)
+ throws IOException {
String data =
this.readerOption.getFields().stream()
.map(
@@ -227,12 +231,28 @@ public class ClickhouseFileSinkWriter
})
.collect(Collectors.joining(readerOption.getFileFieldsDelimiter()))
+ "\n";
+
MappedByteBuffer buffer =
- fileChannel.map(
- FileChannel.MapMode.READ_WRITE,
- fileChannel.size(),
- data.getBytes(StandardCharsets.UTF_8).length);
- buffer.put(data.getBytes(StandardCharsets.UTF_8));
+ bufferCache.computeIfAbsent(
+ shard,
+ k -> {
+ try {
+ return fileChannel.map(
+ FileChannel.MapMode.READ_WRITE, 0, bufferSize);
+ } catch (IOException e) {
+ throw new ClickhouseConnectorException(
+ CommonErrorCodeDeprecated.FILE_OPERATION_FAILED,
+ "data_local file write failed",
+ e);
+ }
+ });
+ byte[] byteData = data.getBytes(StandardCharsets.UTF_8);
+ if (buffer.position() + byteData.length > buffer.capacity()) {
+ buffer =
+ fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(), bufferSize);
+ bufferCache.put(shard, buffer);
+ }
+ buffer.put(byteData);
}
private List<String> generateClickhouseLocalFiles(String clickhouseLocalFileTmpFile)