This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 6eace70  [improvement](spark-connector)Support each batch interval 
time parameter setting. (#68)
6eace70 is described below

commit 6eace700ce84997d8927f1218416f30c3267cde3
Author: Hong Liu <[email protected]>
AuthorDate: Wed Feb 8 12:55:16 2023 +0800

    [improvement](spark-connector)Support each batch interval time parameter 
setting. (#68)
    
    * [improvement](spark-connector)Support each batch interval time parameter 
setting
    
    * fix default value of doris.sink.batch.interval.ms
    
    ---------
    
    Co-authored-by: smallhibiscus <844981280>
---
 .../main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java   | 5 +++++
 .../main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala  | 3 +++
 .../main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala  | 3 +++
 3 files changed, 11 insertions(+)

diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 5ef9f19..a3f4061 100644
--- 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -85,4 +85,9 @@ public interface ConfigurationOptions {
     String DORIS_SINK_TASK_USE_REPARTITION = "doris.sink.task.use.repartition";
 
     boolean DORIS_SINK_TASK_USE_REPARTITION_DEFAULT = false;
+
+    String DORIS_SINK_BATCH_INTERVAL_MS = "doris.sink.batch.interval.ms";
+
+    int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;
+
 }
diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 0399acf..671c4b8 100644
--- 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -65,9 +65,11 @@ private[sql] class DorisSourceProvider extends 
DataSourceRegister
     val maxRetryTimes = 
sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, 
ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
     val sinkTaskPartitionSize = 
sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
     val sinkTaskUseRepartition = 
sparkSettings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, 
ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
+    val batchInterValMs = 
sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
 ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
 
     logger.info(s"maxRowCount ${maxRowCount}")
     logger.info(s"maxRetryTimes ${maxRetryTimes}")
+    logger.info(s"batchInterVarMs ${batchInterValMs}")
 
     var resultRdd = data.rdd
     if (Objects.nonNull(sinkTaskPartitionSize)) {
@@ -105,6 +107,7 @@ private[sql] class DorisSourceProvider extends 
DataSourceRegister
             try {
               dorisStreamLoader.loadV2(rowsBuffer)
               rowsBuffer.clear()
+              Thread.sleep(batchInterValMs.longValue())
               loop.break()
             }
             catch {
diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index 130ce21..a2e3ed1 100644
--- 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -37,6 +37,8 @@ private[sql] class DorisStreamLoadSink(sqlContext: 
SQLContext, settings: SparkSe
   val maxRetryTimes: Int = 
settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, 
ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
   val sinkTaskPartitionSize = 
settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
   val sinkTaskUseRepartition = 
settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, 
ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
+  val batchInterValMs = 
settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, 
ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
+
   val dorisStreamLoader: DorisStreamLoad = 
CachedDorisStreamLoadClient.getOrCreate(settings)
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
@@ -91,6 +93,7 @@ private[sql] class DorisStreamLoadSink(sqlContext: 
SQLContext, settings: SparkSe
             try {
               dorisStreamLoader.load(rowArray.toString)
               rowArray.removeAll()
+              Thread.sleep(batchInterValMs.longValue())
               loop.break()
             }
             catch {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to