This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new e2e7165  [improve] change batch mode param and add default streamload prop  (#251)
e2e7165 is described below

commit e2e71651e1c154cc75c5fd7bbca648da9ad4a813
Author: wudi <[email protected]>
AuthorDate: Thu Nov 30 17:41:14 2023 +0800

    [improve] change batch mode param and add default streamload prop  (#251)
    
    Co-authored-by: wudi <>
---
 .../org/apache/doris/flink/cfg/DorisExecutionOptions.java     | 11 +++++++++--
 .../apache/doris/flink/table/DorisDynamicTableFactory.java    |  5 +----
 .../java/org/apache/doris/flink/tools/cdc/DatabaseSync.java   |  5 +----
 3 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 9a43d77..25e1535 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -119,6 +119,13 @@ public class DorisExecutionOptions implements Serializable {
         return new Builder().setStreamLoadProp(properties).build();
     }
 
+    public static Properties defaultsProperties() {
+        Properties properties = new Properties();
+        properties.setProperty("format", "json");
+        properties.setProperty("read_json_by_line", "true");
+        return properties;
+    }
+
     public Integer checkInterval() {
         return checkInterval;
     }
@@ -269,8 +276,8 @@ public class DorisExecutionOptions implements Serializable {
             return this;
         }
 
-        public Builder enableBatchMode() {
-            this.enableBatchMode = true;
+        public Builder setBatchMode(Boolean enableBatchMode) {
+            this.enableBatchMode = enableBatchMode;
             return this;
         }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index bccb8b7..521d741 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -224,10 +224,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
             builder.enable2PC();
         }
 
-        if(readableConfig.get(SINK_ENABLE_BATCH_MODE)) {
-            builder.enableBatchMode();
-        }
-
+        builder.setBatchMode(readableConfig.get(SINK_ENABLE_BATCH_MODE));
         builder.setFlushQueueSize(readableConfig.get(SINK_FLUSH_QUEUE_SIZE));
         builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
         builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 4dfa6d5..bf26214 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -221,10 +221,7 @@ public abstract class DatabaseSync {
             executionBuilder.enable2PC();
         }
 
-        //batch option
-        if(sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_BATCH_MODE)){
-            executionBuilder.enableBatchMode();
-        }
+        sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_BATCH_MODE).ifPresent(executionBuilder::setBatchMode);
         sinkConfig.getOptional(DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE).ifPresent(executionBuilder::setFlushQueueSize);
         sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_ROWS).ifPresent(executionBuilder::setBufferFlushMaxRows);
         sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES).ifPresent(executionBuilder::setBufferFlushMaxBytes);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to