This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch branch-for-flink-before-1.13
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/branch-for-flink-before-1.13
by this push:
new 73647755 [Improve] add sink.label-prefix for 1.13 and add ignore
properties (#647)
73647755 is described below
commit 736477558708fb5f8338f6dd043733d544e92912
Author: wudi <[email protected]>
AuthorDate: Tue May 19 14:30:08 2026 +0800
[Improve] add sink.label-prefix for 1.13 and add ignore properties (#647)
---
.../apache/doris/flink/cfg/DorisExecutionOptions.java | 16 ++++++++++++++--
.../org/apache/doris/flink/rest/models/BackendV2.java | 2 +-
.../java/org/apache/doris/flink/rest/models/Field.java | 3 +++
.../org/apache/doris/flink/rest/models/QueryPlan.java | 3 +++
.../java/org/apache/doris/flink/rest/models/Schema.java | 3 +++
.../java/org/apache/doris/flink/rest/models/Tablet.java | 5 ++++-
.../doris/flink/table/DorisDynamicOutputFormat.java | 3 ++-
.../doris/flink/table/DorisDynamicTableFactory.java | 7 +++++++
.../org/apache/doris/flink/table/DorisStreamLoad.java | 11 +++++------
9 files changed, 42 insertions(+), 11 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 6d3a4eaa..6e192a40 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -45,8 +45,9 @@ public class DorisExecutionOptions implements Serializable {
private final Boolean enableDelete;
+ private final String labelPrefix;
- public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long
batchIntervalMs, Properties streamLoadProp, Boolean enableDelete, Long
maxBatchBytes) {
+ public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long
batchIntervalMs, Properties streamLoadProp, Boolean enableDelete, Long
maxBatchBytes, String labelPrefix) {
Preconditions.checkArgument(maxRetries >= 0);
Preconditions.checkArgument(maxBatchBytes >= 0);
this.batchSize = batchSize;
@@ -55,6 +56,7 @@ public class DorisExecutionOptions implements Serializable {
this.streamLoadProp = streamLoadProp;
this.enableDelete = enableDelete;
this.maxBatchBytes = maxBatchBytes;
+ this.labelPrefix = labelPrefix;
}
public static Builder builder() {
@@ -92,6 +94,10 @@ public class DorisExecutionOptions implements Serializable {
return maxBatchBytes;
}
+ public String getLabelPrefix() {
+ return labelPrefix;
+ }
+
/**
* Builder of {@link DorisExecutionOptions}.
*/
@@ -102,6 +108,7 @@ public class DorisExecutionOptions implements Serializable {
private Properties streamLoadProp = new Properties();
private Boolean enableDelete = false;
private Long maxBatchBytes = DEFAULT_MAX_BATCH_BYTES;
+ private String labelPrefix;
public Builder setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
@@ -133,8 +140,13 @@ public class DorisExecutionOptions implements Serializable
{
return this;
}
+ public Builder setLabelPrefix(String labelPrefix) {
+ this.labelPrefix = labelPrefix;
+ return this;
+ }
+
public DorisExecutionOptions build() {
- return new DorisExecutionOptions(batchSize, maxRetries,
batchIntervalMs, streamLoadProp, enableDelete, maxBatchBytes);
+ return new DorisExecutionOptions(batchSize, maxRetries,
batchIntervalMs, streamLoadProp, enableDelete, maxBatchBytes, labelPrefix);
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
index a6b5cf3f..d7f0116d 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
@@ -19,7 +19,6 @@ package org.apache.doris.flink.rest.models;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
-
import java.util.List;
/**
@@ -39,6 +38,7 @@ public class BackendV2 {
this.backends = backends;
}
+ @JsonIgnoreProperties(ignoreUnknown = true)
public static class BackendRowV2 {
@JsonProperty("ip")
public String ip;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
index 04341bf5..b45bf118 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
@@ -17,8 +17,11 @@
package org.apache.doris.flink.rest.models;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
import java.util.Objects;
+@JsonIgnoreProperties(ignoreUnknown = true)
public class Field {
private String name;
private String type;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/QueryPlan.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/QueryPlan.java
index e65175ca..5b2ace1e 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/QueryPlan.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/QueryPlan.java
@@ -17,9 +17,12 @@
package org.apache.doris.flink.rest.models;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
import java.util.Map;
import java.util.Objects;
+@JsonIgnoreProperties(ignoreUnknown = true)
public class QueryPlan {
private int status;
private String opaqued_query_plan;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java
index 264e7368..cfd015c1 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java
@@ -17,10 +17,13 @@
package org.apache.doris.flink.rest.models;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+@JsonIgnoreProperties(ignoreUnknown = true)
public class Schema {
private int status = 0;
private String keysType;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Tablet.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Tablet.java
index 70b0f139..78ed72f6 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Tablet.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Tablet.java
@@ -17,10 +17,13 @@
package org.apache.doris.flink.rest.models;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
import java.util.List;
import java.util.Objects;
-public class Tablet {
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Tablet {
private List<String> routings;
private int version;
private long versionHash;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 993dc651..d0bf8311 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -207,7 +207,8 @@ public class DorisDynamicOutputFormat<T> extends
RichOutputFormat<T> {
options.getUsername(),
options.getPassword(),
executionOptions.getStreamLoadProp(),
- readOptions);
+ readOptions,
+ executionOptions.getLabelPrefix());
if (executionOptions.getBatchIntervalMs() != 0 &&
executionOptions.getBatchSize() != 1) {
this.scheduler = Executors.newScheduledThreadPool(1, new
ExecutorThreadFactory("doris-streamload-output" +
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 7033dbd9..d11c2217 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -147,6 +147,11 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
.defaultValue(DorisExecutionOptions.DEFAULT_MAX_BATCH_BYTES)
.withDescription("the flush max bytes (includes all append, upsert
and delete records), over this number" +
" in batch, will flush data. The default value is 10MB.");
+ private static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions
+ .key("sink.label-prefix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the label prefix for stream load, the default
value is flink_connector");
@Override
public String factoryIdentifier() {
@@ -186,6 +191,7 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
options.add(SINK_BUFFER_FLUSH_INTERVAL);
options.add(SINK_ENABLE_DELETE);
options.add(SINK_BUFFER_FLUSH_MAX_BYTES);
+ options.add(SINK_LABEL_PREFIX);
return options;
}
@@ -243,6 +249,7 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
builder.setStreamLoadProp(streamLoadProp);
builder.setEnableDelete(readableConfig.get(SINK_ENABLE_DELETE));
builder.setMaxBatchBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
+
readableConfig.getOptional(SINK_LABEL_PREFIX).ifPresent(builder::setLabelPrefix);
return builder.build();
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
index 72cf8ffc..b22d2fba 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
@@ -40,10 +40,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -68,9 +66,11 @@ public class DorisStreamLoad implements Serializable {
private String tbl;
private String authEncoding;
private Properties streamLoadProp;
+ private static final String LABEL_PREFIX_DEFAULT = "flink_connector";
+ private String labelPrefix;
private final HttpClientBuilder httpClientBuilder;
- public DorisStreamLoad(String hostPort, String db, String tbl, String
user, String passwd, Properties streamLoadProp, DorisReadOptions readOptions) {
+ public DorisStreamLoad(String hostPort, String db, String tbl, String
user, String passwd, Properties streamLoadProp, DorisReadOptions readOptions,
String labelPrefix) {
this.hostPort = hostPort;
this.db = db;
this.tbl = tbl;
@@ -79,6 +79,7 @@ public class DorisStreamLoad implements Serializable {
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
this.authEncoding = basicAuthHeader(user, passwd);
this.streamLoadProp = streamLoadProp;
+ this.labelPrefix = StringUtils.isBlank(labelPrefix) ?
LABEL_PREFIX_DEFAULT : labelPrefix;
int connectTimeout = readOptions.getRequestConnectTimeoutMs() == null
? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT :
readOptions.getRequestConnectTimeoutMs();
int socketTimeout = readOptions.getRequestReadTimeoutMs() == null ?
ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT :
readOptions.getRequestReadTimeoutMs();
this.httpClientBuilder = HttpClients
@@ -127,9 +128,7 @@ public class DorisStreamLoad implements Serializable {
private LoadResponse loadBatch(String value) {
String label = streamLoadProp.getProperty("label");
if (StringUtils.isBlank(label)) {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd_HHmmss");
- String formatDate = sdf.format(new Date());
- label = String.format("flink_connector_%s_%s", formatDate,
+ label = String.format("%s_%s", labelPrefix,
UUID.randomUUID().toString().replaceAll("-", ""));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]