This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new e2e7165 [improve] change batch mode param and add default streamload prop (#251)
e2e7165 is described below
commit e2e71651e1c154cc75c5fd7bbca648da9ad4a813
Author: wudi <[email protected]>
AuthorDate: Thu Nov 30 17:41:14 2023 +0800
[improve] change batch mode param and add default streamload prop (#251)
Co-authored-by: wudi <>
---
.../org/apache/doris/flink/cfg/DorisExecutionOptions.java | 11 +++++++++--
.../apache/doris/flink/table/DorisDynamicTableFactory.java | 5 +----
.../java/org/apache/doris/flink/tools/cdc/DatabaseSync.java | 5 +----
3 files changed, 11 insertions(+), 10 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 9a43d77..25e1535 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -119,6 +119,13 @@ public class DorisExecutionOptions implements Serializable {
return new Builder().setStreamLoadProp(properties).build();
}
+ public static Properties defaultsProperties() {
+ Properties properties = new Properties();
+ properties.setProperty("format", "json");
+ properties.setProperty("read_json_by_line", "true");
+ return properties;
+ }
+
public Integer checkInterval() {
return checkInterval;
}
@@ -269,8 +276,8 @@ public class DorisExecutionOptions implements Serializable {
return this;
}
- public Builder enableBatchMode() {
- this.enableBatchMode = true;
+ public Builder setBatchMode(Boolean enableBatchMode) {
+ this.enableBatchMode = enableBatchMode;
return this;
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index bccb8b7..521d741 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -224,10 +224,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
builder.enable2PC();
}
- if(readableConfig.get(SINK_ENABLE_BATCH_MODE)) {
- builder.enableBatchMode();
- }
-
+ builder.setBatchMode(readableConfig.get(SINK_ENABLE_BATCH_MODE));
builder.setFlushQueueSize(readableConfig.get(SINK_FLUSH_QUEUE_SIZE));
builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 4dfa6d5..bf26214 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -221,10 +221,7 @@ public abstract class DatabaseSync {
executionBuilder.enable2PC();
}
- //batch option
- if(sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_BATCH_MODE)){
- executionBuilder.enableBatchMode();
- }
+ sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_BATCH_MODE).ifPresent(executionBuilder::setBatchMode);
sinkConfig.getOptional(DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE).ifPresent(executionBuilder::setFlushQueueSize);
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_ROWS).ifPresent(executionBuilder::setBufferFlushMaxRows);
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES).ifPresent(executionBuilder::setBufferFlushMaxBytes);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]