This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 34408fb4 [improve] Supports http request use utf8 charset (#623)
34408fb4 is described below
commit 34408fb4f958ec4c15792e54316ec57b91b6f032
Author: gnehil <[email protected]>
AuthorDate: Tue Dec 16 18:41:56 2025 +0800
[improve] Supports http request use utf8 charset (#623)
---
.../doris/flink/cfg/ConfigurationOptions.java | 3 +
.../doris/flink/cfg/DorisExecutionOptions.java | 22 ++++++
.../java/org/apache/doris/flink/sink/HttpUtil.java | 30 +++++++-
.../flink/sink/batch/DorisBatchStreamLoad.java | 4 +-
.../doris/flink/sink/committer/DorisCommitter.java | 3 +-
.../doris/flink/sink/writer/DorisWriter.java | 3 +-
.../doris/flink/table/DorisConfigOptions.java | 7 ++
.../flink/table/DorisDynamicTableFactory.java | 3 +
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 3 +
.../doris/flink/cfg/DorisExecutionOptionsTest.java | 8 ++-
.../apache/doris/flink/sink/DorisSinkITCase.java | 80 ++++++++++++++++++++++
11 files changed, 159 insertions(+), 7 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
index bfca9b05..54b18400 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
@@ -60,4 +60,7 @@ public interface ConfigurationOptions {
String FLIGHT_SQL_PORT = "source.flight-sql-port";
Integer FLIGHT_SQL_PORT_DEFAULT = -1;
+
+ String SINK_HTTP_UTF8_CHARSET = "sink.http-utf8-charset";
+ Boolean SINK_HTTP_UTF8_CHARSET_DEFAULT = false;
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 831a317e..99379ff8 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -49,6 +49,7 @@ public class DorisExecutionOptions implements Serializable {
private final int bufferCount;
private final String labelPrefix;
private final boolean useCache;
+ private final boolean httpUtf8Charset;
/** Properties for the StreamLoad. */
private final Properties streamLoadProp;
@@ -74,6 +75,7 @@ public class DorisExecutionOptions implements Serializable {
int bufferCount,
String labelPrefix,
boolean useCache,
+ boolean httpUtf8Charset,
Properties streamLoadProp,
Boolean enableDelete,
Boolean enable2PC,
@@ -93,6 +95,7 @@ public class DorisExecutionOptions implements Serializable {
this.bufferCount = bufferCount;
this.labelPrefix = labelPrefix;
this.useCache = useCache;
+ this.httpUtf8Charset = httpUtf8Charset;
this.streamLoadProp = streamLoadProp;
this.enableDelete = enableDelete;
this.enable2PC = enable2PC;
@@ -162,6 +165,10 @@ public class DorisExecutionOptions implements Serializable {
return useCache;
}
+ public boolean isHttpUtf8Charset() {
+ return httpUtf8Charset;
+ }
+
public Properties getStreamLoadProp() {
return streamLoadProp;
}
@@ -228,6 +235,7 @@ public class DorisExecutionOptions implements Serializable {
&& bufferSize == that.bufferSize
&& bufferCount == that.bufferCount
&& useCache == that.useCache
+ && httpUtf8Charset == that.httpUtf8Charset
&& force2PC == that.force2PC
&& flushQueueSize == that.flushQueueSize
&& bufferFlushMaxRows == that.bufferFlushMaxRows
@@ -252,6 +260,7 @@ public class DorisExecutionOptions implements Serializable {
bufferCount,
labelPrefix,
useCache,
+ httpUtf8Charset,
streamLoadProp,
enableDelete,
enable2PC,
@@ -274,6 +283,7 @@ public class DorisExecutionOptions implements Serializable {
private int bufferCount = DEFAULT_BUFFER_COUNT;
private String labelPrefix = "";
private boolean useCache = false;
+ private boolean httpUtf8Charset = false;
private Properties streamLoadProp = new Properties();
private boolean enableDelete = true;
private boolean enable2PC = true;
@@ -361,6 +371,17 @@ public class DorisExecutionOptions implements Serializable {
return this;
}
+ /**
+ * Sets whether to set http utf8 charset for stream load.
+ *
+ * @param httpUtf8Charset
+ * @return this DorisExecutionOptions.builder.
+ */
+ public Builder setHttpUtf8Charset(boolean httpUtf8Charset) {
+ this.httpUtf8Charset = httpUtf8Charset;
+ return this;
+ }
+
/**
* Sets the properties for stream load.
*
@@ -529,6 +550,7 @@ public class DorisExecutionOptions implements Serializable {
bufferCount,
labelPrefix,
useCache,
+ httpUtf8Charset,
streamLoadProp,
enableDelete,
enable2PC,
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
index 38716826..015a7ecb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.sink;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.http.client.config.RequestConfig;
+import org.apache.http.config.ConnectionConfig;
import org.apache.http.impl.NoConnectionReuseStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
@@ -27,29 +28,46 @@ import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.HttpRequestExecutor;
import org.apache.http.protocol.RequestContent;
+import java.nio.charset.CodingErrorAction;
+import java.nio.charset.StandardCharsets;
+
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
+import static
org.apache.doris.flink.cfg.ConfigurationOptions.SINK_HTTP_UTF8_CHARSET_DEFAULT;
/** util to build http client. */
public class HttpUtil {
private final int connectTimeout;
private final int waitForContinueTimeout;
+ private final boolean httpUtf8Charset;
private HttpClientBuilder httpClientBuilder;
public HttpUtil() {
this.connectTimeout = DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
this.waitForContinueTimeout = DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
+ this.httpUtf8Charset = SINK_HTTP_UTF8_CHARSET_DEFAULT;
settingStreamHttpClientBuilder();
}
- public HttpUtil(DorisReadOptions readOptions) {
+ public HttpUtil(DorisReadOptions readOptions, boolean httpUtf8Charset) {
this.connectTimeout = readOptions.getRequestConnectTimeoutMs();
this.waitForContinueTimeout = readOptions.getRequestConnectTimeoutMs();
+ this.httpUtf8Charset = httpUtf8Charset;
settingStreamHttpClientBuilder();
}
private void settingStreamHttpClientBuilder() {
+ ConnectionConfig connectionConfig = ConnectionConfig.DEFAULT;
+ if (httpUtf8Charset) {
+ connectionConfig =
+ ConnectionConfig.custom()
+ .setCharset(StandardCharsets.UTF_8)
+ .setMalformedInputAction(CodingErrorAction.REPLACE)
+
.setUnmappableInputAction(CodingErrorAction.REPLACE)
+ .build();
+ }
this.httpClientBuilder =
HttpClients.custom()
+ .setDefaultConnectionConfig(connectionConfig)
// default timeout 3s, maybe report 307 error when fe
busy
.setRequestExecutor(new
HttpRequestExecutor(waitForContinueTimeout))
.setRedirectStrategy(
@@ -84,7 +102,17 @@ public class HttpUtil {
* @return
*/
public HttpClientBuilder getHttpClientBuilderForBatch() {
+ ConnectionConfig connectionConfig = ConnectionConfig.DEFAULT;
+ if (httpUtf8Charset) {
+ connectionConfig =
+ ConnectionConfig.custom()
+ .setCharset(StandardCharsets.UTF_8)
+ .setMalformedInputAction(CodingErrorAction.REPLACE)
+
.setUnmappableInputAction(CodingErrorAction.REPLACE)
+ .build();
+ }
return HttpClients.custom()
+ .setDefaultConnectionConfig(connectionConfig)
.setRedirectStrategy(
new DefaultRedirectStrategy() {
@Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index a7d7c8c6..ee1d9be6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -172,7 +172,9 @@ public class DorisBatchStreamLoad implements Serializable {
this.started = new AtomicBoolean(true);
this.loadExecutorService.execute(loadAsyncExecutor);
this.subTaskId = subTaskId;
- this.httpClientBuilder = new
HttpUtil(dorisReadOptions).getHttpClientBuilderForBatch();
+ this.httpClientBuilder =
+ new HttpUtil(dorisReadOptions,
executionOptions.isHttpUtf8Charset())
+ .getHttpClientBuilderForBatch();
}
/**
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index b1a70059..2593e303 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -70,7 +70,8 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
dorisOptions,
dorisReadOptions,
executionOptions,
- new HttpUtil(dorisReadOptions).getHttpClient());
+ new HttpUtil(dorisReadOptions,
executionOptions.isHttpUtf8Charset())
+ .getHttpClient());
}
public DorisCommitter(
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 64fbc95f..da07249f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -310,7 +310,8 @@ public class DorisWriter<IN>
dorisOptions,
executionOptions,
labelGenerator,
- new
HttpUtil(dorisReadOptions).getHttpClient()));
+ new HttpUtil(dorisReadOptions,
executionOptions.isHttpUtf8Charset())
+ .getHttpClient()));
}
/** Http throws an exception actively, there is no need to check
regularly. */
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 56f96153..f1d44943 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -340,4 +340,11 @@ public class DorisConfigOptions {
}
return streamLoadProp;
}
+
+ public static final ConfigOption<Boolean> SINK_HTTP_UTF8_CHARSET =
+ ConfigOptions.key("sink.http-utf8-charset")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Set sink http client default charset to utf8 for
support unicode characters in header, the default value is false");
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 6495eadc..fd3d2ad3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -74,6 +74,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC;
import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_BATCH_MODE;
import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE;
+import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_HTTP_UTF8_CHARSET;
import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR;
import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE;
import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
@@ -165,6 +166,7 @@ public final class DorisDynamicTableFactory
options.add(USE_FLIGHT_SQL);
options.add(FLIGHT_SQL_PORT);
+ options.add(SINK_HTTP_UTF8_CHARSET);
return options;
}
@@ -263,6 +265,7 @@ public final class DorisDynamicTableFactory
(int)
readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES).getBytes());
builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
builder.setUseCache(readableConfig.get(SINK_USE_CACHE));
+ builder.setHttpUtf8Charset(readableConfig.get(SINK_HTTP_UTF8_CHARSET));
return builder.build();
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 326b4e87..13039785 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -326,6 +326,9 @@ public abstract class DatabaseSync {
sinkConfig
.getOptional(DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR)
.ifPresent(executionBuilder::setIgnoreCommitError);
+ sinkConfig
+ .getOptional(DorisConfigOptions.SINK_HTTP_UTF8_CHARSET)
+ .ifPresent(executionBuilder::setHttpUtf8Charset);
DorisExecutionOptions executionOptions = executionBuilder.build();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
index bc19c572..97c2868a 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
@@ -62,6 +62,7 @@ public class DorisExecutionOptionsTest {
.setUseCache(true)
.setFlushQueueSize(2)
.setIgnoreUpdateBefore(true)
+ .setHttpUtf8Charset(true)
.build();
DorisExecutionOptions.Builder builder =
@@ -81,7 +82,8 @@ public class DorisExecutionOptionsTest {
.setBatchMode(false)
.setUseCache(true)
.setFlushQueueSize(2)
- .setIgnoreUpdateBefore(true);
+ .setIgnoreUpdateBefore(true)
+ .setHttpUtf8Charset(true);
Assert.assertNotEquals(exceptOptions, null);
Assert.assertEquals(exceptOptions, exceptOptions);
@@ -147,9 +149,9 @@ public class DorisExecutionOptionsTest {
Assert.assertNotEquals(exceptOptions, builder.build());
builder.setFlushQueueSize(2);
- builder.setIgnoreUpdateBefore(false);
+ builder.setHttpUtf8Charset(false);
Assert.assertNotEquals(exceptOptions, builder.build());
- builder.setIgnoreUpdateBefore(true);
+ builder.setHttpUtf8Charset(true);
}
@Test(expected = IllegalArgumentException.class)
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index 0ebc48fc..76256543 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -78,6 +78,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
static final String TABLE_GZ_FORMAT = "tbl_gz_format";
static final String TABLE_CSV_JM = "tbl_csv_jm";
static final String TABLE_CSV_TM = "tbl_csv_tm";
+ static final String TABLE_UNICODE_COLUMN = "tbl_unicode_column";
private final boolean batchMode;
@@ -732,4 +733,83 @@ public class DorisSinkITCase extends AbstractITCaseService {
max,
model));
}
+
+ private void initializeUnicodeColumnTable(String table, DataModel
dataModel) {
+ String max = DataModel.AGGREGATE.equals(dataModel) ? "MAX" : "";
+ String morProps =
+ !DataModel.UNIQUE_MOR.equals(dataModel)
+ ? ""
+ : ",\"enable_unique_key_merge_on_write\" = \"false\"";
+ String model =
+ dataModel.equals(DataModel.UNIQUE_MOR)
+ ? DataModel.UNIQUE.toString()
+ : dataModel.toString();
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(),
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+ String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+ "SET enable_unicode_name_support = true;",
+ String.format(
+ "CREATE TABLE %s.%s ( \n"
+ + "`名称` varchar(256),\n"
+ + "`年龄` int %s\n"
+ + ") "
+ + " %s KEY(`名称`) "
+ + " DISTRIBUTED BY HASH(`名称`) BUCKETS 1\n"
+ + "PROPERTIES ("
+ + "\"replication_num\" = \"1\"\n"
+ + morProps
+ + ")",
+ DATABASE,
+ table,
+ max,
+ model));
+ }
+
+ @Test
+ public void testSinkUnicodeColumn() throws Exception {
+ initializeUnicodeColumnTable(TABLE_UNICODE_COLUMN, DataModel.UNIQUE);
+ Properties properties = new Properties();
+ properties.setProperty("read_json_by_line", "true");
+ properties.setProperty("format", "json");
+
+ // mock data
+ Map<String, Object> row1 = new HashMap<>();
+ row1.put("名称", "doris1");
+ row1.put("年龄", 1);
+ Map<String, Object> row2 = new HashMap<>();
+ row2.put("名称", "doris2");
+ row2.put("年龄", 2);
+
+ DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
+ executionBuilder
+ .setLabelPrefix(UUID.randomUUID().toString())
+ .setBatchMode(batchMode)
+ .setStreamLoadProp(properties)
+ // uniq need to be false
+ .setDeletable(false)
+ .setHttpUtf8Charset(true);
+ DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+ dorisBuilder
+ .setFenodes(getFenodes())
+ .setTableIdentifier(DATABASE + "." + TABLE_UNICODE_COLUMN)
+ .setUsername(getDorisUsername())
+ .setPassword(getDorisPassword());
+
+ submitJob(
+ dorisBuilder.build(),
+ executionBuilder.build(),
+ new String[] {
+ new ObjectMapper().writeValueAsString(row1),
+ new ObjectMapper().writeValueAsString(row2)
+ });
+
+ Thread.sleep(10000);
+ List<String> expected = Arrays.asList("doris1,1", "doris2,2");
+ String query =
+ String.format(
+ "select `名称`,`年龄` from %s.%s order by 1", DATABASE,
TABLE_UNICODE_COLUMN);
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected,
query, 2);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]