This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new febdb262b6 [Improve][StarRocksSink] add http socket timeout. (#5918)
febdb262b6 is described below

commit febdb262b6fd0f3d284fba820cd67051f6c802b5
Author: lightzhao <[email protected]>
AuthorDate: Mon Nov 27 18:12:48 2023 +0800

    [Improve][StarRocksSink] add http socket timeout. (#5918)
---
 docs/en/connector-v2/sink/StarRocks.md                    |  1 +
 .../connectors/seatunnel/starrocks/client/HttpHelper.java | 15 ++++++++++++++-
 .../starrocks/client/StarRocksStreamLoadVisitor.java      |  3 ++-
 .../connectors/seatunnel/starrocks/config/SinkConfig.java |  3 +++
 .../seatunnel/starrocks/config/StarRocksSinkOptions.java  |  6 ++++++
 .../seatunnel/starrocks/sink/StarRocksSinkFactory.java    |  3 ++-
 6 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/docs/en/connector-v2/sink/StarRocks.md 
b/docs/en/connector-v2/sink/StarRocks.md
index 38893a429e..03c8a933d1 100644
--- a/docs/en/connector-v2/sink/StarRocks.md
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -37,6 +37,7 @@ The internal implementation of StarRocks sink connector is 
cached and imported b
 | enable_upsert_delete        | boolean | no       | false           | Whether 
to enable upsert/delete, only supports PrimaryKey model.                        
                                                                                
                                          |
 | save_mode_create_template   | string  | no       | see below       | see 
below                                                                           
                                                                                
                                              |
 | starrocks.config            | map     | no       | -               | The 
parameter of the stream load `data_desc`                                        
                                                                                
                                              |
+| http_socket_timeout_ms      | int     | no       | 180000          | Set 
http socket timeout, default is 3 minutes.                                      
                                                                                
                                              |
 
 ### save_mode_create_template
 
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java
index d9a9a68428..80fd22672a 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
 
 import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
 
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpStatus;
@@ -45,6 +46,14 @@ import java.util.Map;
 public class HttpHelper {
     private static final int DEFAULT_CONNECT_TIMEOUT = 1000000;
 
+    private SinkConfig sinkConfig;
+
+    public HttpHelper() {}
+
+    public HttpHelper(SinkConfig sinkConfig) {
+        this.sinkConfig = sinkConfig;
+    }
+
     public HttpEntity getHttpEntity(CloseableHttpResponse resp) {
         int code = resp.getStatusLine().getStatusCode();
         if (HttpStatus.SC_OK != code) {
@@ -133,7 +142,11 @@ public class HttpHelper {
                 }
             }
             httpPut.setEntity(new ByteArrayEntity(data));
-            
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
+            httpPut.setConfig(
+                    RequestConfig.custom()
+                            
.setSocketTimeout(sinkConfig.getHttpSocketTimeout())
+                            .setRedirectsEnabled(true)
+                            .build());
             try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
                 int code = resp.getStatusLine().getStatusCode();
                 if (HttpStatus.SC_OK != code) {
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
index 41cbbf6c49..d004213b7a 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
@@ -42,7 +42,7 @@ public class StarRocksStreamLoadVisitor {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class);
 
-    private final HttpHelper httpHelper = new HttpHelper();
+    private final HttpHelper httpHelper;
     private static final int MAX_SLEEP_TIME = 5;
 
     private final SinkConfig sinkConfig;
@@ -61,6 +61,7 @@ public class StarRocksStreamLoadVisitor {
     public StarRocksStreamLoadVisitor(SinkConfig sinkConfig, List<String> 
fieldNames) {
         this.sinkConfig = sinkConfig;
         this.fieldNames = fieldNames;
+        this.httpHelper = new HttpHelper(sinkConfig);
     }
 
     public Boolean doStreamLoad(StarRocksFlushTuple flushData) throws 
IOException {
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index c1709b6939..6862a87bc5 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -60,6 +60,8 @@ public class SinkConfig implements Serializable {
 
     private DataSaveMode dataSaveMode;
 
+    private int httpSocketTimeout;
+
     @Getter private final Map<String, Object> streamLoadProps = new 
HashMap<>();
 
     public static SinkConfig of(ReadonlyConfig config) {
@@ -90,6 +92,7 @@ public class SinkConfig implements Serializable {
                 .ifPresent(sinkConfig::setColumnSeparator);
         sinkConfig.setLoadFormat(config.get(StarRocksSinkOptions.LOAD_FORMAT));
         sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.SAVE_MODE));
+        
sinkConfig.setHttpSocketTimeout(config.get(StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS));
         return sinkConfig;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
index d03e568c86..6500d1474d 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -138,4 +138,10 @@ public interface StarRocksSinkOptions {
                     .defaultValue(DataSaveMode.APPEND_DATA)
                     .withDescription(
                             "Table structure and data processing methods that 
already exist on the target end");
+
+    Option<Integer> HTTP_SOCKET_TIMEOUT_MS =
+            Options.key("http_socket_timeout_ms")
+                    .intType()
+                    .defaultValue(3 * 60 * 1000)
+                    .withDescription("Set http socket timeout, default is 3 
minutes.");
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 7c3592411a..9c0a8b42d1 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -55,7 +55,8 @@ public class StarRocksSinkFactory implements TableSinkFactory 
{
                         StarRocksSinkOptions.STARROCKS_CONFIG,
                         StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
                         StarRocksSinkOptions.SAVE_MODE,
-                        StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE)
+                        StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
+                        StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS)
                 .build();
     }
 

Reply via email to