This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new e5bba52 [improve] Improve Httpclient Connection (#344)
e5bba52 is described below
commit e5bba5207b5537ca2103374af82660bc962607de
Author: wudi <[email protected]>
AuthorDate: Tue Mar 19 14:08:33 2024 +0800
[improve] Improve Httpclient Connection (#344)
---
.../java/org/apache/doris/flink/sink/HttpUtil.java | 31 ++++---
.../flink/sink/batch/DorisBatchStreamLoad.java | 54 ++++++------
.../doris/flink/sink/copy/BatchStageLoad.java | 98 ++++++++++++----------
.../doris/flink/sink/copy/DorisCopyCommitter.java | 64 +++++++-------
.../flink/sink/copy/TestDorisCopyCommitter.java | 5 +-
5 files changed, 139 insertions(+), 113 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
index 43e2bea..1307ce4 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
@@ -18,12 +18,13 @@
package org.apache.doris.flink.sink;
import org.apache.http.client.config.RequestConfig;
-import org.apache.http.impl.NoConnectionReuseStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
+import java.util.concurrent.TimeUnit;
+
/** util to build http client. */
public class HttpUtil {
private final HttpClientBuilder httpClientBuilder =
@@ -34,7 +35,9 @@ public class HttpUtil {
protected boolean isRedirectable(String
method) {
return true;
}
- });
+ })
+ .evictExpiredConnections()
+ .evictIdleConnections(60, TimeUnit.SECONDS);
public CloseableHttpClient getHttpClient() {
return httpClientBuilder.build();
@@ -48,17 +51,21 @@ public class HttpUtil {
.setSocketTimeout(9 * 60 * 1000)
.build();
- public CloseableHttpClient getHttpClientForBatch() {
- return
httpClientBuilder.setDefaultRequestConfig(requestConfig).build();
+ public HttpClientBuilder getHttpClientBuilderForBatch() {
+ return HttpClients.custom()
+ .setRedirectStrategy(
+ new DefaultRedirectStrategy() {
+ @Override
+ protected boolean isRedirectable(String method) {
+ return true;
+ }
+ })
+ .setDefaultRequestConfig(requestConfig);
}
- private final HttpClientBuilder httpClientBuilderWithTimeout =
- HttpClients.custom().setDefaultRequestConfig(requestConfig);
-
- public CloseableHttpClient getHttpClientWithTimeout() {
- return httpClientBuilderWithTimeout
- // fix failed to respond for commit copy
- .setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE)
- .build();
+ public HttpClientBuilder getHttpClientBuilderForCopyBatch() {
+ return HttpClients.custom()
+ .disableRedirectHandling()
+ .setDefaultRequestConfig(requestConfig);
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 0971be0..ad4ccde 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -35,6 +35,7 @@ import org.apache.doris.flink.sink.writer.LabelGenerator;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +91,7 @@ public class DorisBatchStreamLoad implements Serializable {
private final AtomicBoolean started;
private volatile boolean loadThreadAlive = false;
private AtomicReference<Throwable> exception = new AtomicReference<>(null);
- private CloseableHttpClient httpClient = new
HttpUtil().getHttpClientForBatch();
+ private HttpClientBuilder httpClientBuilder = new
HttpUtil().getHttpClientBuilderForBatch();
private BackendUtil backendUtil;
public DorisBatchStreamLoad(
@@ -274,32 +275,35 @@ public class DorisBatchStreamLoad implements Serializable
{
int retry = 0;
while (retry <= executionOptions.getMaxRetries()) {
LOG.info("stream load started for {} on host {}", label,
hostPort);
- try (CloseableHttpResponse response =
httpClient.execute(putBuilder.build())) {
- int statusCode = response.getStatusLine().getStatusCode();
- if (statusCode == 200 && response.getEntity() != null) {
- String loadResult =
EntityUtils.toString(response.getEntity());
- LOG.info("load Result {}", loadResult);
- RespContent respContent =
- OBJECT_MAPPER.readValue(loadResult,
RespContent.class);
- if
(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
- String errMsg =
- String.format(
- "stream load error: %s, see more
in %s",
- respContent.getMessage(),
respContent.getErrorURL());
- throw new DorisBatchLoadException(errMsg);
- } else {
- return;
+ try (CloseableHttpClient httpClient =
httpClientBuilder.build()) {
+ try (CloseableHttpResponse response =
httpClient.execute(putBuilder.build())) {
+ int statusCode =
response.getStatusLine().getStatusCode();
+ if (statusCode == 200 && response.getEntity() != null)
{
+ String loadResult =
EntityUtils.toString(response.getEntity());
+ LOG.info("load Result {}", loadResult);
+ RespContent respContent =
+ OBJECT_MAPPER.readValue(loadResult,
RespContent.class);
+ if
(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+ String errMsg =
+ String.format(
+ "stream load error: %s, see
more in %s",
+ respContent.getMessage(),
+ respContent.getErrorURL());
+ throw new DorisBatchLoadException(errMsg);
+ } else {
+ return;
+ }
}
+ LOG.error(
+ "stream load failed with {}, reason {}, to
retry",
+ hostPort,
+ response.getStatusLine().toString());
+ } catch (Exception ex) {
+ if (retry == executionOptions.getMaxRetries()) {
+ throw new DorisBatchLoadException("stream load
error: ", ex);
+ }
+ LOG.error("stream load error with {}, to retry, cause
by", hostPort, ex);
}
- LOG.error(
- "stream load failed with {}, reason {}, to retry",
- hostPort,
- response.getStatusLine().toString());
- } catch (Exception ex) {
- if (retry == executionOptions.getMaxRetries()) {
- throw new DorisBatchLoadException("stream load error:
", ex);
- }
- LOG.error("stream load error with {}, to retry, cause by",
hostPort, ex);
}
retry++;
// get available backend retry
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
index 97e217d..c192d76 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
@@ -35,6 +35,7 @@ import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,7 +85,7 @@ public class BatchStageLoad implements Serializable {
private final AtomicBoolean started;
private volatile boolean loadThreadAlive = false;
private AtomicReference<Throwable> exception = new AtomicReference<>(null);
- private CloseableHttpClient httpClient = new
HttpUtil().getHttpClientWithTimeout();
+ private HttpClientBuilder httpClientBuilder = new
HttpUtil().getHttpClientBuilderForCopyBatch();
public BatchStageLoad(
DorisOptions dorisOptions,
@@ -290,33 +291,38 @@ public class BatchStageLoad implements Serializable {
BackoffAndRetryUtils.backoffAndRetry(
BackoffAndRetryUtils.LoadOperation.UPLOAD_FILE,
() -> {
- try (CloseableHttpResponse response =
- httpClient.execute(httpPut)) {
- final int statusCode =
-
response.getStatusLine().getStatusCode();
- String requestId =
getRequestId(response.getAllHeaders());
- if (statusCode == 200 &&
response.getEntity() != null) {
- String loadResult =
-
EntityUtils.toString(response.getEntity());
- if (loadResult == null ||
loadResult.isEmpty()) {
- // upload finished
- return requestId;
+ try (CloseableHttpClient httpClient =
+ httpClientBuilder.build()) {
+ try (CloseableHttpResponse response =
+ httpClient.execute(httpPut)) {
+ final int statusCode =
+
response.getStatusLine().getStatusCode();
+ String requestId =
+
getRequestId(response.getAllHeaders());
+ if (statusCode == 200 &&
response.getEntity() != null) {
+ String loadResult =
+
EntityUtils.toString(response.getEntity());
+ if (loadResult == null ||
loadResult.isEmpty()) {
+ // upload finished
+ return requestId;
+ }
+ LOG.error(
+ "upload file failed,
requestId is {}, response result: {}",
+ requestId,
+ loadResult);
+ throw new CopyLoadException(
+ "upload file failed: "
+ +
response.getStatusLine()
+
.toString()
+ + ", with
requestId "
+ + requestId);
}
- LOG.error(
- "upload file failed,
requestId is {}, response result: {}",
- requestId,
- loadResult);
throw new CopyLoadException(
- "upload file failed: "
+ "upload file error: "
+
response.getStatusLine().toString()
+ ", with
requestId "
+ requestId);
}
- throw new CopyLoadException(
- "upload file error: "
- +
response.getStatusLine().toString()
- + ", with requestId "
- + requestId);
}
});
return String.valueOf(result);
@@ -361,27 +367,33 @@ public class BatchStageLoad implements Serializable {
BackoffAndRetryUtils.backoffAndRetry(
BackoffAndRetryUtils.LoadOperation.GET_INTERNAL_STAGE_ADDRESS,
() -> {
- try (CloseableHttpResponse execute =
-
httpClient.execute(putBuilder.build())) {
- int statusCode =
execute.getStatusLine().getStatusCode();
- String reason =
execute.getStatusLine().getReasonPhrase();
- if (statusCode == 307) {
- Header location =
execute.getFirstHeader("location");
- String uploadAddress =
location.getValue();
- return uploadAddress;
- } else {
- HttpEntity entity =
execute.getEntity();
- String result =
- entity == null
- ? null
- :
EntityUtils.toString(entity);
- LOG.error(
- "Failed to get
internalStage address, status {}, reason {}, response {}",
- statusCode,
- reason,
- result);
- throw new CopyLoadException(
- "Failed get internalStage
address");
+ try (CloseableHttpClient httpClient =
+ httpClientBuilder.build()) {
+ try (CloseableHttpResponse execute =
+
httpClient.execute(putBuilder.build())) {
+ int statusCode =
+
execute.getStatusLine().getStatusCode();
+ String reason =
+
execute.getStatusLine().getReasonPhrase();
+ if (statusCode == 307) {
+ Header location =
+
execute.getFirstHeader("location");
+ String uploadAddress =
location.getValue();
+ return uploadAddress;
+ } else {
+ HttpEntity entity =
execute.getEntity();
+ String result =
+ entity == null
+ ? null
+ :
EntityUtils.toString(entity);
+ LOG.error(
+ "Failed to get
internalStage address, status {}, reason {}, response {}",
+ statusCode,
+ reason,
+ result);
+ throw new CopyLoadException(
+ "Failed get
internalStage address");
+ }
}
}
});
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java
index 16be357..095c680 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java
@@ -31,6 +31,7 @@ import org.apache.doris.flink.sink.copy.models.CopyIntoResp;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,18 +48,20 @@ public class DorisCopyCommitter implements
Committer<DorisCopyCommittable>, Clos
private static final int SUCCESS = 0;
private static final String FAIL = "1";
private ObjectMapper objectMapper = new ObjectMapper();
- private final CloseableHttpClient httpClient;
private final DorisOptions dorisOptions;
+ private HttpClientBuilder httpClientBuilder = new
HttpUtil().getHttpClientBuilderForCopyBatch();
int maxRetry;
public DorisCopyCommitter(DorisOptions dorisOptions, int maxRetry) {
- this(dorisOptions, maxRetry, new
HttpUtil().getHttpClientWithTimeout());
+ this.dorisOptions = dorisOptions;
+ this.maxRetry = maxRetry;
}
- public DorisCopyCommitter(DorisOptions dorisOptions, int maxRetry,
CloseableHttpClient client) {
+ public DorisCopyCommitter(
+ DorisOptions dorisOptions, int maxRetry, HttpClientBuilder
httpClientBuilder) {
this.dorisOptions = dorisOptions;
this.maxRetry = maxRetry;
- this.httpClient = client;
+ this.httpClientBuilder = httpClientBuilder;
}
@Override
@@ -88,31 +91,32 @@ public class DorisCopyCommitter implements
Committer<DorisCopyCommittable>, Clos
.setUrl(String.format(commitPattern, hostPort))
.baseAuth(dorisOptions.getUsername(),
dorisOptions.getPassword())
.setEntity(new
StringEntity(objectMapper.writeValueAsString(params)));
-
- try (CloseableHttpResponse response =
httpClient.execute(postBuilder.build())) {
- statusCode = response.getStatusLine().getStatusCode();
- reasonPhrase = response.getStatusLine().getReasonPhrase();
- if (statusCode != 200) {
- LOG.warn(
- "commit failed with status {} {}, reason {}",
- statusCode,
- hostPort,
- reasonPhrase);
- } else if (response.getEntity() != null) {
- loadResult = EntityUtils.toString(response.getEntity());
- success = handleCommitResponse(loadResult);
- if (success) {
- LOG.info(
- "commit success cost {}ms, response is {}",
- System.currentTimeMillis() - start,
- loadResult);
- break;
- } else {
- LOG.warn("commit failed, retry again");
+ try (CloseableHttpClient httpClient = httpClientBuilder.build()) {
+ try (CloseableHttpResponse response =
httpClient.execute(postBuilder.build())) {
+ statusCode = response.getStatusLine().getStatusCode();
+ reasonPhrase = response.getStatusLine().getReasonPhrase();
+ if (statusCode != 200) {
+ LOG.warn(
+ "commit failed with status {} {}, reason {}",
+ statusCode,
+ hostPort,
+ reasonPhrase);
+ } else if (response.getEntity() != null) {
+ loadResult =
EntityUtils.toString(response.getEntity());
+ success = handleCommitResponse(loadResult);
+ if (success) {
+ LOG.info(
+ "commit success cost {}ms, response is {}",
+ System.currentTimeMillis() - start,
+ loadResult);
+ break;
+ } else {
+ LOG.warn("commit failed, retry again");
+ }
}
+ } catch (IOException e) {
+ LOG.error("commit error : ", e);
}
- } catch (IOException e) {
- LOG.error("commit error : ", e);
}
}
@@ -158,9 +162,5 @@ public class DorisCopyCommitter implements
Committer<DorisCopyCommittable>, Clos
}
@Override
- public void close() throws IOException {
- if (httpClient != null) {
- httpClient.close();
- }
- }
+ public void close() throws IOException {}
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyCommitter.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyCommitter.java
index 7a7f326..23399ac 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyCommitter.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyCommitter.java
@@ -26,6 +26,7 @@ import org.apache.http.ProtocolVersion;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicStatusLine;
import org.junit.Assert;
import org.junit.Before;
@@ -48,14 +49,16 @@ public class TestDorisCopyCommitter {
public void setUp() throws Exception {
DorisOptions dorisOptions = OptionUtils.buildDorisOptions();
copyCommittable = new DorisCopyCommittable("127.0.0.1:8710", "copy
into sql");
+ HttpClientBuilder httpClientBuilder = mock(HttpClientBuilder.class);
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+ when(httpClientBuilder.build()).thenReturn(httpClient);
entityMock = new HttpEntityMock();
CloseableHttpResponse httpResponse = mock(CloseableHttpResponse.class);
StatusLine normalLine = new BasicStatusLine(new
ProtocolVersion("http", 1, 0), 200, "");
when(httpClient.execute(any())).thenReturn(httpResponse);
when(httpResponse.getStatusLine()).thenReturn(normalLine);
when(httpResponse.getEntity()).thenReturn(entityMock);
- copyCommitter = new DorisCopyCommitter(dorisOptions, 1, httpClient);
+ copyCommitter = new DorisCopyCommitter(dorisOptions, 1,
httpClientBuilder);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]