This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new aab0a32 [improve] Support using the UTF-8 charset in HTTP requests (#347)
aab0a32 is described below
commit aab0a324a8f5b9484849dd578d677da260ef1a2a
Author: gnehil <[email protected]>
AuthorDate: Fri Nov 21 10:17:24 2025 +0800
[improve] Support using the UTF-8 charset in HTTP requests (#347)
---
.../apache/doris/spark/config/DorisOptions.java | 2 +
.../org/apache/doris/spark/util/HttpUtils.scala | 12 +++++-
.../apache/doris/spark/sql/DorisWriterITCase.scala | 48 ++++++++++++++++++++++
.../resources/container/ddl/write_unicode_col.sql | 11 +++++
4 files changed, 72 insertions(+), 1 deletion(-)
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 5600411..940f06b 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -144,5 +144,7 @@ public class DorisOptions {
public static final ConfigOption<Integer> DORIS_SINK_NET_BUFFER_SIZE =
ConfigOptions.name("doris.sink.net.buffer.size").intType().defaultValue(1024 *
1024).withDescription("");
+ public static final ConfigOption<Boolean> DORIS_SINK_HTTP_UTF8_CHARSET =
ConfigOptions.name("doris.sink.http-utf8-charset").booleanType().defaultValue(false).withDescription("");
+
}
\ No newline at end of file
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala
index 90031f2..cdc0560 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala
@@ -20,13 +20,14 @@ package org.apache.doris.spark.util
import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
import org.apache.http.HttpHeaders
import org.apache.http.client.methods.HttpRequestBase
+import org.apache.http.config.ConnectionConfig
import org.apache.http.conn.ssl.{SSLConnectionSocketFactory, TrustAllStrategy}
import org.apache.http.impl.client.{CloseableHttpClient,
DefaultRedirectStrategy, HttpClients}
import org.apache.http.protocol.HttpRequestExecutor
import org.apache.http.ssl.SSLContexts
import java.io.{File, FileInputStream}
-import java.nio.charset.StandardCharsets
+import java.nio.charset.{CodingErrorAction, StandardCharsets}
import java.security.KeyStore
import java.util.Base64
import scala.util.{Failure, Success, Try}
@@ -34,7 +35,16 @@ import scala.util.{Failure, Success, Try}
object HttpUtils {
def getHttpClient(config: DorisConfig): CloseableHttpClient = {
+
+ var connectionConfig = ConnectionConfig.DEFAULT;
+ if (config.getValue(DorisOptions.DORIS_SINK_HTTP_UTF8_CHARSET)) {
+ connectionConfig = ConnectionConfig.custom()
+ .setCharset(StandardCharsets.UTF_8)
+ .setMalformedInputAction(CodingErrorAction.REPLACE)
+ .setUnmappableInputAction(CodingErrorAction.REPLACE).build()
+ }
val builder = HttpClients.custom()
+ .setDefaultConnectionConfig(connectionConfig)
.setRequestExecutor(new HttpRequestExecutor(60000))
.setRedirectStrategy(new DefaultRedirectStrategy {
override def isRedirectable(method: String): Boolean = true
diff --git
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
index 310d4e8..e591c47 100644
---
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
+++
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
@@ -45,6 +45,7 @@ class DorisWriterITCase extends AbstractContainerTestBase {
val TABLE_JSON_TBL_OVERWRITE: String = "tbl_json_tbl_overwrite"
val TABLE_JSON_TBL_ARROW: String = "tbl_json_tbl_arrow"
val TABLE_BITMAP_TBL: String = "tbl_write_tbl_bitmap"
+ val TABLE_UNICODE_COL: String = "tbl_unicode_col"
@Test
@throws[Exception]
@@ -453,4 +454,51 @@ class DorisWriterITCase extends AbstractContainerTestBase {
LOG.info("Checking DorisWriterITCase result. testName={}, actual={},
expected={}", testName, actual, expected)
assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava)
}
+
+ @Test
+ def testWriteUnicodeColumn(): Unit = {
+ val createDb = String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)
+ val targetInitSql: Array[String] =
ContainerUtils.parseFileContentSQL("container/ddl/write_unicode_col.sql")
+ ContainerUtils.executeSQLStatement(getDorisQueryConnection, LOG,
Array(createDb): _*)
+ val connection = getDorisQueryConnection(DATABASE)
+ ContainerUtils.executeSQLStatement(connection, LOG, targetInitSql: _*)
+
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+ try {
+ val df = session.createDataFrame(Seq(
+ (1, "243"),
+ (2, "1"),
+ (3, "287667876573")
+ )).toDF("序号", "内容")
+ df.createTempView("mock_source")
+ session.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_sink
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + TABLE_UNICODE_COL}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.sink.http-utf8-charset"="true"
+ |)
+ |""".stripMargin)
+ session.sql(
+ """
+ |insert into test_sink select `序号`,`内容` from mock_source
+ |""".stripMargin)
+
+ Thread.sleep(10000)
+ val actual = ContainerUtils.executeSQLStatement(
+ connection,
+ LOG,
+ String.format("select `序号`,`内容` from %s.%s", DATABASE,
TABLE_UNICODE_COL),
+ 2)
+ val expected = util.Arrays.asList("1,243", "2,1", "3,287667876573");
+ checkResultInAnyOrder("testWriteUnicodeColumn", expected.toArray,
actual.toArray)
+ } finally {
+ session.stop()
+ }
+ }
+
}
diff --git
a/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_unicode_col.sql
b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_unicode_col.sql
new file mode 100644
index 0000000..f4db027
--- /dev/null
+++
b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_unicode_col.sql
@@ -0,0 +1,11 @@
+SET enable_unicode_name_support = true;
+DROP TABLE IF EXISTS `tbl_unicode_col`;
+CREATE TABLE `tbl_unicode_col` (
+ `序号` int NULL,
+ `内容` text NULL
+) ENGINE=OLAP
+DUPLICATE KEY(`序号`)
+DISTRIBUTED BY HASH(`序号`) BUCKETS 1
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]