This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e120dc44b [Bug] [Connector-V2] Clickhouse File Connector failed to
sink to table with settings like storage_policy (#4172)
e120dc44b is described below
commit e120dc44bcefcd0d0905a3f32e054347fd4a1f15
Author: sanyu <[email protected]>
AuthorDate: Mon Mar 6 12:59:46 2023 +0800
[Bug] [Connector-V2] Clickhouse File Connector failed to sink to table with
settings like storage_policy (#4172)
---
.../sink/file/ClickhouseFileSinkWriter.java | 36 +++++++++++++++++++---
1 file changed, 31 insertions(+), 5 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
index b23699fe4..9328137fd 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -52,6 +52,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -62,6 +63,8 @@ public class ClickhouseFileSinkWriter
private static final String CK_LOCAL_CONFIG_TEMPLATE =
"<yandex><path> %s </path> <users><default><password/>
<profile>default</profile> <quota>default</quota>"
+
"<access_management>1</access_management></default></users><profiles><default/></profiles><quotas><default/></quotas></yandex>";
+ private static final String CLICKHOUSE_SETTINGS_KEY = "SETTINGS";
+ private static final String CLICKHOUSE_DDL_SETTING_FILTER =
"storage_policy";
private static final String CLICKHOUSE_LOCAL_FILE_SUFFIX =
"/local_data.log";
private static final int UUID_LENGTH = 10;
private final FileReaderOption readerOption;
@@ -74,6 +77,7 @@ public class ClickhouseFileSinkWriter
private final Map<Shard, String> shardTempFile;
private final SinkWriter.Context context;
+ private final ThreadLocalRandom threadLocalRandom =
ThreadLocalRandom.current();
public ClickhouseFileSinkWriter(FileReaderOption readerOption,
SinkWriter.Context context) {
this.readerOption = readerOption;
@@ -263,10 +267,7 @@ public class ClickhouseFileSinkWriter
command.add(
String.format(
"\"%s; INSERT INTO TABLE %s SELECT %s FROM
temp_table%s;\"",
- clickhouseTable
- .getCreateTableDDL()
- .replace(clickhouseTable.getDatabase() + ".",
"")
- .replaceAll("`", ""),
+ adjustClickhouseDDL(),
clickhouseTable.getLocalTableName(),
readerOption.getTableSchema().keySet().stream()
.map(
@@ -364,8 +365,9 @@ public class ClickhouseFileSinkWriter
FileTransferFactory.createFileTransfer(
this.readerOption.getCopyMethod(), hostAddress, user,
password);
fileTransfer.init();
+ int randomPath =
threadLocalRandom.nextInt(shardLocalDataPaths.get(shard).size());
fileTransfer.transferAndChown(
- clickhouseLocalFiles, shardLocalDataPaths.get(shard).get(0) +
"detached/");
+ clickhouseLocalFiles,
shardLocalDataPaths.get(shard).get(randomPath) + "detached/");
fileTransfer.close();
}
@@ -386,4 +388,28 @@ public class ClickhouseFileSinkWriter
e);
}
}
+
+ /**
+  * Returns the table's CREATE TABLE statement adjusted for the local insert.
+  *
+  * <p>The {@code <database>.} prefix and all backticks are removed from the
+  * DDL reported by {@code clickhouseTable}. If the DDL has a SETTINGS section,
+  * every setting whose name is listed in
+  * {@code CLICKHOUSE_DDL_SETTING_FILTER} (currently {@code storage_policy})
+  * is removed from it. When no setting survives the filter, the SETTINGS
+  * keyword itself is removed as well, so the statement never ends in a
+  * dangling keyword.
+  *
+  * @return the adjusted CREATE TABLE statement
+  */
+ private String adjustClickhouseDDL() {
+     String createTableDDL =
+         clickhouseTable
+             .getCreateTableDDL()
+             .replace(clickhouseTable.getDatabase() + ".", "")
+             .replaceAll("`", "");
+     int p = createTableDDL.indexOf(CLICKHOUSE_SETTINGS_KEY);
+     if (p < 0) {
+         // No SETTINGS section, nothing to filter.
+         return createTableDDL;
+     }
+     List<String> filters =
+         Arrays.asList(CLICKHOUSE_DDL_SETTING_FILTER.split(","));
+     // Everything after the keyword is treated as "name = value" entries
+     // separated by commas; the name is the text before the first '='.
+     String filteredSetting =
+         Arrays.stream(
+                 createTableDDL
+                     .substring(p + CLICKHOUSE_SETTINGS_KEY.length())
+                     .split(","))
+             .filter(e -> !filters.contains(e.split("=", 2)[0].trim()))
+             .collect(Collectors.joining(","));
+     String ddlBeforeSettings = createTableDDL.substring(0, p);
+     if (filteredSetting.trim().isEmpty()) {
+         // Every setting was filtered out: a bare SETTINGS keyword is not
+         // valid DDL, so drop the keyword together with its entries.
+         return ddlBeforeSettings;
+     }
+     return ddlBeforeSettings + CLICKHOUSE_SETTINGS_KEY + filteredSetting;
+ }
}