This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new acf6e45 reduce streamload connection (#133)
acf6e45 is described below
commit acf6e4582d8e651eff6174d05aa56f7681434645
Author: wudi <[email protected]>
AuthorDate: Wed Apr 26 13:51:38 2023 +0800
reduce streamload connection (#133)
Co-authored-by: wudi <>
---
.../org/apache/doris/flink/sink/writer/DorisWriter.java | 13 +++++++++----
.../org/apache/doris/flink/sink/writer/LabelGenerator.java | 4 +++-
2 files changed, 12 insertions(+), 5 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index ac8fdaf..8ba2050 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -123,7 +123,6 @@ public class DorisWriter<IN> implements SinkWriter<IN,
DorisCommittable, DorisWr
// get main work thread.
executorThread = Thread.currentThread();
this.currentLabel = labelGenerator.generateLabel(lastCheckpointId + 1);
- dorisStreamLoad.startLoad(currentLabel);
// when uploading data in streaming mode, we need to regularly detect
whether there are exceptions.
scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200,
intervalTime, TimeUnit.MILLISECONDS);
}
@@ -131,6 +130,11 @@ public class DorisWriter<IN> implements SinkWriter<IN,
DorisCommittable, DorisWr
@Override
public void write(IN in, Context context) throws IOException {
checkLoadException();
+ if(!loading) {
+ //Start streamload only when there has data
+ dorisStreamLoad.startLoad(currentLabel);
+ loading = true;
+ }
byte[] serialize = serializer.serialize(in);
if(Objects.isNull(serialize)){
return;
@@ -140,6 +144,10 @@ public class DorisWriter<IN> implements SinkWriter<IN,
DorisCommittable, DorisWr
@Override
public List<DorisCommittable> prepareCommit(boolean flush) throws
IOException {
+ if(!loading){
+ //There is no data during the entire checkpoint period
+ return Collections.emptyList();
+ }
// disable exception checker before stop load.
loading = false;
Preconditions.checkState(dorisStreamLoad != null);
@@ -152,7 +160,6 @@ public class DorisWriter<IN> implements SinkWriter<IN,
DorisCommittable, DorisWr
return Collections.emptyList();
}
long txnId = respContent.getTxnId();
-
return ImmutableList.of(new
DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(),
txnId));
}
@@ -162,8 +169,6 @@ public class DorisWriter<IN> implements SinkWriter<IN,
DorisCommittable, DorisWr
// dynamic refresh BE node
this.dorisStreamLoad.setHostPort(getAvailableBackend());
this.currentLabel = labelGenerator.generateLabel(checkpointId + 1);
- this.dorisStreamLoad.startLoad(currentLabel);
- this.loading = true;
return Collections.singletonList(dorisWriterState);
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index 436d709..d31e777 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -16,6 +16,8 @@
// under the License.
package org.apache.doris.flink.sink.writer;
+import java.util.UUID;
+
/**
* Generator label for stream load.
*/
@@ -29,6 +31,6 @@ public class LabelGenerator {
}
public String generateLabel(long chkId) {
- return enable2PC ? labelPrefix + "_" + chkId : labelPrefix + "_" +
System.currentTimeMillis();
+ return enable2PC ? labelPrefix + "_" + chkId : labelPrefix + "_" +
UUID.randomUUID();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]