This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 34408fb4 [improve] Supports http request use utf8 charset (#623)
34408fb4 is described below

commit 34408fb4f958ec4c15792e54316ec57b91b6f032
Author: gnehil <[email protected]>
AuthorDate: Tue Dec 16 18:41:56 2025 +0800

    [improve] Supports http request use utf8 charset (#623)
---
 .../doris/flink/cfg/ConfigurationOptions.java      |  3 +
 .../doris/flink/cfg/DorisExecutionOptions.java     | 22 ++++++
 .../java/org/apache/doris/flink/sink/HttpUtil.java | 30 +++++++-
 .../flink/sink/batch/DorisBatchStreamLoad.java     |  4 +-
 .../doris/flink/sink/committer/DorisCommitter.java |  3 +-
 .../doris/flink/sink/writer/DorisWriter.java       |  3 +-
 .../doris/flink/table/DorisConfigOptions.java      |  7 ++
 .../flink/table/DorisDynamicTableFactory.java      |  3 +
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |  3 +
 .../doris/flink/cfg/DorisExecutionOptionsTest.java |  8 ++-
 .../apache/doris/flink/sink/DorisSinkITCase.java   | 80 ++++++++++++++++++++++
 11 files changed, 159 insertions(+), 7 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
index bfca9b05..54b18400 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
@@ -60,4 +60,7 @@ public interface ConfigurationOptions {
 
     String FLIGHT_SQL_PORT = "source.flight-sql-port";
     Integer FLIGHT_SQL_PORT_DEFAULT = -1;
+
+    String SINK_HTTP_UTF8_CHARSET = "sink.http-utf8-charset";
+    Boolean SINK_HTTP_UTF8_CHARSET_DEFAULT = false;
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 831a317e..99379ff8 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -49,6 +49,7 @@ public class DorisExecutionOptions implements Serializable {
     private final int bufferCount;
     private final String labelPrefix;
     private final boolean useCache;
+    private final boolean httpUtf8Charset;
 
     /** Properties for the StreamLoad. */
     private final Properties streamLoadProp;
@@ -74,6 +75,7 @@ public class DorisExecutionOptions implements Serializable {
             int bufferCount,
             String labelPrefix,
             boolean useCache,
+            boolean httpUtf8Charset,
             Properties streamLoadProp,
             Boolean enableDelete,
             Boolean enable2PC,
@@ -93,6 +95,7 @@ public class DorisExecutionOptions implements Serializable {
         this.bufferCount = bufferCount;
         this.labelPrefix = labelPrefix;
         this.useCache = useCache;
+        this.httpUtf8Charset = httpUtf8Charset;
         this.streamLoadProp = streamLoadProp;
         this.enableDelete = enableDelete;
         this.enable2PC = enable2PC;
@@ -162,6 +165,10 @@ public class DorisExecutionOptions implements Serializable {
         return useCache;
     }
 
+    public boolean isHttpUtf8Charset() {
+        return httpUtf8Charset;
+    }
+
     public Properties getStreamLoadProp() {
         return streamLoadProp;
     }
@@ -228,6 +235,7 @@ public class DorisExecutionOptions implements Serializable {
                 && bufferSize == that.bufferSize
                 && bufferCount == that.bufferCount
                 && useCache == that.useCache
+                && httpUtf8Charset == that.httpUtf8Charset
                 && force2PC == that.force2PC
                 && flushQueueSize == that.flushQueueSize
                 && bufferFlushMaxRows == that.bufferFlushMaxRows
@@ -252,6 +260,7 @@ public class DorisExecutionOptions implements Serializable {
                 bufferCount,
                 labelPrefix,
                 useCache,
+                httpUtf8Charset,
                 streamLoadProp,
                 enableDelete,
                 enable2PC,
@@ -274,6 +283,7 @@ public class DorisExecutionOptions implements Serializable {
         private int bufferCount = DEFAULT_BUFFER_COUNT;
         private String labelPrefix = "";
         private boolean useCache = false;
+        private boolean httpUtf8Charset = false;
         private Properties streamLoadProp = new Properties();
         private boolean enableDelete = true;
         private boolean enable2PC = true;
@@ -361,6 +371,17 @@ public class DorisExecutionOptions implements Serializable {
             return this;
         }
 
+        /**
+         * Sets whether to set http utf8 charset for stream load.
+         *
+         * @param httpUtf8Charset
+         * @return this DorisExecutionOptions.builder.
+         */
+        public Builder setHttpUtf8Charset(boolean httpUtf8Charset) {
+            this.httpUtf8Charset = httpUtf8Charset;
+            return this;
+        }
+
         /**
          * Sets the properties for stream load.
          *
@@ -529,6 +550,7 @@ public class DorisExecutionOptions implements Serializable {
                     bufferCount,
                     labelPrefix,
                     useCache,
+                    httpUtf8Charset,
                     streamLoadProp,
                     enableDelete,
                     enable2PC,
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
index 38716826..015a7ecb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.sink;
 
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.http.client.config.RequestConfig;
+import org.apache.http.config.ConnectionConfig;
 import org.apache.http.impl.NoConnectionReuseStrategy;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.DefaultRedirectStrategy;
@@ -27,29 +28,46 @@ import org.apache.http.impl.client.HttpClients;
 import org.apache.http.protocol.HttpRequestExecutor;
 import org.apache.http.protocol.RequestContent;
 
+import java.nio.charset.CodingErrorAction;
+import java.nio.charset.StandardCharsets;
+
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.SINK_HTTP_UTF8_CHARSET_DEFAULT;
 
 /** util to build http client. */
 public class HttpUtil {
     private final int connectTimeout;
     private final int waitForContinueTimeout;
+    private final boolean httpUtf8Charset;
     private HttpClientBuilder httpClientBuilder;
 
     public HttpUtil() {
         this.connectTimeout = DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
         this.waitForContinueTimeout = DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
+        this.httpUtf8Charset = SINK_HTTP_UTF8_CHARSET_DEFAULT;
         settingStreamHttpClientBuilder();
     }
 
-    public HttpUtil(DorisReadOptions readOptions) {
+    public HttpUtil(DorisReadOptions readOptions, boolean httpUtf8Charset) {
         this.connectTimeout = readOptions.getRequestConnectTimeoutMs();
         this.waitForContinueTimeout = readOptions.getRequestConnectTimeoutMs();
+        this.httpUtf8Charset = httpUtf8Charset;
         settingStreamHttpClientBuilder();
     }
 
     private void settingStreamHttpClientBuilder() {
+        ConnectionConfig connectionConfig = ConnectionConfig.DEFAULT;
+        if (httpUtf8Charset) {
+            connectionConfig =
+                    ConnectionConfig.custom()
+                            .setCharset(StandardCharsets.UTF_8)
+                            .setMalformedInputAction(CodingErrorAction.REPLACE)
+                            
.setUnmappableInputAction(CodingErrorAction.REPLACE)
+                            .build();
+        }
         this.httpClientBuilder =
                 HttpClients.custom()
+                        .setDefaultConnectionConfig(connectionConfig)
                         // default timeout 3s, maybe report 307 error when fe 
busy
                         .setRequestExecutor(new 
HttpRequestExecutor(waitForContinueTimeout))
                         .setRedirectStrategy(
@@ -84,7 +102,17 @@ public class HttpUtil {
      * @return
      */
     public HttpClientBuilder getHttpClientBuilderForBatch() {
+        ConnectionConfig connectionConfig = ConnectionConfig.DEFAULT;
+        if (httpUtf8Charset) {
+            connectionConfig =
+                    ConnectionConfig.custom()
+                            .setCharset(StandardCharsets.UTF_8)
+                            .setMalformedInputAction(CodingErrorAction.REPLACE)
+                            
.setUnmappableInputAction(CodingErrorAction.REPLACE)
+                            .build();
+        }
         return HttpClients.custom()
+                .setDefaultConnectionConfig(connectionConfig)
                 .setRedirectStrategy(
                         new DefaultRedirectStrategy() {
                             @Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index a7d7c8c6..ee1d9be6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -172,7 +172,9 @@ public class DorisBatchStreamLoad implements Serializable {
         this.started = new AtomicBoolean(true);
         this.loadExecutorService.execute(loadAsyncExecutor);
         this.subTaskId = subTaskId;
-        this.httpClientBuilder = new 
HttpUtil(dorisReadOptions).getHttpClientBuilderForBatch();
+        this.httpClientBuilder =
+                new HttpUtil(dorisReadOptions, 
executionOptions.isHttpUtf8Charset())
+                        .getHttpClientBuilderForBatch();
     }
 
     /**
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index b1a70059..2593e303 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -70,7 +70,8 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
                 dorisOptions,
                 dorisReadOptions,
                 executionOptions,
-                new HttpUtil(dorisReadOptions).getHttpClient());
+                new HttpUtil(dorisReadOptions, 
executionOptions.isHttpUtf8Charset())
+                        .getHttpClient());
     }
 
     public DorisCommitter(
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 64fbc95f..da07249f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -310,7 +310,8 @@ public class DorisWriter<IN>
                                 dorisOptions,
                                 executionOptions,
                                 labelGenerator,
-                                new 
HttpUtil(dorisReadOptions).getHttpClient()));
+                                new HttpUtil(dorisReadOptions, 
executionOptions.isHttpUtf8Charset())
+                                        .getHttpClient()));
     }
 
     /** Http throws an exception actively, there is no need to check 
regularly. */
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 56f96153..f1d44943 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -340,4 +340,11 @@ public class DorisConfigOptions {
         }
         return streamLoadProp;
     }
+
+    public static final ConfigOption<Boolean> SINK_HTTP_UTF8_CHARSET =
+            ConfigOptions.key("sink.http-utf8-charset")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Set sink http client default charset to utf8 for 
support unicode characters in header, the default value is false");
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 6495eadc..fd3d2ad3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -74,6 +74,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_BATCH_MODE;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_HTTP_UTF8_CHARSET;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
@@ -165,6 +166,7 @@ public final class DorisDynamicTableFactory
 
         options.add(USE_FLIGHT_SQL);
         options.add(FLIGHT_SQL_PORT);
+        options.add(SINK_HTTP_UTF8_CHARSET);
         return options;
     }
 
@@ -263,6 +265,7 @@ public final class DorisDynamicTableFactory
                 (int) 
readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES).getBytes());
         
builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
         builder.setUseCache(readableConfig.get(SINK_USE_CACHE));
+        builder.setHttpUtf8Charset(readableConfig.get(SINK_HTTP_UTF8_CHARSET));
         return builder.build();
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 326b4e87..13039785 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -326,6 +326,9 @@ public abstract class DatabaseSync {
         sinkConfig
                 .getOptional(DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR)
                 .ifPresent(executionBuilder::setIgnoreCommitError);
+        sinkConfig
+                .getOptional(DorisConfigOptions.SINK_HTTP_UTF8_CHARSET)
+                .ifPresent(executionBuilder::setHttpUtf8Charset);
 
         DorisExecutionOptions executionOptions = executionBuilder.build();
         builder.setDorisReadOptions(DorisReadOptions.builder().build())
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
index bc19c572..97c2868a 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
@@ -62,6 +62,7 @@ public class DorisExecutionOptionsTest {
                         .setUseCache(true)
                         .setFlushQueueSize(2)
                         .setIgnoreUpdateBefore(true)
+                        .setHttpUtf8Charset(true)
                         .build();
 
         DorisExecutionOptions.Builder builder =
@@ -81,7 +82,8 @@ public class DorisExecutionOptionsTest {
                         .setBatchMode(false)
                         .setUseCache(true)
                         .setFlushQueueSize(2)
-                        .setIgnoreUpdateBefore(true);
+                        .setIgnoreUpdateBefore(true)
+                        .setHttpUtf8Charset(true);
 
         Assert.assertNotEquals(exceptOptions, null);
         Assert.assertEquals(exceptOptions, exceptOptions);
@@ -147,9 +149,9 @@ public class DorisExecutionOptionsTest {
         Assert.assertNotEquals(exceptOptions, builder.build());
         builder.setFlushQueueSize(2);
 
-        builder.setIgnoreUpdateBefore(false);
+        builder.setHttpUtf8Charset(false);
         Assert.assertNotEquals(exceptOptions, builder.build());
-        builder.setIgnoreUpdateBefore(true);
+        builder.setHttpUtf8Charset(true);
     }
 
     @Test(expected = IllegalArgumentException.class)
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index 0ebc48fc..76256543 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -78,6 +78,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
     static final String TABLE_GZ_FORMAT = "tbl_gz_format";
     static final String TABLE_CSV_JM = "tbl_csv_jm";
     static final String TABLE_CSV_TM = "tbl_csv_tm";
+    static final String TABLE_UNICODE_COLUMN = "tbl_unicode_column";
 
     private final boolean batchMode;
 
@@ -732,4 +733,83 @@ public class DorisSinkITCase extends AbstractITCaseService {
                         max,
                         model));
     }
+
+    private void initializeUnicodeColumnTable(String table, DataModel 
dataModel) {
+        String max = DataModel.AGGREGATE.equals(dataModel) ? "MAX" : "";
+        String morProps =
+                !DataModel.UNIQUE_MOR.equals(dataModel)
+                        ? ""
+                        : ",\"enable_unique_key_merge_on_write\" = \"false\"";
+        String model =
+                dataModel.equals(DataModel.UNIQUE_MOR)
+                        ? DataModel.UNIQUE.toString()
+                        : dataModel.toString();
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+                String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+                "SET enable_unicode_name_support = true;",
+                String.format(
+                        "CREATE TABLE %s.%s ( \n"
+                                + "`名称` varchar(256),\n"
+                                + "`年龄` int %s\n"
+                                + ") "
+                                + " %s KEY(`名称`) "
+                                + " DISTRIBUTED BY HASH(`名称`) BUCKETS 1\n"
+                                + "PROPERTIES ("
+                                + "\"replication_num\" = \"1\"\n"
+                                + morProps
+                                + ")",
+                        DATABASE,
+                        table,
+                        max,
+                        model));
+    }
+
+    @Test
+    public void testSinkUnicodeColumn() throws Exception {
+        initializeUnicodeColumnTable(TABLE_UNICODE_COLUMN, DataModel.UNIQUE);
+        Properties properties = new Properties();
+        properties.setProperty("read_json_by_line", "true");
+        properties.setProperty("format", "json");
+
+        // mock data
+        Map<String, Object> row1 = new HashMap<>();
+        row1.put("名称", "doris1");
+        row1.put("年龄", 1);
+        Map<String, Object> row2 = new HashMap<>();
+        row2.put("名称", "doris2");
+        row2.put("年龄", 2);
+
+        DorisExecutionOptions.Builder executionBuilder = 
DorisExecutionOptions.builder();
+        executionBuilder
+                .setLabelPrefix(UUID.randomUUID().toString())
+                .setBatchMode(batchMode)
+                .setStreamLoadProp(properties)
+                // uniq need to be false
+                .setDeletable(false)
+                .setHttpUtf8Charset(true);
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder
+                .setFenodes(getFenodes())
+                .setTableIdentifier(DATABASE + "." + TABLE_UNICODE_COLUMN)
+                .setUsername(getDorisUsername())
+                .setPassword(getDorisPassword());
+
+        submitJob(
+                dorisBuilder.build(),
+                executionBuilder.build(),
+                new String[] {
+                    new ObjectMapper().writeValueAsString(row1),
+                    new ObjectMapper().writeValueAsString(row2)
+                });
+
+        Thread.sleep(10000);
+        List<String> expected = Arrays.asList("doris1,1", "doris2,2");
+        String query =
+                String.format(
+                        "select `名称`,`年龄` from %s.%s order by 1", DATABASE, 
TABLE_UNICODE_COLUMN);
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, 
query, 2);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to