This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new b42bc6ba [Improve] add check for doris.batch.size (#480)
b42bc6ba is described below
commit b42bc6ba7ed0adfc3fd75f1110da687e25793cb7
Author: wudi <[email protected]>
AuthorDate: Tue Sep 3 18:45:44 2024 +0800
[Improve] add check for doris.batch.size (#480)
---
.../main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java | 3 ++-
.../serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java | 6 +++++-
.../sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java | 6 +++++-
.../java/org/apache/doris/flink/source/reader/DorisValueReader.java | 3 ++-
.../main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java | 1 +
.../org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java | 4 ++--
6 files changed, 17 insertions(+), 6 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
index c249c251..3709f0ae 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
@@ -42,7 +42,8 @@ public interface ConfigurationOptions {
Integer DORIS_TABLET_SIZE_MIN = 1;
String DORIS_BATCH_SIZE = "doris.batch.size";
- Integer DORIS_BATCH_SIZE_DEFAULT = 1024;
+ Integer DORIS_BATCH_SIZE_DEFAULT = 4064;
+ Integer DORIS_BATCH_SIZE_MAX = 65535;
String DORIS_EXEC_MEM_LIMIT = "doris.exec.mem.limit";
Long DORIS_EXEC_MEM_LIMIT_DEFAULT = 8589934592L;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 6600dd07..0b9172e8 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -123,7 +123,11 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
String dorisTbl = getCreateTableIdentifier(recordRoot);
changeContext.getTableMapping().put(cdcTbl, dorisTbl);
this.tableMapping = changeContext.getTableMapping();
- LOG.info("create table ddl status: {}", status);
+ LOG.info(
+ "create table ddl status: {}, add tableMapping {},{}",
+ status,
+ cdcTbl,
+ dorisTbl);
}
} else if (eventType.equals(EventType.ALTER)) {
Tuple2<String, String> dorisTableTuple =
getDorisTableTuple(recordRoot);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
index 5bf245ee..fd10ba53 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
@@ -83,7 +83,11 @@ public class SQLParserSchemaChange extends JsonDebeziumSchemaChange {
String dorisTbl = getCreateTableIdentifier(recordRoot);
changeContext.getTableMapping().put(cdcTbl, dorisTbl);
this.tableMapping = changeContext.getTableMapping();
- LOG.info("create table ddl status: {}", status);
+ LOG.info(
+ "create table ddl status: {}, add tableMapping {},{}",
+ status,
+ cdcTbl,
+ dorisTbl);
}
} else if (eventType.equals(EventType.ALTER)) {
Tuple2<String, String> dorisTableTuple =
getDorisTableTuple(recordRoot);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
index 35639e8a..e55e1775 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
@@ -45,6 +45,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
+import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_MAX;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DEFAULT_CLUSTER;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
@@ -130,7 +131,7 @@ public class DorisValueReader extends ValueReader implements AutoCloseable {
Integer batchSize =
readOptions.getRequestBatchSize() == null
? DORIS_BATCH_SIZE_DEFAULT
- : readOptions.getRequestBatchSize();
+ : Math.min(readOptions.getRequestBatchSize(), DORIS_BATCH_SIZE_MAX);
Integer queryDorisTimeout =
readOptions.getRequestQueryTimeoutS() == null
? DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 5ae44da6..7cd29506 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -158,6 +158,7 @@ public abstract class DatabaseSync {
System.out.println("Create table finished.");
System.exit(0);
}
+ LOG.info("table mapping: {}", tableMapping);
config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables));
DataStreamSource<String> streamSource = buildCdcSource(env);
if (singleSink) {
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
index 56887d93..6b7ef1f7 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
@@ -53,7 +53,7 @@ public class DorisDynamicTableFactoryTest {
Map<String, String> properties = getAllOptions();
properties.put("doris.request.query.timeout", "21600s");
properties.put("doris.request.tablet.size", "1");
- properties.put("doris.batch.size", "1024");
+ properties.put("doris.batch.size", "4064");
properties.put("doris.exec.mem.limit", "8192mb");
properties.put("doris.deserialize.arrow.async", "false");
properties.put("doris.deserialize.queue.size", "64");
@@ -118,7 +118,7 @@ public class DorisDynamicTableFactoryTest {
Map<String, String> properties = getAllOptions();
properties.put("doris.request.query.timeout", "21600s");
properties.put("doris.request.tablet.size", "1");
- properties.put("doris.batch.size", "1024");
+ properties.put("doris.batch.size", "4064");
properties.put("doris.exec.mem.limit", "8192mb");
properties.put("doris.deserialize.arrow.async", "false");
properties.put("doris.deserialize.queue.size", "64");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]