This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 43a055a9 [improve] support group commit (#412)
43a055a9 is described below
commit 43a055a9e6b4c728912725976eedbecdfb8b270c
Author: wudi <[email protected]>
AuthorDate: Mon Jul 1 10:31:15 2024 +0800
[improve] support group commit (#412)
---
.../java/org/apache/doris/flink/sink/HttpPutBuilder.java | 4 +++-
.../apache/doris/flink/sink/batch/DorisBatchStreamLoad.java | 6 ++++++
.../org/apache/doris/flink/sink/writer/DorisStreamLoad.java | 12 +++++++++++-
.../org/apache/doris/flink/sink/writer/LoadConstants.java | 1 +
.../apache/doris/flink/sink/copy/TestDorisCopyWriter.java | 3 +++
5 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
index 023cd31a..44f6c9fe 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
@@ -111,7 +111,9 @@ public class HttpPutBuilder {
}
public HttpPutBuilder setLabel(String label) {
- header.put("label", label);
+ if (label != null) {
+ header.put("label", label);
+ }
return this;
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index d9fba749..fbc6daa0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -67,6 +67,7 @@ import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
+import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
@@ -95,6 +96,7 @@ public class DorisBatchStreamLoad implements Serializable {
private AtomicReference<Throwable> exception = new AtomicReference<>(null);
private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilderForBatch();
private BackendUtil backendUtil;
+ private boolean enableGroupCommit;
public DorisBatchStreamLoad(
DorisOptions dorisOptions,
@@ -120,6 +122,7 @@ public class DorisBatchStreamLoad implements Serializable {
LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT))
.getBytes();
}
+ this.enableGroupCommit = loadProps.containsKey(GROUP_COMMIT);
this.executionOptions = executionOptions;
this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
@@ -260,6 +263,9 @@ public class DorisBatchStreamLoad implements Serializable {
/** execute stream load. */
public void load(String label, BatchRecordBuffer buffer) throws IOException {
+ if (enableGroupCommit) {
+ label = null;
+ }
refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
ByteBuffer data = buffer.getData();
ByteArrayEntity entity =
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 14e44dee..676de3df 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -58,6 +58,7 @@ import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
+import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
@@ -87,6 +88,7 @@ public class DorisStreamLoad implements Serializable {
private final ExecutorService executorService;
private boolean loadBatchFirstRecord;
private volatile String currentLabel;
+ private boolean enableGroupCommit;
public DorisStreamLoad(
String hostPort,
@@ -129,6 +131,7 @@ public class DorisStreamLoad implements Serializable {
LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT))
.getBytes();
}
+ enableGroupCommit = streamLoadProp.containsKey(GROUP_COMMIT);
loadBatchFirstRecord = true;
}
@@ -276,6 +279,9 @@ public class DorisStreamLoad implements Serializable {
* @throws IOException
*/
public void startLoad(String label, boolean isResume) throws IOException {
+ if (enableGroupCommit) {
+ label = null;
+ }
loadBatchFirstRecord = !isResume;
HttpPutBuilder putBuilder = new HttpPutBuilder();
recordStream.startInput(isResume);
@@ -294,10 +300,14 @@ public class DorisStreamLoad implements Serializable {
if (enable2PC) {
putBuilder.enable2PC();
}
+ String finalLabel = label;
pendingLoadFuture =
executorService.submit(
() -> {
- LOG.info("table {} start execute load for label {}", table, label);
+ LOG.info(
+ "table {} start execute load for label {}",
+ table,
+ finalLabel);
return httpClient.execute(putBuilder.build());
});
} catch (Exception e) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
index 2e5d29a5..2a79b736 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
@@ -31,4 +31,5 @@ public class LoadConstants {
public static final String NULL_VALUE = "\\N";
public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
public static final String READ_JSON_BY_LINE = "read_json_by_line";
+ public static final String GROUP_COMMIT = "group_commit";
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java
index e31e9179..c36805ba 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java
@@ -32,7 +32,9 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.FixMethodOrder;
import org.junit.Test;
+import org.junit.runners.MethodSorters;
import java.util.Collection;
import java.util.List;
@@ -41,6 +43,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class TestDorisCopyWriter {
DorisOptions dorisOptions;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]