This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new aab0a32  [improve] Supports http request use utf8 charset (#347)
aab0a32 is described below

commit aab0a324a8f5b9484849dd578d677da260ef1a2a
Author: gnehil <[email protected]>
AuthorDate: Fri Nov 21 10:17:24 2025 +0800

    [improve] Supports http request use utf8 charset (#347)
---
 .../apache/doris/spark/config/DorisOptions.java    |  2 +
 .../org/apache/doris/spark/util/HttpUtils.scala    | 12 +++++-
 .../apache/doris/spark/sql/DorisWriterITCase.scala | 48 ++++++++++++++++++++++
 .../resources/container/ddl/write_unicode_col.sql  | 11 +++++
 4 files changed, 72 insertions(+), 1 deletion(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 5600411..940f06b 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -144,5 +144,7 @@ public class DorisOptions {
 
     public static final ConfigOption<Integer> DORIS_SINK_NET_BUFFER_SIZE = 
ConfigOptions.name("doris.sink.net.buffer.size").intType().defaultValue(1024 * 
1024).withDescription("");
 
+    public static final ConfigOption<Boolean> DORIS_SINK_HTTP_UTF8_CHARSET = 
ConfigOptions.name("doris.sink.http-utf8-charset").booleanType().defaultValue(false).withDescription(""); // When true, HttpUtils.getHttpClient builds a UTF-8 ConnectionConfig (malformed/unmappable input replaced); false keeps ConnectionConfig.DEFAULT. NOTE(review): the user-facing description is empty.
+
 
 }
\ No newline at end of file
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala
index 90031f2..cdc0560 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala
@@ -20,13 +20,14 @@ package org.apache.doris.spark.util
 import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
 import org.apache.http.HttpHeaders
 import org.apache.http.client.methods.HttpRequestBase
+import org.apache.http.config.ConnectionConfig
 import org.apache.http.conn.ssl.{SSLConnectionSocketFactory, TrustAllStrategy}
 import org.apache.http.impl.client.{CloseableHttpClient, 
DefaultRedirectStrategy, HttpClients}
 import org.apache.http.protocol.HttpRequestExecutor
 import org.apache.http.ssl.SSLContexts
 
 import java.io.{File, FileInputStream}
-import java.nio.charset.StandardCharsets
+import java.nio.charset.{CodingErrorAction, StandardCharsets}
 import java.security.KeyStore
 import java.util.Base64
 import scala.util.{Failure, Success, Try}
@@ -34,7 +35,16 @@ import scala.util.{Failure, Success, Try}
 object HttpUtils {
 
   def getHttpClient(config: DorisConfig): CloseableHttpClient = {
+
+    var connectionConfig = ConnectionConfig.DEFAULT;
+    if (config.getValue(DorisOptions.DORIS_SINK_HTTP_UTF8_CHARSET)) {
+      connectionConfig = ConnectionConfig.custom()
+        .setCharset(StandardCharsets.UTF_8)
+        .setMalformedInputAction(CodingErrorAction.REPLACE)
+        .setUnmappableInputAction(CodingErrorAction.REPLACE).build()
+    }
     val builder = HttpClients.custom()
+      .setDefaultConnectionConfig(connectionConfig)
       .setRequestExecutor(new HttpRequestExecutor(60000))
       .setRedirectStrategy(new DefaultRedirectStrategy {
         override def isRedirectable(method: String): Boolean = true
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
index 310d4e8..e591c47 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
@@ -45,6 +45,7 @@ class DorisWriterITCase extends AbstractContainerTestBase {
   val TABLE_JSON_TBL_OVERWRITE: String = "tbl_json_tbl_overwrite"
   val TABLE_JSON_TBL_ARROW: String = "tbl_json_tbl_arrow"
   val TABLE_BITMAP_TBL: String = "tbl_write_tbl_bitmap"
+  val TABLE_UNICODE_COL: String = "tbl_unicode_col"
 
   @Test
   @throws[Exception]
@@ -453,4 +454,51 @@ class DorisWriterITCase extends AbstractContainerTestBase {
     LOG.info("Checking DorisWriterITCase result. testName={}, actual={}, 
expected={}", testName, actual, expected)
     assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava)
   }
+
+  @Test
+  def testWriteUnicodeColumn(): Unit = { // Round-trip check: write rows whose column names are non-ASCII (序号 = "sequence number", 内容 = "content") through the Doris sink with doris.sink.http-utf8-charset=true, then read them back and compare.
+    val createDb = String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)
+    val targetInitSql: Array[String] = 
ContainerUtils.parseFileContentSQL("container/ddl/write_unicode_col.sql") // the SET / DROP / CREATE statements for tbl_unicode_col
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection, LOG, 
Array(createDb): _*) // make sure the target database exists before the table DDL runs
+    val connection = getDorisQueryConnection(DATABASE) // presumably a connection bound to DATABASE; reused below to read the results back
+    ContainerUtils.executeSQLStatement(connection, LOG, targetInitSql: _*)
+
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      val df = session.createDataFrame(Seq(
+        (1, "243"),
+        (2, "1"),
+        (3, "287667876573")
+      )).toDF("序号", "内容") // source column names match the non-ASCII columns of tbl_unicode_col
+      df.createTempView("mock_source")
+      session.sql( // expose the Doris table as a Spark view; the http-utf8-charset option is what this test exercises
+        s"""
+           |CREATE TEMPORARY VIEW test_sink
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_UNICODE_COL}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.sink.http-utf8-charset"="true"
+           |)
+           |""".stripMargin)
+      session.sql( // write every source row into Doris through the sink view
+        """
+          |insert into test_sink select `序号`,`内容` from mock_source
+          |""".stripMargin)
+
+      Thread.sleep(10000) // NOTE(review): fixed 10 s wait before reading back, presumably for the load to become visible; polling would be faster and less flaky
+      val actual = ContainerUtils.executeSQLStatement( // query the written rows; the trailing 2 is presumably the number of selected columns - confirm
+        connection,
+        LOG,
+        String.format("select `序号`,`内容` from %s.%s", DATABASE, 
TABLE_UNICODE_COL),
+        2)
+      val expected = util.Arrays.asList("1,243", "2,1", "3,287667876573"); // presumably each row rendered as comma-joined column values - confirm against executeSQLStatement
+      checkResultInAnyOrder("testWriteUnicodeColumn", expected.toArray, 
actual.toArray) // compare ignoring row order
+    } finally {
+      session.stop() // stop the Spark session even if the write or the check fails
+    }
+  }
+
 }
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_unicode_col.sql b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_unicode_col.sql
new file mode 100644
index 0000000..f4db027
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_unicode_col.sql
@@ -0,0 +1,11 @@
+SET enable_unicode_name_support = true;
+DROP TABLE IF EXISTS `tbl_unicode_col`;
+CREATE TABLE `tbl_unicode_col` ( /* Target table of DorisWriterITCase.testWriteUnicodeColumn. Column names are non-ASCII: 序号 (sequence number) and 内容 (content). The SET enable_unicode_name_support above presumably allows such identifiers - confirm. */
+  `序号` int NULL,
+  `内容` text NULL
+) ENGINE=OLAP
+DUPLICATE KEY(`序号`)
+DISTRIBUTED BY HASH(`序号`) BUCKETS 1
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to