This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch branch-for-flink-before-1.13
in repository
https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
The following commit(s) were added to refs/heads/branch-for-flink-before-1.13
by this push:
new 786233f [improvement] (before 1.13)Support set max bytes in each
batch to avoid congestion (#18)
786233f is described below
commit 786233ff7c21d83554530976b8481d12dce3219f
Author: Jiangqiao Xu <[email protected]>
AuthorDate: Wed Mar 23 11:18:03 2022 +0800
[improvement] (before 1.13)Support set max bytes in each batch to avoid
congestion (#18)
* [improvement] Support set max bytes in each batch to avoid congestion
Co-authored-by: xujiangqiao <[email protected]>
---
.../doris/flink/cfg/DorisExecutionOptions.java | 20 +++++++++++++++++---
.../doris/flink/table/DorisDynamicOutputFormat.java | 16 +++++++++++++---
.../doris/flink/table/DorisDynamicTableFactory.java | 10 +++++++++-
.../apache/doris/flink/DorisSourceSinkExample.java | 3 ++-
4 files changed, 41 insertions(+), 8 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index ad1ab07..6d3a4ea 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -31,10 +31,12 @@ public class DorisExecutionOptions implements Serializable {
public static final Integer DEFAULT_BATCH_SIZE = 10000;
public static final Integer DEFAULT_MAX_RETRY_TIMES = 1;
private static final Long DEFAULT_INTERVAL_MILLIS = 10000L;
+ public static final Long DEFAULT_MAX_BATCH_BYTES = 1024 * 1024 * 10L;
private final Integer batchSize;
private final Integer maxRetries;
private final Long batchIntervalMs;
+ private final Long maxBatchBytes;
/**
* Properties for the StreamLoad.
@@ -44,13 +46,15 @@ public class DorisExecutionOptions implements Serializable {
private final Boolean enableDelete;
- public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long
batchIntervalMs, Properties streamLoadProp, Boolean enableDelete) {
+ public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long
batchIntervalMs, Properties streamLoadProp, Boolean enableDelete, Long
maxBatchBytes) {
Preconditions.checkArgument(maxRetries >= 0);
+ Preconditions.checkArgument(maxBatchBytes >= 0);
this.batchSize = batchSize;
this.maxRetries = maxRetries;
this.batchIntervalMs = batchIntervalMs;
this.streamLoadProp = streamLoadProp;
this.enableDelete = enableDelete;
+ this.maxBatchBytes = maxBatchBytes;
}
public static Builder builder() {
@@ -84,6 +88,10 @@ public class DorisExecutionOptions implements Serializable {
return enableDelete;
}
+ public Long getMaxBatchBytes() {
+ return maxBatchBytes;
+ }
+
/**
* Builder of {@link DorisExecutionOptions}.
*/
@@ -93,6 +101,7 @@ public class DorisExecutionOptions implements Serializable {
private Long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
private Properties streamLoadProp = new Properties();
private Boolean enableDelete = false;
+ private Long maxBatchBytes = DEFAULT_MAX_BATCH_BYTES;
public Builder setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
@@ -119,10 +128,15 @@ public class DorisExecutionOptions implements
Serializable {
return this;
}
+ public Builder setMaxBatchBytes(Long maxBatchBytes) {
+ this.maxBatchBytes = maxBatchBytes;
+ return this;
+ }
+
public DorisExecutionOptions build() {
- return new DorisExecutionOptions(batchSize, maxRetries,
batchIntervalMs, streamLoadProp, enableDelete);
+ return new DorisExecutionOptions(batchSize, maxRetries,
batchIntervalMs, streamLoadProp, enableDelete, maxBatchBytes);
}
}
-}
+}
\ No newline at end of file
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 44e0a6a..cd8dee0 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -26,15 +26,16 @@ import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -77,6 +78,7 @@ public class DorisDynamicOutputFormat<T> extends
RichOutputFormat<T> {
private final boolean jsonFormat;
private final RowData.FieldGetter[] fieldGetters;
private final List batch = new ArrayList<>();
+ private long batchBytes = 0L;
private String fieldDelimiter;
private String lineDelimiter;
private DorisOptions options;
@@ -219,7 +221,8 @@ public class DorisDynamicOutputFormat<T> extends
RichOutputFormat<T> {
public synchronized void writeRecord(T row) throws IOException {
checkFlushException();
addBatch(row);
- if (executionOptions.getBatchSize() > 0 && batch.size() >=
executionOptions.getBatchSize()) {
+ if ((executionOptions.getBatchSize() > 0 && batch.size() >=
executionOptions.getBatchSize())
+ || batchBytes >= executionOptions.getMaxBatchBytes()) {
flush();
}
}
@@ -234,9 +237,14 @@ public class DorisDynamicOutputFormat<T> extends
RichOutputFormat<T> {
if (jsonFormat) {
String data = field != null ? field.toString() : null;
valueMap.put(this.fieldNames[i], data);
+ batchBytes +=
this.fieldNames[i].getBytes(StandardCharsets.UTF_8).length;
+ if (data != null) {
+ batchBytes +=
data.getBytes(StandardCharsets.UTF_8).length;
+ }
} else {
String data = field != null ? field.toString() :
NULL_VALUE;
value.add(data);
+ batchBytes += data.getBytes(StandardCharsets.UTF_8).length;
}
}
// add doris delete sign
@@ -250,6 +258,7 @@ public class DorisDynamicOutputFormat<T> extends
RichOutputFormat<T> {
Object data = jsonFormat ? valueMap : value.toString();
batch.add(data);
} else if (row instanceof String) {
+ batchBytes += ((String)
row).getBytes(StandardCharsets.UTF_8).length;
batch.add(row);
} else {
throw new RuntimeException("The type of element should be
'RowData' or 'String' only.");
@@ -308,6 +317,7 @@ public class DorisDynamicOutputFormat<T> extends
RichOutputFormat<T> {
try {
dorisStreamLoad.load(result);
batch.clear();
+ batchBytes = 0;
break;
} catch (StreamLoadException e) {
LOG.error("doris sink error, retry times = {}", i, e);
@@ -402,4 +412,4 @@ public class DorisDynamicOutputFormat<T> extends
RichOutputFormat<T> {
}
-}
+}
\ No newline at end of file
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index dbba859..7033dbd 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -141,6 +141,12 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
.booleanType()
.defaultValue(true)
.withDescription("whether to enable the delete function");
+ private static final ConfigOption<Long> SINK_BUFFER_FLUSH_MAX_BYTES =
ConfigOptions
+ .key("sink.batch.bytes")
+ .longType()
+ .defaultValue(DorisExecutionOptions.DEFAULT_MAX_BATCH_BYTES)
+ .withDescription("the flush max bytes (includes all append, upsert
and delete records), over this number" +
+ " in batch, will flush data. The default value is 10MB.");
@Override
public String factoryIdentifier() {
@@ -179,6 +185,7 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
options.add(SINK_MAX_RETRIES);
options.add(SINK_BUFFER_FLUSH_INTERVAL);
options.add(SINK_ENABLE_DELETE);
+ options.add(SINK_BUFFER_FLUSH_MAX_BYTES);
return options;
}
@@ -235,6 +242,7 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
builder.setStreamLoadProp(streamLoadProp);
builder.setEnableDelete(readableConfig.get(SINK_ENABLE_DELETE));
+
builder.setMaxBatchBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
return builder.build();
}
@@ -267,4 +275,4 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
physicalSchema
);
}
-}
+}
\ No newline at end of file
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
index c4ce1a5..3b19c22 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
@@ -56,9 +56,10 @@ public class DorisSourceSinkExample {
" 'username' = 'root',\n" +
" 'password' = '',\n" +
" 'sink.batch.size' = '3',\n" +
+ " 'sink.batch.bytes' = '1',\n" +
" 'sink.max-retries' = '2'\n" +
")");
tEnv.executeSql("INSERT INTO doris_test_sink select
name,age,price,sale from doris_test");
}
-}
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]