This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 80587e7  [improvement](spark-connector)(flink-connector) Modify the maximum number of rows written per batch by the Spark/Flink connectors. (#7485)
80587e7 is described below

commit 80587e7ac24c1184dab3f7b66eda11726f410a71
Author: jiafeng.zhang <[email protected]>
AuthorDate: Sun Dec 26 11:13:47 2021 +0800

    [improvement](spark-connector)(flink-connector) Modify the maximum number of rows written per batch by the Spark/Flink connectors. (#7485)
    
    Increase the default batch size and flush interval.
---
 docs/en/extending-doris/flink-doris-connector.md                    | 6 +++---
 docs/en/extending-doris/spark-doris-connector.md                    | 4 ++--
 docs/zh-CN/extending-doris/flink-doris-connector.md                 | 6 +++---
 docs/zh-CN/extending-doris/spark-doris-connector.md                 | 4 ++--
 .../main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java | 4 ++--
 .../main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java  | 6 ++----
 .../main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala | 2 +-
 7 files changed, 15 insertions(+), 17 deletions(-)

diff --git a/docs/en/extending-doris/flink-doris-connector.md b/docs/en/extending-doris/flink-doris-connector.md
index 4a18955..91744ac 100644
--- a/docs/en/extending-doris/flink-doris-connector.md
+++ b/docs/en/extending-doris/flink-doris-connector.md
@@ -294,9 +294,9 @@ outputFormat.close();
 | doris.deserialize.queue.size     | 64                | Internal processing queue for asynchronous Arrow-format conversion; takes effect when doris.deserialize.arrow.async is true        |
 | doris.read.field            | --            | List of column names in the Doris table, separated by commas                  |
 | doris.filter.query          | --            | Filter expression for the query, passed through to Doris. Doris uses this expression to complete source-side data filtering. |
-| sink.batch.size                        | 100            | Maximum number of rows written to BE in a single batch                                             |
-| sink.max-retries                        | 1            | Number of retries after a failed write to BE                                              |
-| sink.batch.interval                         | 1s            | The flush interval, after which the asynchronous thread writes the cached data to BE. The default value is 1 second; supported time units are ms, s, min, h, and d. Set to 0 to disable periodic writing. |
+| sink.batch.size                        | 10000          | Maximum number of rows written to BE in a single batch                                             |
+| sink.max-retries                        | 1          | Number of retries after a failed write to BE                                              |
+| sink.batch.interval                         | 10s           | The flush interval, after which the asynchronous thread writes the cached data to BE. The default value is 10 seconds; supported time units are ms, s, min, h, and d. Set to 0 to disable periodic writing. |
 | sink.properties.*     | --               | The stream load parameters.<br /> <br /> e.g.:<br /> 'sink.properties.column_separator' = ','<br /> <br /> Set 'sink.properties.escape_delimiters' = 'true' if you want to use a control character as a separator, so that e.g. '\\x01' will be translated to the binary 0x01.<br /><br /> To import in JSON format, enable both 'sink.properties.format' = 'json' and 'sink.properties.strip_outer_array' = 'true'.|
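
For reference, a minimal sketch (not part of this commit) of a Flink SQL sink declared with the options above. The table schema, FE address, credentials, and table identifier are placeholders, and the sink.* values simply spell out the new defaults:

    // Scala sketch, assuming the connector's Flink SQL (DDL) usage.
    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    object DorisSinkOptionsSketch {
      def main(args: Array[String]): Unit = {
        val tEnv = TableEnvironment.create(
          EnvironmentSettings.newInstance().inStreamingMode().build())
        tEnv.executeSql(
          """CREATE TABLE doris_sink (        -- placeholder schema
            |  id   INT,
            |  name STRING
            |) WITH (
            |  'connector' = 'doris',
            |  'fenodes' = 'FE_HOST:8030',    -- placeholder FE address
            |  'table.identifier' = 'db.tbl', -- placeholder target table
            |  'username' = 'root',
            |  'password' = '',
            |  'sink.batch.size' = '10000',   -- max rows per write to BE
            |  'sink.max-retries' = '1',      -- retries after a failed write
            |  'sink.batch.interval' = '10s'  -- periodic flush; '0' disables it
            |)""".stripMargin)
      }
    }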
 
 
diff --git a/docs/en/extending-doris/spark-doris-connector.md b/docs/en/extending-doris/spark-doris-connector.md
index 57a412d..c875882 100644
--- a/docs/en/extending-doris/spark-doris-connector.md
+++ b/docs/en/extending-doris/spark-doris-connector.md
@@ -182,8 +182,8 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
 | doris.deserialize.arrow.async    | false             | Whether to support asynchronous conversion of the Arrow format to the RowBatch required for spark-doris-connector iteration                 |
 | doris.deserialize.queue.size     | 64                | Internal processing queue for asynchronous Arrow-format conversion; takes effect when doris.deserialize.arrow.async is true        |
 | doris.write.fields                | --                 | Specifies the fields (or the order of the fields) to write to the Doris table, fields separated by commas.<br/>By default, all fields are written in the order of the Doris table fields. |
-| sink.batch.size | 1024 | Maximum number of rows written to BE in a single batch |
-| sink.max-retries | 3 | Number of retries after a failed write to BE |
+| sink.batch.size | 10000 | Maximum number of rows written to BE in a single batch |
+| sink.max-retries | 1 | Number of retries after a failed write to BE |
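
Correspondingly, a minimal Spark sketch (not part of this commit) writing a DataFrame through the connector with these two options set explicitly; host, table, and credentials are placeholders:

    import org.apache.spark.sql.SparkSession

    object DorisSparkSinkSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("doris-sink-sketch").getOrCreate()
        val df = spark.range(100).selectExpr("id", "cast(id as string) as name")
        // format("doris") and the doris.* connection options follow the
        // connector usage documented earlier; the values are placeholders,
        // and the two sink.* settings are the new defaults spelled out.
        df.write
          .format("doris")
          .option("doris.table.identifier", "db.tbl")
          .option("doris.fenodes", "FE_HOST:8030")
          .option("user", "root")
          .option("password", "")
          .option("sink.batch.size", "10000") // max rows per write to BE
          .option("sink.max-retries", "1")    // retries after a failed write
          .save()
        spark.stop()
      }
    }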
 
 ### SQL & Dataframe Configuration
 
diff --git a/docs/zh-CN/extending-doris/flink-doris-connector.md b/docs/zh-CN/extending-doris/flink-doris-connector.md
index 96d73c5..d5a2123 100644
--- a/docs/zh-CN/extending-doris/flink-doris-connector.md
+++ b/docs/zh-CN/extending-doris/flink-doris-connector.md
@@ -296,9 +296,9 @@ outputFormat.close();
 | doris.deserialize.queue.size     | 64                | Internal processing queue for asynchronous Arrow-format conversion; takes effect when doris.deserialize.arrow.async is true        |
 | doris.read.field            | --            | List of column names to read from the Doris table, separated by commas                  |
 | doris.filter.query          | --            | Filter expression for the data being read, passed through to Doris. Doris uses this expression to complete source-side data filtering. |
-| sink.batch.size     | 100                | Maximum number of rows written to BE in a single batch        |
-| sink.max-retries     | 1                | Number of retries after a failed write to BE       |
-| sink.batch.interval     | 1s                | The flush interval, after which the asynchronous thread writes the cached data to BE. The default value is 1 second; supported time units are ms, s, min, h, and d. Set to 0 to disable periodic writing.|
+| sink.batch.size     | 10000              | Maximum number of rows written to BE in a single batch        |
+| sink.max-retries     | 1              | Number of retries after a failed write to BE       |
+| sink.batch.interval     | 10s               | The flush interval, after which the asynchronous thread writes the cached data to BE. The default value is 10 seconds; supported time units are ms, s, min, h, and d. Set to 0 to disable periodic writing. |
 | sink.properties.*     | --               | Stream load import parameters<br /><br />For example:<br />'sink.properties.column_separator' = ', '<br />defines the column separator<br /><br />'sink.properties.escape_delimiters' = 'true'<br />allows a control character as a separator; '\\x01' will be converted to the binary 0x01<br /><br /> 'sink.properties.format' = 'json'<br />'sink.properties.strip_outer_array' = 'true' <br />enables JSON-format import|
 
 ## Mapping between Doris and Flink column types
diff --git a/docs/zh-CN/extending-doris/spark-doris-connector.md b/docs/zh-CN/extending-doris/spark-doris-connector.md
index 0c1dd93..cdb6ed8 100644
--- a/docs/zh-CN/extending-doris/spark-doris-connector.md
+++ b/docs/zh-CN/extending-doris/spark-doris-connector.md
@@ -186,8 +186,8 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
 | doris.deserialize.arrow.async    | false             | Whether to support asynchronous conversion of the Arrow format to the RowBatch required for spark-doris-connector iteration                 |
 | doris.deserialize.queue.size     | 64                | Internal processing queue for asynchronous Arrow-format conversion; takes effect when doris.deserialize.arrow.async is true        |
 | doris.write.fields               | --                 | Specifies the fields (or the order of the fields) to write to the Doris table, separated by commas.<br />By default, all fields are written in the order of the Doris table fields. |
-| sink.batch.size | 1024 | Maximum number of rows written to BE in a single batch |
-| sink.max-retries | 3 | Number of retries after a failed write to BE |
+| sink.batch.size | 10000 | Maximum number of rows written to BE in a single batch |
+| sink.max-retries | 1 | Number of retries after a failed write to BE |
 
 ### SQL & Dataframe-specific Configuration
 
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 587ab07..47bb517 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -28,8 +28,8 @@ import java.util.Properties;
 public class DorisExecutionOptions implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    public static final Integer DEFAULT_BATCH_SIZE = 1000;
-    public static final Integer DEFAULT_MAX_RETRY_TIMES = 3;
+    public static final Integer DEFAULT_BATCH_SIZE = 10000;
+    public static final Integer DEFAULT_MAX_RETRY_TIMES = 1;
     private static final Long DEFAULT_INTERVAL_MILLIS = 10000L;
 
     private final Integer batchSize;
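
Note that DEFAULT_INTERVAL_MILLIS (10000L) in the unchanged context above was already 10 seconds, so the documentation change from 1s to 10s aligns the documented sink.batch.interval default with the value the code has been using.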
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 7ba46b2..9a0cead 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -66,11 +66,9 @@ public interface ConfigurationOptions {
 
     String DORIS_WRITE_FIELDS = "doris.write.fields";
 
-    String SINK_BATCH_SIZE = "sink.batch.size";
     String DORIS_SINK_BATCH_SIZE = "doris.sink.batch.size";
-    int SINK_BATCH_SIZE_DEFAULT = 1024;
+    int SINK_BATCH_SIZE_DEFAULT = 10000;
 
-    String SINK_MAX_RETRIES = "sink.max-retries";
     String DORIS_SINK_MAX_RETRIES = "doris.sink.max-retries";
-    int SINK_MAX_RETRIES_DEFAULT = 3;
+    int SINK_MAX_RETRIES_DEFAULT = 1;
 }
diff --git a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index b53a23a..edd08f1 100644
--- a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -72,7 +72,7 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
         val loop = new Breaks
         loop.breakable {
 
-          for (i <- 1 to maxRetryTimes) {
+          for (i <- 0 to maxRetryTimes) {
             try {
               dorisStreamLoader.load(rowsBuffer)
               rowsBuffer.clear()
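
Starting the loop at 0 makes the first iteration the initial load attempt, so maxRetryTimes now counts actual retries: with the new default of 1, each batch is tried at most twice. A simplified sketch of the resulting semantics (the real sink also handles backoff and error propagation; the load function here is a stand-in for dorisStreamLoader.load):

    import scala.util.control.Breaks

    object RetrySemanticsSketch {
      // Sketch of the retry semantics after this change, not the
      // connector's exact code.
      def loadWithRetries[T](rows: java.util.List[T],
                             maxRetryTimes: Int,
                             load: java.util.List[T] => Unit): Unit = {
        val loop = new Breaks
        loop.breakable {
          // i = 0 is the initial attempt; i = 1..maxRetryTimes are retries.
          for (i <- 0 to maxRetryTimes) {
            try {
              load(rows)
              rows.clear()
              loop.break() // success: stop retrying
            } catch {
              case e: Exception if i < maxRetryTimes =>
                Thread.sleep(1000L * (i + 1)) // simple backoff (assumption)
              case e: Exception => throw e    // retries exhausted
            }
          }
        }
      }
    }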

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
