This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new febdb262b6 [Improve][StarRocksSink] add http socket timeout. (#5918)
febdb262b6 is described below
commit febdb262b6fd0f3d284fba820cd67051f6c802b5
Author: lightzhao <[email protected]>
AuthorDate: Mon Nov 27 18:12:48 2023 +0800
[Improve][StarRocksSink] add http socket timeout. (#5918)
---
docs/en/connector-v2/sink/StarRocks.md | 1 +
.../connectors/seatunnel/starrocks/client/HttpHelper.java | 15 ++++++++++++++-
.../starrocks/client/StarRocksStreamLoadVisitor.java | 3 ++-
.../connectors/seatunnel/starrocks/config/SinkConfig.java | 3 +++
.../seatunnel/starrocks/config/StarRocksSinkOptions.java | 6 ++++++
.../seatunnel/starrocks/sink/StarRocksSinkFactory.java | 3 ++-
6 files changed, 28 insertions(+), 3 deletions(-)
diff --git a/docs/en/connector-v2/sink/StarRocks.md
b/docs/en/connector-v2/sink/StarRocks.md
index 38893a429e..03c8a933d1 100644
--- a/docs/en/connector-v2/sink/StarRocks.md
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -37,6 +37,7 @@ The internal implementation of StarRocks sink connector is
cached and imported b
| enable_upsert_delete | boolean | no | false | Whether
to enable upsert/delete, only supports PrimaryKey model.
|
| save_mode_create_template | string | no | see below | see
below
|
| starrocks.config | map | no | - | The
parameter of the stream load `data_desc`
|
+| http_socket_timeout_ms | int | no | 180000 | Set
http socket timeout, default is 3 minutes.
|
### save_mode_create_template
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java
index d9a9a68428..80fd22672a 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
@@ -45,6 +46,14 @@ import java.util.Map;
public class HttpHelper {
private static final int DEFAULT_CONNECT_TIMEOUT = 1000000;
+ private SinkConfig sinkConfig;
+
+ public HttpHelper() {}
+
+ public HttpHelper(SinkConfig sinkConfig) {
+ this.sinkConfig = sinkConfig;
+ }
+
public HttpEntity getHttpEntity(CloseableHttpResponse resp) {
int code = resp.getStatusLine().getStatusCode();
if (HttpStatus.SC_OK != code) {
@@ -133,7 +142,11 @@ public class HttpHelper {
}
}
httpPut.setEntity(new ByteArrayEntity(data));
-
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
+ httpPut.setConfig(
+ RequestConfig.custom()
+
.setSocketTimeout(sinkConfig.getHttpSocketTimeout())
+ .setRedirectsEnabled(true)
+ .build());
try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
int code = resp.getStatusLine().getStatusCode();
if (HttpStatus.SC_OK != code) {
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
index 41cbbf6c49..d004213b7a 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
@@ -42,7 +42,7 @@ public class StarRocksStreamLoadVisitor {
private static final Logger LOG =
LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class);
- private final HttpHelper httpHelper = new HttpHelper();
+ private final HttpHelper httpHelper;
private static final int MAX_SLEEP_TIME = 5;
private final SinkConfig sinkConfig;
@@ -61,6 +61,7 @@ public class StarRocksStreamLoadVisitor {
public StarRocksStreamLoadVisitor(SinkConfig sinkConfig, List<String>
fieldNames) {
this.sinkConfig = sinkConfig;
this.fieldNames = fieldNames;
+ this.httpHelper = new HttpHelper(sinkConfig);
}
public Boolean doStreamLoad(StarRocksFlushTuple flushData) throws
IOException {
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index c1709b6939..6862a87bc5 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -60,6 +60,8 @@ public class SinkConfig implements Serializable {
private DataSaveMode dataSaveMode;
+ private int httpSocketTimeout;
+
@Getter private final Map<String, Object> streamLoadProps = new
HashMap<>();
public static SinkConfig of(ReadonlyConfig config) {
@@ -90,6 +92,7 @@ public class SinkConfig implements Serializable {
.ifPresent(sinkConfig::setColumnSeparator);
sinkConfig.setLoadFormat(config.get(StarRocksSinkOptions.LOAD_FORMAT));
sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.SAVE_MODE));
+
sinkConfig.setHttpSocketTimeout(config.get(StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS));
return sinkConfig;
}
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
index d03e568c86..6500d1474d 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -138,4 +138,10 @@ public interface StarRocksSinkOptions {
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription(
"Table structure and data processing methods that
already exist on the target end");
+
+ Option<Integer> HTTP_SOCKET_TIMEOUT_MS =
+ Options.key("http_socket_timeout_ms")
+ .intType()
+ .defaultValue(3 * 60 * 1000)
+ .withDescription("Set http socket timeout, default is 3
minutes.");
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 7c3592411a..9c0a8b42d1 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -55,7 +55,8 @@ public class StarRocksSinkFactory implements TableSinkFactory
{
StarRocksSinkOptions.STARROCKS_CONFIG,
StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
StarRocksSinkOptions.SAVE_MODE,
- StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE)
+ StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
+ StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS)
.build();
}