This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e120dc44b [Bug] [Connector-V2] Clickhouse File Connector failed to sink to table with settings like storage_policy (#4172)
e120dc44b is described below

commit e120dc44bcefcd0d0905a3f32e054347fd4a1f15
Author: sanyu <[email protected]>
AuthorDate: Mon Mar 6 12:59:46 2023 +0800

    [Bug] [Connector-V2] Clickhouse File Connector failed to sink to table with settings like storage_policy (#4172)
---
 .../sink/file/ClickhouseFileSinkWriter.java        | 36 +++++++++++++++++++---
 1 file changed, 31 insertions(+), 5 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
index b23699fe4..9328137fd 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -52,6 +52,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -62,6 +63,8 @@ public class ClickhouseFileSinkWriter
     private static final String CK_LOCAL_CONFIG_TEMPLATE =
             "<yandex><path> %s </path> <users><default><password/> 
<profile>default</profile> <quota>default</quota>"
                     + 
"<access_management>1</access_management></default></users><profiles><default/></profiles><quotas><default/></quotas></yandex>";
+    private static final String CLICKHOUSE_SETTINGS_KEY = "SETTINGS";
+    private static final String CLICKHOUSE_DDL_SETTING_FILTER = 
"storage_policy";
     private static final String CLICKHOUSE_LOCAL_FILE_SUFFIX = 
"/local_data.log";
     private static final int UUID_LENGTH = 10;
     private final FileReaderOption readerOption;
@@ -74,6 +77,7 @@ public class ClickhouseFileSinkWriter
     private final Map<Shard, String> shardTempFile;
 
     private final SinkWriter.Context context;
+    private final ThreadLocalRandom threadLocalRandom = 
ThreadLocalRandom.current();
 
     public ClickhouseFileSinkWriter(FileReaderOption readerOption, 
SinkWriter.Context context) {
         this.readerOption = readerOption;
@@ -263,10 +267,7 @@ public class ClickhouseFileSinkWriter
         command.add(
                 String.format(
                         "\"%s; INSERT INTO TABLE %s SELECT %s FROM 
temp_table%s;\"",
-                        clickhouseTable
-                                .getCreateTableDDL()
-                                .replace(clickhouseTable.getDatabase() + ".", 
"")
-                                .replaceAll("`", ""),
+                        adjustClickhouseDDL(),
                         clickhouseTable.getLocalTableName(),
                         readerOption.getTableSchema().keySet().stream()
                                 .map(
@@ -364,8 +365,9 @@ public class ClickhouseFileSinkWriter
                 FileTransferFactory.createFileTransfer(
                         this.readerOption.getCopyMethod(), hostAddress, user, 
password);
         fileTransfer.init();
+        int randomPath = 
threadLocalRandom.nextInt(shardLocalDataPaths.get(shard).size());
         fileTransfer.transferAndChown(
-                clickhouseLocalFiles, shardLocalDataPaths.get(shard).get(0) + 
"detached/");
+                clickhouseLocalFiles, 
shardLocalDataPaths.get(shard).get(randomPath) + "detached/");
         fileTransfer.close();
     }
 
@@ -386,4 +388,28 @@ public class ClickhouseFileSinkWriter
                     e);
         }
     }
+
+    /**
+     * Returns the table DDL with the database prefix and backticks stripped and with every
+     * setting named in CLICKHOUSE_DDL_SETTING_FILTER removed from the SETTINGS clause.
+     * If no setting survives, the SETTINGS keyword is dropped too, because a bare
+     * "SETTINGS" followed by nothing is not valid ClickHouse syntax.
+     */
+    private String adjustClickhouseDDL() {
+        String ddl =
+                clickhouseTable.getCreateTableDDL()
+                        .replace(clickhouseTable.getDatabase() + ".", "")
+                        .replaceAll("`", "");
+        int p = ddl.indexOf(CLICKHOUSE_SETTINGS_KEY);
+        if (p < 0) {
+            return ddl;
+        }
+        List<String> filters = Arrays.asList(CLICKHOUSE_DDL_SETTING_FILTER.split(","));
+        String kept =
+                Arrays.stream(ddl.substring(p + CLICKHOUSE_SETTINGS_KEY.length()).split(","))
+                        .filter(e -> !filters.contains(e.split("=")[0].trim()))
+                        .collect(Collectors.joining(","));
+        String head = ddl.substring(0, p);
+        return kept.trim().isEmpty() ? head : head + CLICKHOUSE_SETTINGS_KEY + kept;
+    }
 }

Reply via email to