This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 6eace70 [improvement](spark-connector)Support each batch interval
time parameter setting. (#68)
6eace70 is described below
commit 6eace700ce84997d8927f1218416f30c3267cde3
Author: Hong Liu <[email protected]>
AuthorDate: Wed Feb 8 12:55:16 2023 +0800
[improvement](spark-connector)Support each batch interval time parameter
setting. (#68)
* [improvement](spark-connector)Support each batch interval time parameter
setting
* fix default value of doris.sink.batch.interval.ms
---------
Co-authored-by: smallhibiscus <844981280>
---
.../main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java | 5 +++++
.../main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala | 3 +++
.../main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala | 3 +++
3 files changed, 11 insertions(+)
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 5ef9f19..a3f4061 100644
---
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -85,4 +85,9 @@ public interface ConfigurationOptions {
String DORIS_SINK_TASK_USE_REPARTITION = "doris.sink.task.use.repartition";
boolean DORIS_SINK_TASK_USE_REPARTITION_DEFAULT = false;
+
+ String DORIS_SINK_BATCH_INTERVAL_MS = "doris.sink.batch.interval.ms";
+
+ int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;
+
}
diff --git
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 0399acf..671c4b8 100644
---
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -65,9 +65,11 @@ private[sql] class DorisSourceProvider extends
DataSourceRegister
val maxRetryTimes =
sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES,
ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
val sinkTaskPartitionSize =
sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
val sinkTaskUseRepartition =
sparkSettings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION,
ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
+ val batchInterValMs =
sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
logger.info(s"maxRowCount ${maxRowCount}")
logger.info(s"maxRetryTimes ${maxRetryTimes}")
+ logger.info(s"batchInterVarMs ${batchInterValMs}")
var resultRdd = data.rdd
if (Objects.nonNull(sinkTaskPartitionSize)) {
@@ -105,6 +107,7 @@ private[sql] class DorisSourceProvider extends
DataSourceRegister
try {
dorisStreamLoader.loadV2(rowsBuffer)
rowsBuffer.clear()
+ Thread.sleep(batchInterValMs.longValue())
loop.break()
}
catch {
diff --git
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index 130ce21..a2e3ed1 100644
---
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -37,6 +37,8 @@ private[sql] class DorisStreamLoadSink(sqlContext:
SQLContext, settings: SparkSe
val maxRetryTimes: Int =
settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES,
ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
val sinkTaskPartitionSize =
settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
val sinkTaskUseRepartition =
settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION,
ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
+ val batchInterValMs =
settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
+
val dorisStreamLoader: DorisStreamLoad =
CachedDorisStreamLoadClient.getOrCreate(settings)
override def addBatch(batchId: Long, data: DataFrame): Unit = {
@@ -91,6 +93,7 @@ private[sql] class DorisStreamLoadSink(sqlContext:
SQLContext, settings: SparkSe
try {
dorisStreamLoader.load(rowArray.toString)
rowArray.removeAll()
+ Thread.sleep(batchInterValMs.longValue())
loop.break()
}
catch {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]