This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new b42bc6ba [Improve] add check for doris.batch.size (#480)
b42bc6ba is described below

commit b42bc6ba7ed0adfc3fd75f1110da687e25793cb7
Author: wudi <[email protected]>
AuthorDate: Tue Sep 3 18:45:44 2024 +0800

    [Improve] add check for doris.batch.size (#480)
---
 .../main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java  | 3 ++-
 .../serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java     | 6 +++++-
 .../sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java  | 6 +++++-
 .../java/org/apache/doris/flink/source/reader/DorisValueReader.java | 3 ++-
 .../main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java    | 1 +
 .../org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java  | 4 ++--
 6 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
index c249c251..3709f0ae 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
@@ -42,7 +42,8 @@ public interface ConfigurationOptions {
     Integer DORIS_TABLET_SIZE_MIN = 1;
 
     String DORIS_BATCH_SIZE = "doris.batch.size";
-    Integer DORIS_BATCH_SIZE_DEFAULT = 1024;
+    Integer DORIS_BATCH_SIZE_DEFAULT = 4064;
+    Integer DORIS_BATCH_SIZE_MAX = 65535;
 
     String DORIS_EXEC_MEM_LIMIT = "doris.exec.mem.limit";
     Long DORIS_EXEC_MEM_LIMIT_DEFAULT = 8589934592L;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 6600dd07..0b9172e8 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -123,7 +123,11 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
                     String dorisTbl = getCreateTableIdentifier(recordRoot);
                     changeContext.getTableMapping().put(cdcTbl, dorisTbl);
                     this.tableMapping = changeContext.getTableMapping();
-                    LOG.info("create table ddl status: {}", status);
+                    LOG.info(
+                            "create table ddl status: {}, add tableMapping {},{}",
+                            status,
+                            cdcTbl,
+                            dorisTbl);
                 }
             } else if (eventType.equals(EventType.ALTER)) {
                 Tuple2<String, String> dorisTableTuple = getDorisTableTuple(recordRoot);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
index 5bf245ee..fd10ba53 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
@@ -83,7 +83,11 @@ public class SQLParserSchemaChange extends JsonDebeziumSchemaChange {
                     String dorisTbl = getCreateTableIdentifier(recordRoot);
                     changeContext.getTableMapping().put(cdcTbl, dorisTbl);
                     this.tableMapping = changeContext.getTableMapping();
-                    LOG.info("create table ddl status: {}", status);
+                    LOG.info(
+                            "create table ddl status: {}, add tableMapping {},{}",
+                            status,
+                            cdcTbl,
+                            dorisTbl);
                 }
             } else if (eventType.equals(EventType.ALTER)) {
                 Tuple2<String, String> dorisTableTuple = getDorisTableTuple(recordRoot);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
index 35639e8a..e55e1775 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
@@ -45,6 +45,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
+import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_MAX;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DEFAULT_CLUSTER;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
@@ -130,7 +131,7 @@ public class DorisValueReader extends ValueReader implements AutoCloseable {
         Integer batchSize =
                 readOptions.getRequestBatchSize() == null
                         ? DORIS_BATCH_SIZE_DEFAULT
-                        : readOptions.getRequestBatchSize();
+                        : Math.min(readOptions.getRequestBatchSize(), DORIS_BATCH_SIZE_MAX);
         Integer queryDorisTimeout =
                 readOptions.getRequestQueryTimeoutS() == null
                         ? DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 5ae44da6..7cd29506 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -158,6 +158,7 @@ public abstract class DatabaseSync {
             System.out.println("Create table finished.");
             System.exit(0);
         }
+        LOG.info("table mapping: {}", tableMapping);
         config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables));
         DataStreamSource<String> streamSource = buildCdcSource(env);
         if (singleSink) {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
index 56887d93..6b7ef1f7 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
@@ -53,7 +53,7 @@ public class DorisDynamicTableFactoryTest {
         Map<String, String> properties = getAllOptions();
         properties.put("doris.request.query.timeout", "21600s");
         properties.put("doris.request.tablet.size", "1");
-        properties.put("doris.batch.size", "1024");
+        properties.put("doris.batch.size", "4064");
         properties.put("doris.exec.mem.limit", "8192mb");
         properties.put("doris.deserialize.arrow.async", "false");
         properties.put("doris.deserialize.queue.size", "64");
@@ -118,7 +118,7 @@ public class DorisDynamicTableFactoryTest {
         Map<String, String> properties = getAllOptions();
         properties.put("doris.request.query.timeout", "21600s");
         properties.put("doris.request.tablet.size", "1");
-        properties.put("doris.batch.size", "1024");
+        properties.put("doris.batch.size", "4064");
         properties.put("doris.exec.mem.limit", "8192mb");
         properties.put("doris.deserialize.arrow.async", "false");
         properties.put("doris.deserialize.queue.size", "64");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to