This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 921ef6de1 [FLINK-37204][pipeline-connector/starrocks] Add missing StarRocks connector options for 1.2.10
921ef6de1 is described below

commit 921ef6de1512c9f0ffe5e4bdd05242e988a43d7b
Author: moses <[email protected]>
AuthorDate: Wed Mar 12 21:21:08 2025 +0800

    [FLINK-37204][pipeline-connector/starrocks] Add missing StarRocks connector options for 1.2.10
    
    This closes #3883
---
 docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md  | 7 +++++++
 docs/content/docs/connectors/pipeline-connectors/starrocks.md     | 7 +++++++
 .../cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java   | 6 ++++++
 .../cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java   | 8 ++++++++
 4 files changed, 28 insertions(+)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md b/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
index eba6c2d87..7a950910f 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
@@ -208,6 +208,13 @@ pipeline:
       <td>Duration</td>
       <td>StarRocks 侧执行 schema change 的超时时间,必须是秒的整数倍。超时后 StarRocks 将会取消 schema change,从而导致作业失败。</td>
     </tr>
+    <tr>
+      <td>sink.socket.timeout-ms</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">-1</td>
+      <td>Integer</td>
+      <td>StarRocks 客户端等待数据的超时时间,默认值 -1 表示永不超时。</td>
+    </tr>
     </tbody>
 </table>    
 </div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/starrocks.md b/docs/content/docs/connectors/pipeline-connectors/starrocks.md
index dcb8a43f2..1910b3b0e 100644
--- a/docs/content/docs/connectors/pipeline-connectors/starrocks.md
+++ b/docs/content/docs/connectors/pipeline-connectors/starrocks.md
@@ -215,6 +215,13 @@ pipeline:
           seconds. StarRocks will cancel the schema change after timeout which will
           cause the sink failure. </td>
     </tr>
+    <tr>
+      <td>sink.socket.timeout-ms</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">-1</td>
+      <td>Integer</td>
+      <td>The time duration for which the HTTP client waits for data. The default value -1 means there is no timeout</td>
+    </tr>
     </tbody>
 </table>    
 </div>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
index f9fe58034..fbb34cd51 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
@@ -81,6 +81,11 @@ public class StarRocksDataSinkFactory implements DataSinkFactory {
                 .ifPresent(
                         config ->
                                 sinkConfig.set(StarRocksSinkOptions.SINK_CONNECT_TIMEOUT, config));
+
+        cdcConfig
+                .getOptional(StarRocksDataSinkOptions.SINK_SOCKET_TIMEOUT)
+                .ifPresent(
+                        config -> sinkConfig.set(StarRocksSinkOptions.SINK_SOCKET_TIMEOUT, config));
         cdcConfig
                 .getOptional(StarRocksDataSinkOptions.SINK_WAIT_FOR_CONTINUE_TIMEOUT)
                 .ifPresent(
@@ -167,6 +172,7 @@ public class StarRocksDataSinkFactory implements DataSinkFactory {
         Set<ConfigOption<?>> optionalOptions = new HashSet<>();
         optionalOptions.add(StarRocksDataSinkOptions.SINK_LABEL_PREFIX);
         optionalOptions.add(StarRocksDataSinkOptions.SINK_CONNECT_TIMEOUT);
+        optionalOptions.add(StarRocksDataSinkOptions.SINK_SOCKET_TIMEOUT);
         optionalOptions.add(StarRocksDataSinkOptions.SINK_WAIT_FOR_CONTINUE_TIMEOUT);
         optionalOptions.add(StarRocksDataSinkOptions.SINK_BATCH_MAX_SIZE);
         optionalOptions.add(StarRocksDataSinkOptions.SINK_BATCH_FLUSH_INTERVAL);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java
index d9cb611fb..5f8220602 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java
@@ -73,6 +73,14 @@ public class StarRocksDataSinkOptions {
                     .defaultValue(30000)
                     .withDescription("Timeout in millisecond for connecting to the `load-url`.");
 
+    public static final ConfigOption<Integer> SINK_SOCKET_TIMEOUT =
+            ConfigOptions.key("sink.socket.timeout-ms")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "The time duration for which the HTTP client waits for data."
+                                    + " Unit: ms. The default value -1 means there is no timeout.");
+
     public static final ConfigOption<Integer> SINK_WAIT_FOR_CONTINUE_TIMEOUT =
             ConfigOptions.key("sink.wait-for-continue.timeout-ms")
                     .intType()

Reply via email to