This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch branch-for-flink-before-1.13
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git


The following commit(s) were added to refs/heads/branch-for-flink-before-1.13 by this push:
     new 786233f  [improvement] (before 1.13)Support set max bytes in each batch to avoid congestion (#18)
786233f is described below

commit 786233ff7c21d83554530976b8481d12dce3219f
Author: Jiangqiao Xu <[email protected]>
AuthorDate: Wed Mar 23 11:18:03 2022 +0800

    [improvement] (before 1.13)Support set max bytes in each batch to avoid congestion (#18)
    
    * [improvement] Support set max bytes in each batch to avoid congestion
    
    Co-authored-by: xujiangqiao <[email protected]>
---
 .../doris/flink/cfg/DorisExecutionOptions.java       | 20 +++++++++++++++++---
 .../doris/flink/table/DorisDynamicOutputFormat.java  | 16 +++++++++++++---
 .../doris/flink/table/DorisDynamicTableFactory.java  | 10 +++++++++-
 .../apache/doris/flink/DorisSourceSinkExample.java   |  3 ++-
 4 files changed, 41 insertions(+), 8 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index ad1ab07..6d3a4ea 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -31,10 +31,12 @@ public class DorisExecutionOptions implements Serializable {
     public static final Integer DEFAULT_BATCH_SIZE = 10000;
     public static final Integer DEFAULT_MAX_RETRY_TIMES = 1;
     private static final Long DEFAULT_INTERVAL_MILLIS = 10000L;
+    public static final Long DEFAULT_MAX_BATCH_BYTES = 1024 * 1024 * 10L;
 
     private final Integer batchSize;
     private final Integer maxRetries;
     private final Long batchIntervalMs;
+    private final Long maxBatchBytes;
 
     /**
      * Properties for the StreamLoad.
@@ -44,13 +46,15 @@ public class DorisExecutionOptions implements Serializable {
     private final Boolean enableDelete;
 
 
-    public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long 
batchIntervalMs, Properties streamLoadProp, Boolean enableDelete) {
+    public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long 
batchIntervalMs, Properties streamLoadProp, Boolean enableDelete, Long 
maxBatchBytes) {
         Preconditions.checkArgument(maxRetries >= 0);
+        Preconditions.checkArgument(maxBatchBytes >= 0);
         this.batchSize = batchSize;
         this.maxRetries = maxRetries;
         this.batchIntervalMs = batchIntervalMs;
         this.streamLoadProp = streamLoadProp;
         this.enableDelete = enableDelete;
+        this.maxBatchBytes = maxBatchBytes;
     }
 
     public static Builder builder() {
@@ -84,6 +88,10 @@ public class DorisExecutionOptions implements Serializable {
         return enableDelete;
     }
 
+    public Long getMaxBatchBytes() {
+        return maxBatchBytes;
+    }
+
     /**
      * Builder of {@link DorisExecutionOptions}.
      */
@@ -93,6 +101,7 @@ public class DorisExecutionOptions implements Serializable {
         private Long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
         private Properties streamLoadProp = new Properties();
         private Boolean enableDelete = false;
+        private Long maxBatchBytes = DEFAULT_MAX_BATCH_BYTES;
 
         public Builder setBatchSize(Integer batchSize) {
             this.batchSize = batchSize;
@@ -119,10 +128,15 @@ public class DorisExecutionOptions implements 
Serializable {
             return this;
         }
 
+        public Builder setMaxBatchBytes(Long maxBatchBytes) {
+            this.maxBatchBytes = maxBatchBytes;
+            return this;
+        }
+
         public DorisExecutionOptions build() {
-            return new DorisExecutionOptions(batchSize, maxRetries, 
batchIntervalMs, streamLoadProp, enableDelete);
+            return new DorisExecutionOptions(batchSize, maxRetries, 
batchIntervalMs, streamLoadProp, enableDelete, maxBatchBytes);
         }
     }
 
 
-}
+}
\ No newline at end of file
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 44e0a6a..cd8dee0 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -26,15 +26,16 @@ import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.rest.models.Schema;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -77,6 +78,7 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
     private final boolean jsonFormat;
     private final RowData.FieldGetter[] fieldGetters;
     private final List batch = new ArrayList<>();
+    private long batchBytes = 0L;
     private String fieldDelimiter;
     private String lineDelimiter;
     private DorisOptions options;
@@ -219,7 +221,8 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
     public synchronized void writeRecord(T row) throws IOException {
         checkFlushException();
         addBatch(row);
-        if (executionOptions.getBatchSize() > 0 && batch.size() >= 
executionOptions.getBatchSize()) {
+        if ((executionOptions.getBatchSize() > 0 && batch.size() >= 
executionOptions.getBatchSize())
+                || batchBytes >= executionOptions.getMaxBatchBytes()) {
             flush();
         }
     }
@@ -234,9 +237,14 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
                 if (jsonFormat) {
                     String data = field != null ? field.toString() : null;
                     valueMap.put(this.fieldNames[i], data);
+                    batchBytes += 
this.fieldNames[i].getBytes(StandardCharsets.UTF_8).length;
+                    if (data != null) {
+                        batchBytes += 
data.getBytes(StandardCharsets.UTF_8).length;
+                    }
                 } else {
                     String data = field != null ? field.toString() : 
NULL_VALUE;
                     value.add(data);
+                    batchBytes += data.getBytes(StandardCharsets.UTF_8).length;
                 }
             }
             // add doris delete sign
@@ -250,6 +258,7 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
             Object data = jsonFormat ? valueMap : value.toString();
             batch.add(data);
         } else if (row instanceof String) {
+            batchBytes += ((String) 
row).getBytes(StandardCharsets.UTF_8).length;
             batch.add(row);
         } else {
             throw new RuntimeException("The type of element should be 
'RowData' or 'String' only.");
@@ -308,6 +317,7 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
             try {
                 dorisStreamLoad.load(result);
                 batch.clear();
+                batchBytes = 0;
                 break;
             } catch (StreamLoadException e) {
                 LOG.error("doris sink error, retry times = {}", i, e);
@@ -402,4 +412,4 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
 
 
     }
-}
+}
\ No newline at end of file
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index dbba859..7033dbd 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -141,6 +141,12 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
             .booleanType()
             .defaultValue(true)
             .withDescription("whether to enable the delete function");
+    private static final ConfigOption<Long> SINK_BUFFER_FLUSH_MAX_BYTES = 
ConfigOptions
+            .key("sink.batch.bytes")
+            .longType()
+            .defaultValue(DorisExecutionOptions.DEFAULT_MAX_BATCH_BYTES)
+            .withDescription("the flush max bytes (includes all append, upsert 
and delete records), over this number" +
+                    " in batch, will flush data. The default value is 10MB.");
 
     @Override
     public String factoryIdentifier() {
@@ -179,6 +185,7 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         options.add(SINK_MAX_RETRIES);
         options.add(SINK_BUFFER_FLUSH_INTERVAL);
         options.add(SINK_ENABLE_DELETE);
+        options.add(SINK_BUFFER_FLUSH_MAX_BYTES);
         return options;
     }
 
@@ -235,6 +242,7 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         
builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
         builder.setStreamLoadProp(streamLoadProp);
         builder.setEnableDelete(readableConfig.get(SINK_ENABLE_DELETE));
+        
builder.setMaxBatchBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
         return builder.build();
     }
 
@@ -267,4 +275,4 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
                 physicalSchema
         );
     }
-}
+}
\ No newline at end of file
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
index c4ce1a5..3b19c22 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
@@ -56,9 +56,10 @@ public class DorisSourceSinkExample {
                         "  'username' = 'root',\n" +
                         "  'password' = '',\n" +
                         "  'sink.batch.size' = '3',\n" +
+                        "  'sink.batch.bytes' = '1',\n" +
                         "  'sink.max-retries' = '2'\n" +
                         ")");
 
         tEnv.executeSql("INSERT INTO doris_test_sink select 
name,age,price,sale from doris_test");
     }
-}
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to