This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 921ef6de1 [FLINK-37204][pipeline-connector/starrocks] Add missing
StarRocks connector options for 1.2.10
921ef6de1 is described below
commit 921ef6de1512c9f0ffe5e4bdd05242e988a43d7b
Author: moses <[email protected]>
AuthorDate: Wed Mar 12 21:21:08 2025 +0800
[FLINK-37204][pipeline-connector/starrocks] Add missing StarRocks connector
options for 1.2.10
This closes #3883
---
docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md | 7 +++++++
docs/content/docs/connectors/pipeline-connectors/starrocks.md | 7 +++++++
.../cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java | 6 ++++++
.../cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java | 8 ++++++++
4 files changed, 28 insertions(+)
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
b/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
index eba6c2d87..7a950910f 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
@@ -208,6 +208,13 @@ pipeline:
<td>Duration</td>
<td>StarRocks 侧执行 schema change 的超时时间,必须是秒的整数倍。超时后 StarRocks 将会取消 schema
change,从而导致作业失败。</td>
</tr>
+ <tr>
+ <td>sink.socket.timeout-ms</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">-1</td>
+ <td>Integer</td>
+ <td>StarRocks 客户端等待数据的超时时间,默认值 -1 表示永不超时。</td>
+ </tr>
</tbody>
</table>
</div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/starrocks.md
b/docs/content/docs/connectors/pipeline-connectors/starrocks.md
index dcb8a43f2..1910b3b0e 100644
--- a/docs/content/docs/connectors/pipeline-connectors/starrocks.md
+++ b/docs/content/docs/connectors/pipeline-connectors/starrocks.md
@@ -215,6 +215,13 @@ pipeline:
seconds. StarRocks will cancel the schema change after timeout which
will
cause the sink failure. </td>
</tr>
+ <tr>
+ <td>sink.socket.timeout-ms</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">-1</td>
+ <td>Integer</td>
+ <td>The time duration for which the HTTP client waits for data. The
default value -1 means there is no timeout</td>
+ </tr>
</tbody>
</table>
</div>
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
index f9fe58034..fbb34cd51 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
@@ -81,6 +81,11 @@ public class StarRocksDataSinkFactory implements
DataSinkFactory {
.ifPresent(
config ->
sinkConfig.set(StarRocksSinkOptions.SINK_CONNECT_TIMEOUT, config));
+
+ cdcConfig
+ .getOptional(StarRocksDataSinkOptions.SINK_SOCKET_TIMEOUT)
+ .ifPresent(
+ config ->
sinkConfig.set(StarRocksSinkOptions.SINK_SOCKET_TIMEOUT, config));
cdcConfig
.getOptional(StarRocksDataSinkOptions.SINK_WAIT_FOR_CONTINUE_TIMEOUT)
.ifPresent(
@@ -167,6 +172,7 @@ public class StarRocksDataSinkFactory implements
DataSinkFactory {
Set<ConfigOption<?>> optionalOptions = new HashSet<>();
optionalOptions.add(StarRocksDataSinkOptions.SINK_LABEL_PREFIX);
optionalOptions.add(StarRocksDataSinkOptions.SINK_CONNECT_TIMEOUT);
+ optionalOptions.add(StarRocksDataSinkOptions.SINK_SOCKET_TIMEOUT);
optionalOptions.add(StarRocksDataSinkOptions.SINK_WAIT_FOR_CONTINUE_TIMEOUT);
optionalOptions.add(StarRocksDataSinkOptions.SINK_BATCH_MAX_SIZE);
optionalOptions.add(StarRocksDataSinkOptions.SINK_BATCH_FLUSH_INTERVAL);
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java
index d9cb611fb..5f8220602 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java
@@ -73,6 +73,14 @@ public class StarRocksDataSinkOptions {
.defaultValue(30000)
.withDescription("Timeout in millisecond for connecting to
the `load-url`.");
+ public static final ConfigOption<Integer> SINK_SOCKET_TIMEOUT =
+ ConfigOptions.key("sink.socket.timeout-ms")
+ .intType()
+ .defaultValue(-1)
+ .withDescription(
+ "The time duration for which the HTTP client waits
for data."
+ + " Unit: ms. The default value -1 means
there is no timeout.");
+
public static final ConfigOption<Integer> SINK_WAIT_FOR_CONTINUE_TIMEOUT =
ConfigOptions.key("sink.wait-for-continue.timeout-ms")
.intType()