This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new a9d5e25 [Enhancemant] add label prefix configuration item for doris
sink to track writing (#235)
a9d5e25 is described below
commit a9d5e253037c6259aa44c56526072b73ef3859b1
Author: LeiWang <[email protected]>
AuthorDate: Thu Oct 24 17:59:04 2024 +0800
[Enhancemant] add label prefix configuration item for doris sink to track
writing (#235)
---
.../main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java | 3 +++
.../src/main/scala/org/apache/doris/spark/load/StreamLoader.scala | 7 +++++--
2 files changed, 8 insertions(+), 2 deletions(-)
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index cf0630f..3d60142 100644
---
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -126,6 +126,9 @@ public interface ConfigurationOptions {
String DORIS_SINK_AUTO_REDIRECT = "doris.sink.auto-redirect";
boolean DORIS_SINK_AUTO_REDIRECT_DEFAULT = true;
+ String DORIS_SINK_LABEL_PREFIX = "doris.sink.label.prefix";
+ String DORIS_SINK_LABEL_PREFIX_DEFAULT = "spark_streamload";
+
/**
* compress_type
*/
diff --git
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
index 5f7765d..52a88bc 100644
---
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
+++
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
@@ -83,6 +83,7 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
ConfigurationOptions.DORIS_ENABLE_HTTPS_DEFAULT) && autoRedirect
private val enableGroupCommit: Boolean =
streamLoadProps.contains(ConfigurationOptions.GROUP_COMMIT)
+
/**
* execute stream load
*
@@ -384,7 +385,7 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
/**
* generate load label
*
- * spark_streamload_YYYYMMDD_HHMMSS_{UUID}
+ * {label_prefix}_YYYYMMDD_HHMMSS_{UUID}
*
* @return load label
*/
@@ -393,7 +394,9 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
return null;
}
val calendar = Calendar.getInstance
- "spark_streamload_" +
+ val labelPrefix =
settings.getProperty(ConfigurationOptions.DORIS_SINK_LABEL_PREFIX,
+ ConfigurationOptions.DORIS_SINK_LABEL_PREFIX_DEFAULT)
+ labelPrefix + "_" +
f"${calendar.get(Calendar.YEAR)}${calendar.get(Calendar.MONTH) +
1}%02d${calendar.get(Calendar.DAY_OF_MONTH)}%02d" +
f"_${calendar.get(Calendar.HOUR_OF_DAY)}%02d${calendar.get(Calendar.MINUTE)}%02d${calendar.get(Calendar.SECOND)}%02d"
+
f"_${UUID.randomUUID.toString.replaceAll("-", "")}"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]