This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 721f8ab  [feature]support group commit (#224)
721f8ab is described below

commit 721f8abd8a46efcec592922f10fb155427d6e37b
Author: Petrichor <[email protected]>
AuthorDate: Mon Aug 26 10:08:07 2024 +0800

    [feature]support group commit (#224)
---
 .../doris/spark/cfg/ConfigurationOptions.java      | 21 +++++++++++
 .../org/apache/doris/spark/load/StreamLoader.scala | 41 ++++++++++++++++++++--
 .../doris/spark/sql/TestConnectorWriteDoris.scala  | 24 +++++++++++++
 3 files changed, 84 insertions(+), 2 deletions(-)

diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 61d4563..8f64c74 100644
--- 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -17,6 +17,11 @@
 
 package org.apache.doris.spark.cfg;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 public interface ConfigurationOptions {
     // doris fe node address
     String DORIS_FENODES = "doris.fenodes";
@@ -140,4 +145,20 @@ public interface ConfigurationOptions {
     String LOAD_MODE = "doris.sink.load.mode";
     String DEFAULT_LOAD_MODE = "stream_load";
 
+    /**
+     * partial_columns
+     */
+
+    String PARTIAL_COLUMNS = "partial_columns";
+
+    /**
+     * Group commit
+     */
+    String GROUP_COMMIT = "group_commit";
+    Set<String> immutableGroupMode = Collections.unmodifiableSet(new 
HashSet<>(Arrays.asList(
+            "sync_mode",
+            "async_mode",
+            "off_mode"
+    )));
+
 }
diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
index 06bb56f..5f7765d 100644
--- 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
@@ -24,7 +24,7 @@ import org.apache.commons.io.IOUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
-import org.apache.doris.spark.exception.{IllegalArgumentException, 
StreamLoadException}
+import org.apache.doris.spark.exception.{DorisException, 
IllegalArgumentException, StreamLoadException}
 import org.apache.doris.spark.rest.RestService
 import org.apache.doris.spark.rest.models.BackendV2.BackendRowV2
 import org.apache.doris.spark.rest.models.RespContent
@@ -47,6 +47,7 @@ import java.util.concurrent.ExecutionException
 import java.util.zip.GZIPOutputStream
 import java.util.{Base64, Calendar, Collections, UUID}
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
 
 case class StreamLoadResponse(code: Int, msg: String, content: String)
@@ -81,6 +82,7 @@ class StreamLoader(settings: SparkSettings, isStreaming: 
Boolean) extends Loader
   private val enableHttps: Boolean = 
settings.getBooleanProperty(ConfigurationOptions.DORIS_ENABLE_HTTPS,
     ConfigurationOptions.DORIS_ENABLE_HTTPS_DEFAULT) && autoRedirect
 
+  private val enableGroupCommit: Boolean = 
streamLoadProps.contains(ConfigurationOptions.GROUP_COMMIT)
   /**
    * execute stream load
    *
@@ -205,10 +207,39 @@ class StreamLoader(settings: SparkSettings, isStreaming: 
Boolean) extends Loader
       props += "read_json_by_line" -> "true"
       props.remove("strip_outer_array")
     }
+
+    //get group commit mode
+    if (!validateGroupCommitMode(props)) {
+      props.remove(ConfigurationOptions.GROUP_COMMIT)
+    }
+
     props.remove("columns")
     props.toMap
   }
 
+
+  private def validateGroupCommitMode(props: mutable.Map[String, String]): 
Boolean = {
+    if (!props.contains(ConfigurationOptions.GROUP_COMMIT)) {
+      return false;
+    }
+
+    val value = props(ConfigurationOptions.GROUP_COMMIT)
+    val normalizedValue = value.trim().toLowerCase();
+    if (!ConfigurationOptions.immutableGroupMode.contains(normalizedValue)) {
+      throw new DorisException(
+        "The value of group commit mode is an illegal parameter, illegal 
value="
+          + value);
+    } else if (enableTwoPhaseCommit) {
+      throw new DorisException(
+        "When group commit is enabled, you should disable two phase commit!");
+    } else if (props.contains(ConfigurationOptions.PARTIAL_COLUMNS)
+      && props(ConfigurationOptions.PARTIAL_COLUMNS).equalsIgnoreCase("true")) 
{
+      throw new DorisException(
+        "When group commit is enabled,you can not load data with partial 
column update.");
+    }
+    true;
+  }
+
   /**
    * get http client
    *
@@ -245,7 +276,10 @@ class StreamLoader(settings: SparkSettings, isStreaming: 
Boolean) extends Loader
     val put = new HttpPut(currentLoadUrl)
     addCommonHeader(put)
 
-    put.setHeader("label", label)
+
+    if (label != null && StringUtils.isNotBlank(label)) {
+      put.setHeader("label", label)
+    }
 
     val columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS)
     if (StringUtils.isNotBlank(columns)) {
@@ -355,6 +389,9 @@ class StreamLoader(settings: SparkSettings, isStreaming: 
Boolean) extends Loader
    * @return load label
    */
   private def generateLoadLabel(): String = {
+    if (enableGroupCommit) {
+      return null;
+    }
     val calendar = Calendar.getInstance
     "spark_streamload_" +
       f"${calendar.get(Calendar.YEAR)}${calendar.get(Calendar.MONTH) + 
1}%02d${calendar.get(Calendar.DAY_OF_MONTH)}%02d" +
diff --git 
a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
 
b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
index dbea21a..e48360b 100644
--- 
a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
+++ 
b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
@@ -130,6 +130,30 @@ class TestConnectorWriteDoris {
     spark.stop()
   }
 
+  @Test
+  def groupCommitWriteTest(): Unit = {
+    val spark = SparkSession.builder().master("local[*]").getOrCreate()
+    val df = spark.createDataFrame(Seq(
+      ("1", 100, "待付款"),
+      ("2", 200, "待发货"),
+      ("3", 300, "已收货")
+    )).toDF("order_id", "order_amount", "order_status")
+    df.write
+      .format("doris")
+      .option("doris.fenodes", dorisFeNodes)
+      .option("doris.table.identifier", dorisTable)
+      .option("user", dorisUser)
+      .option("password", dorisPwd)
+      .option("sink.batch.size", 2)
+      .option("sink.max-retries", 2)
+      .option("sink.properties.format", "json")
+      .option("sink.properties.group_commit", "async_mode")
+      .option("doris.sink.enable-2pc","false")
+      .option("sink.properties.partial_columns","false")
+      .save()
+    spark.stop()
+  }
+
 
   /**
    * correct data in doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to