This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 934b081 commitTransaction method improvement (#121)
934b081 is described below
commit 934b08172f388887c80161d3ca5a355611903ebd
Author: benjobs <[email protected]>
AuthorDate: Fri Mar 10 14:58:44 2023 +0800
commitTransaction method improvement (#121)
* commitTransaction method improvement; fixed a bug where no exception
was thrown when the HTTP request failed; minor optimizations
Co-authored-by: benjobs <[email protected]>
---
.../doris/flink/sink/committer/DorisCommitter.java | 83 ++++++++++++----------
1 file changed, 44 insertions(+), 39 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index 316f92e..acbd310 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -29,7 +29,9 @@ import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.ResponseUtil;
+import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
@@ -52,6 +54,8 @@ public class DorisCommitter implements
Committer<DorisCommittable> {
private final CloseableHttpClient httpClient;
private final DorisOptions dorisOptions;
private final DorisReadOptions dorisReadOptions;
+ private final ObjectMapper jsonMapper = new ObjectMapper();
+
int maxRetry;
public DorisCommitter(DorisOptions dorisOptions, DorisReadOptions
dorisReadOptions, int maxRetry) {
@@ -66,7 +70,7 @@ public class DorisCommitter implements
Committer<DorisCommittable> {
}
@Override
- public List<DorisCommittable> commit(List<DorisCommittable>
committableList) throws IOException, InterruptedException {
+ public List<DorisCommittable> commit(List<DorisCommittable>
committableList) throws IOException {
for (DorisCommittable committable : committableList) {
commitTransaction(committable);
}
@@ -74,49 +78,50 @@ public class DorisCommitter implements
Committer<DorisCommittable> {
}
private void commitTransaction(DorisCommittable committable) throws
IOException {
- int statusCode = -1;
- String reasonPhrase = null;
- int retry = 0;
+ //basic params
+ HttpPutBuilder builder = new HttpPutBuilder()
+ .addCommonHeader()
+ .baseAuth(dorisOptions.getUsername(),
dorisOptions.getPassword())
+ .addTxnId(committable.getTxnID())
+ .commit();
+
+ //hostPort
String hostPort = committable.getHostPort();
- CloseableHttpResponse response = null;
+
+ int retry = 0;
while (retry++ <= maxRetry) {
- HttpPutBuilder putBuilder = new HttpPutBuilder();
- putBuilder.setUrl(String.format(commitPattern, hostPort,
committable.getDb()))
- .baseAuth(dorisOptions.getUsername(),
dorisOptions.getPassword())
- .addCommonHeader()
- .addTxnId(committable.getTxnID())
- .setEmptyEntity()
- .commit();
- try {
- response = httpClient.execute(putBuilder.build());
+ //get latest-url
+ String url = String.format(commitPattern, hostPort,
committable.getDb());
+ HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build();
+
+ // http execute...
+ try (CloseableHttpResponse response = httpClient.execute(httpPut))
{
+ StatusLine statusLine = response.getStatusLine();
+ if (200 == statusLine.getStatusCode()) {
+ if (response.getEntity() != null) {
+ String loadResult =
EntityUtils.toString(response.getEntity());
+ Map<String, String> res =
jsonMapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {
+ });
+ if (res.get("status").equals(FAIL) &&
!ResponseUtil.isCommitted(res.get("msg"))) {
+ throw new DorisRuntimeException("Commit failed " +
loadResult);
+ } else {
+ LOG.info("load result {}", loadResult);
+ }
+ }
+ return;
+ }
+ String reasonPhrase = statusLine.getReasonPhrase();
+ LOG.warn("commit failed with {}, reason {}", hostPort,
reasonPhrase);
+ if (retry == maxRetry) {
+ throw new DorisRuntimeException("stream load error: " +
reasonPhrase);
+ }
+ hostPort = RestService.getBackend(dorisOptions,
dorisReadOptions, LOG);
} catch (IOException e) {
LOG.error("commit transaction failed: ", e);
+ if (retry == maxRetry) {
+ throw new IOException("commit transaction failed: {}", e);
+ }
hostPort = RestService.getBackend(dorisOptions,
dorisReadOptions, LOG);
- continue;
- }
- statusCode = response.getStatusLine().getStatusCode();
- reasonPhrase = response.getStatusLine().getReasonPhrase();
- if (statusCode != 200) {
- LOG.warn("commit failed with {}, reason {}", hostPort,
reasonPhrase);
- hostPort = RestService.getBackend(dorisOptions,
dorisReadOptions, LOG);
- } else {
- break;
- }
- }
-
- if (statusCode != 200) {
- throw new DorisRuntimeException("stream load error: " +
reasonPhrase);
- }
-
- ObjectMapper mapper = new ObjectMapper();
- if (response.getEntity() != null) {
- String loadResult = EntityUtils.toString(response.getEntity());
- Map<String, String> res = mapper.readValue(loadResult, new
TypeReference<HashMap<String, String>>() {
- });
- if (res.get("status").equals(FAIL) &&
!ResponseUtil.isCommitted(res.get("msg"))) {
- throw new DorisRuntimeException("Commit failed " + loadResult);
- } else {
- LOG.info("load result {}", loadResult);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]