This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new fad3c4cc [Improve](http) add timeout and waitForContinue config for sink httpclient (#522)
fad3c4cc is described below
commit fad3c4cc13752f4c8f89e29341724b5e54473d41
Author: wudi <[email protected]>
AuthorDate: Fri Dec 6 10:40:48 2024 +0800
[Improve](http) add timeout and waitForContinue config for sink httpclient (#522)
---
.../doris/flink/cfg/ConfigurationOptions.java | 4 +-
.../apache/doris/flink/cfg/DorisReadOptions.java | 25 +++---
.../java/org/apache/doris/flink/sink/HttpUtil.java | 88 +++++++++++++++-------
.../flink/sink/batch/DorisBatchStreamLoad.java | 3 +-
.../doris/flink/sink/committer/DorisCommitter.java | 6 +-
.../doris/flink/sink/writer/DorisWriter.java | 2 +-
.../flink/table/DorisDynamicTableFactoryTest.java | 4 +-
7 files changed, 86 insertions(+), 46 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
index 3709f0ae..8ae7f636 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
@@ -33,8 +33,8 @@ public interface ConfigurationOptions {
String DORIS_REQUEST_READ_TIMEOUT_MS = "doris.request.read.timeout";
String DORIS_REQUEST_QUERY_TIMEOUT_S = "doris.request.query.timeout";
Integer DORIS_REQUEST_RETRIES_DEFAULT = 3;
- Integer DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000;
- Integer DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
+ Integer DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 60 * 1000;
+ Integer DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 60 * 1000;
Integer DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 21600;
String DORIS_TABLET_SIZE = "doris.request.tablet.size";
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 0448d60a..22a77b83 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -210,17 +210,22 @@ public class DorisReadOptions implements Serializable {
private String readFields;
private String filterQuery;
- private Integer requestTabletSize;
- private Integer requestConnectTimeoutMs;
- private Integer requestReadTimeoutMs;
- private Integer requestQueryTimeoutS;
- private Integer requestRetries;
- private Integer requestBatchSize;
- private Long execMemLimit;
- private Integer deserializeQueueSize;
- private Boolean deserializeArrowAsync;
+ private Integer requestTabletSize =
ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
+ private Integer requestConnectTimeoutMs =
+ ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
+ private Integer requestReadTimeoutMs =
+ ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
+ private Integer requestQueryTimeoutS =
+ ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
+ private Integer requestRetries =
ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
+ private Integer requestBatchSize =
ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
+ private Long execMemLimit =
ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
+ private Integer deserializeQueueSize =
+ ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
+ private Boolean deserializeArrowAsync =
+ ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
private Boolean useOldApi = false;
- private Boolean useFlightSql = false;
+ private Boolean useFlightSql =
ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT;
private Integer flightSqlPort;
/**
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
index 518eea71..94b65c79 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
@@ -17,50 +17,70 @@
package org.apache.doris.flink.sink;
+import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.NoConnectionReuseStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
+import org.apache.http.protocol.HttpRequestExecutor;
-import java.util.concurrent.TimeUnit;
+import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
+import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
/** util to build http client. */
public class HttpUtil {
+ private final int connectTimeout;
+ private final int socketTimeout;
+ private HttpClientBuilder httpClientBuilder;
- private RequestConfig requestConfigStream =
- RequestConfig.custom()
- .setConnectTimeout(60 * 1000)
- .setConnectionRequestTimeout(60 * 1000)
- .build();
+ public HttpUtil() {
+ this.connectTimeout = DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
+ this.socketTimeout = DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
+ settingStreamHttpClientBuilder();
+ }
- private final HttpClientBuilder httpClientBuilder =
- HttpClients.custom()
- .setRedirectStrategy(
- new DefaultRedirectStrategy() {
- @Override
- protected boolean isRedirectable(String
method) {
- return true;
- }
- })
-
.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE)
- .evictExpiredConnections()
- .evictIdleConnections(60, TimeUnit.SECONDS)
- .setDefaultRequestConfig(requestConfigStream);
+ public HttpUtil(DorisReadOptions readOptions) {
+ this.connectTimeout = readOptions.getRequestConnectTimeoutMs();
+ this.socketTimeout = readOptions.getRequestReadTimeoutMs();
+ settingStreamHttpClientBuilder();
+ }
+ private void settingStreamHttpClientBuilder() {
+ this.httpClientBuilder =
+ HttpClients.custom()
+ // default timeout 3s, maybe report 307 error when fe
busy
+ .setRequestExecutor(new
HttpRequestExecutor(socketTimeout))
+ .setRedirectStrategy(
+ new DefaultRedirectStrategy() {
+ @Override
+ protected boolean isRedirectable(String
method) {
+ return true;
+ }
+ })
+
.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE)
+ .setDefaultRequestConfig(
+ RequestConfig.custom()
+ .setConnectTimeout(connectTimeout)
+
.setConnectionRequestTimeout(connectTimeout)
+ .build());
+ }
+
+ /**
+ * for stream http
+ *
+ * @return
+ */
public CloseableHttpClient getHttpClient() {
return httpClientBuilder.build();
}
- private RequestConfig requestConfig =
- RequestConfig.custom()
- .setConnectTimeout(60 * 1000)
- .setConnectionRequestTimeout(60 * 1000)
- // default checkpoint timeout is 10min
- .setSocketTimeout(9 * 60 * 1000)
- .build();
-
+ /**
+ * for batch http
+ *
+ * @return
+ */
public HttpClientBuilder getHttpClientBuilderForBatch() {
return HttpClients.custom()
.setRedirectStrategy(
@@ -70,12 +90,22 @@ public class HttpUtil {
return true;
}
})
- .setDefaultRequestConfig(requestConfig);
+ .setDefaultRequestConfig(
+ RequestConfig.custom()
+ .setConnectTimeout(connectTimeout)
+ .setConnectionRequestTimeout(connectTimeout)
+ .setSocketTimeout(socketTimeout)
+ .build());
}
public HttpClientBuilder getHttpClientBuilderForCopyBatch() {
return HttpClients.custom()
.disableRedirectHandling()
- .setDefaultRequestConfig(requestConfig);
+ .setDefaultRequestConfig(
+ RequestConfig.custom()
+ .setConnectTimeout(connectTimeout)
+ .setConnectionRequestTimeout(connectTimeout)
+ .setSocketTimeout(socketTimeout)
+ .build());
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 5a32949e..479fab64 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -102,7 +102,7 @@ public class DorisBatchStreamLoad implements Serializable {
private final AtomicBoolean started;
private volatile boolean loadThreadAlive = false;
private AtomicReference<Throwable> exception = new AtomicReference<>(null);
- private HttpClientBuilder httpClientBuilder = new
HttpUtil().getHttpClientBuilderForBatch();
+ private HttpClientBuilder httpClientBuilder;
private BackendUtil backendUtil;
private boolean enableGroupCommit;
private boolean enableGzCompress;
@@ -169,6 +169,7 @@ public class DorisBatchStreamLoad implements Serializable {
this.started = new AtomicBoolean(true);
this.loadExecutorService.execute(loadAsyncExecutor);
this.subTaskId = subTaskId;
+ this.httpClientBuilder = new
HttpUtil(dorisReadOptions).getHttpClientBuilderForBatch();
}
/**
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index eafffd53..e73d96cd 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -66,7 +66,11 @@ public class DorisCommitter implements
Committer<DorisCommittable>, Closeable {
DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
DorisExecutionOptions executionOptions) {
- this(dorisOptions, dorisReadOptions, executionOptions, new
HttpUtil().getHttpClient());
+ this(
+ dorisOptions,
+ dorisReadOptions,
+ executionOptions,
+ new HttpUtil(dorisReadOptions).getHttpClient());
}
public DorisCommitter(
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index e84197d1..fdb797f9 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -346,7 +346,7 @@ public class DorisWriter<IN>
dorisOptions,
executionOptions,
labelGenerator,
- new HttpUtil().getHttpClient()));
+ new
HttpUtil(dorisReadOptions).getHttpClient()));
}
/** Check the streamload http request regularly. */
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
index 2baf6f56..4a01473d 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
@@ -248,8 +248,8 @@ public class DorisDynamicTableFactoryTest {
options.put("password", "");
options.put("auto-redirect", "true");
options.put("doris.request.retries", "3");
- options.put("doris.request.connect.timeout", "30s");
- options.put("doris.request.read.timeout", "30s");
+ options.put("doris.request.connect.timeout", "60s");
+ options.put("doris.request.read.timeout", "60s");
return options;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]