This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 721f8ab [feature]support group commit (#224)
721f8ab is described below
commit 721f8abd8a46efcec592922f10fb155427d6e37b
Author: Petrichor <[email protected]>
AuthorDate: Mon Aug 26 10:08:07 2024 +0800
[feature]support group commit (#224)
---
.../doris/spark/cfg/ConfigurationOptions.java | 21 +++++++++++
.../org/apache/doris/spark/load/StreamLoader.scala | 41 ++++++++++++++++++++--
.../doris/spark/sql/TestConnectorWriteDoris.scala | 24 +++++++++++++
3 files changed, 84 insertions(+), 2 deletions(-)
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 61d4563..8f64c74 100644
---
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -17,6 +17,11 @@
package org.apache.doris.spark.cfg;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
public interface ConfigurationOptions {
// doris fe node address
String DORIS_FENODES = "doris.fenodes";
@@ -140,4 +145,20 @@ public interface ConfigurationOptions {
String LOAD_MODE = "doris.sink.load.mode";
String DEFAULT_LOAD_MODE = "stream_load";
+    /**
+     * Stream load property that requests a partial column update when its value is "true".
+     * Group commit validation rejects that combination.
+     */
+    String PARTIAL_COLUMNS = "partial_columns";
+
+    /**
+     * Stream load property that selects the group commit mode for a load.
+     */
+    String GROUP_COMMIT = "group_commit";
+
+    /**
+     * Read-only set of accepted {@link #GROUP_COMMIT} values, all lower case.
+     */
+    Set<String> immutableGroupMode = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList("sync_mode", "async_mode", "off_mode")));
+
}
diff --git
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
index 06bb56f..5f7765d 100644
---
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
+++
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
@@ -24,7 +24,7 @@ import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
-import org.apache.doris.spark.exception.{IllegalArgumentException,
StreamLoadException}
+import org.apache.doris.spark.exception.{DorisException,
IllegalArgumentException, StreamLoadException}
import org.apache.doris.spark.rest.RestService
import org.apache.doris.spark.rest.models.BackendV2.BackendRowV2
import org.apache.doris.spark.rest.models.RespContent
@@ -47,6 +47,7 @@ import java.util.concurrent.ExecutionException
import java.util.zip.GZIPOutputStream
import java.util.{Base64, Calendar, Collections, UUID}
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.util.{Failure, Success, Try}
case class StreamLoadResponse(code: Int, msg: String, content: String)
@@ -81,6 +82,7 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
private val enableHttps: Boolean =
settings.getBooleanProperty(ConfigurationOptions.DORIS_ENABLE_HTTPS,
ConfigurationOptions.DORIS_ENABLE_HTTPS_DEFAULT) && autoRedirect
+  /**
+   * True when the stream load properties request an active group commit mode
+   * (anything other than `off_mode`).
+   *
+   * This flag makes the connector skip load-label generation. `off_mode` disables
+   * group commit, so it must keep the normal labelled behaviour. The comparison
+   * ignores case and surrounding whitespace, matching the normalization applied
+   * during validation.
+   */
+  private val enableGroupCommit: Boolean =
+    streamLoadProps
+      .get(ConfigurationOptions.GROUP_COMMIT)
+      .exists(mode => !mode.trim.equalsIgnoreCase("off_mode"))
/**
* execute stream load
*
@@ -205,10 +207,39 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
props += "read_json_by_line" -> "true"
props.remove("strip_outer_array")
}
+
+ //get group commit mode
+ if (!validateGroupCommitMode(props)) {
+ props.remove(ConfigurationOptions.GROUP_COMMIT)
+ }
+
props.remove("columns")
props.toMap
}
+
+  /**
+   * Validates the user-supplied `group_commit` stream load property.
+   *
+   * The value is trimmed, lower-cased and checked against
+   * [[ConfigurationOptions.immutableGroupMode]].
+   *
+   * For an active mode (`sync_mode` / `async_mode`), the normalized value is
+   * written back into `props`. The header sent to Doris is therefore exactly the
+   * value that was validated.
+   *
+   * `off_mode` means group commit is disabled. It is treated like an absent
+   * property: no conflict checks run, and `false` is returned so the caller
+   * strips the header.
+   *
+   * @param props mutable stream load properties; the group commit entry may be
+   *              rewritten in place
+   * @return true if an active group commit mode is requested and the header should
+   *         be kept; false if the property is absent or is `off_mode`
+   * @throws DorisException if the mode is unknown, or if an active mode is combined
+   *                        with two-phase commit or partial column update
+   */
+  private def validateGroupCommitMode(props: mutable.Map[String, String]): Boolean =
+    props.get(ConfigurationOptions.GROUP_COMMIT) match {
+      case None => false
+      case Some(value) =>
+        // Locale.ROOT keeps the case mapping independent of the JVM default locale.
+        val mode = value.trim.toLowerCase(java.util.Locale.ROOT)
+        if (!ConfigurationOptions.immutableGroupMode.contains(mode)) {
+          throw new DorisException(
+            "The value of group commit mode is an illegal parameter, illegal value=" + value)
+        }
+        if (mode == "off_mode") {
+          // Group commit is switched off, so 2PC and partial updates remain allowed.
+          false
+        } else {
+          if (enableTwoPhaseCommit) {
+            throw new DorisException(
+              "When group commit is enabled, you should disable two phase commit!")
+          }
+          val partialUpdate = props
+            .get(ConfigurationOptions.PARTIAL_COLUMNS)
+            .exists(_.trim.equalsIgnoreCase("true"))
+          if (partialUpdate) {
+            throw new DorisException(
+              "When group commit is enabled,you can not load data with partial column update.")
+          }
+          // Send the validated, normalized mode instead of the raw user input.
+          props(ConfigurationOptions.GROUP_COMMIT) = mode
+          true
+        }
+    }
+
/**
* get http client
*
@@ -245,7 +276,10 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
val put = new HttpPut(currentLoadUrl)
addCommonHeader(put)
- put.setHeader("label", label)
+
+ if (label != null && StringUtils.isNotBlank(label)) {
+ put.setHeader("label", label)
+ }
val columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS)
if (StringUtils.isNotBlank(columns)) {
@@ -355,6 +389,9 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
* @return load label
*/
private def generateLoadLabel(): String = {
+ if (enableGroupCommit) {
+ return null;
+ }
val calendar = Calendar.getInstance
"spark_streamload_" +
f"${calendar.get(Calendar.YEAR)}${calendar.get(Calendar.MONTH) +
1}%02d${calendar.get(Calendar.DAY_OF_MONTH)}%02d" +
diff --git
a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
index dbea21a..e48360b 100644
---
a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
+++
b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
@@ -130,6 +130,30 @@ class TestConnectorWriteDoris {
spark.stop()
}
+  /**
+   * Writes a small DataFrame to Doris using stream load group commit in `async_mode`.
+   *
+   * Two-phase commit and partial column update are explicitly disabled because the
+   * connector rejects them together with an active group commit mode. Requires a
+   * reachable Doris cluster described by the dorisFeNodes / dorisTable / dorisUser /
+   * dorisPwd settings referenced below (defined outside this method).
+   */
+  @Test
+  def groupCommitWriteTest(): Unit = {
+    val spark = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      // Order statuses: pending payment, pending shipment, received.
+      val df = spark.createDataFrame(Seq(
+        ("1", 100, "待付款"),
+        ("2", 200, "待发货"),
+        ("3", 300, "已收货")
+      )).toDF("order_id", "order_amount", "order_status")
+      df.write
+        .format("doris")
+        .option("doris.fenodes", dorisFeNodes)
+        .option("doris.table.identifier", dorisTable)
+        .option("user", dorisUser)
+        .option("password", dorisPwd)
+        .option("sink.batch.size", 2)
+        .option("sink.max-retries", 2)
+        .option("sink.properties.format", "json")
+        .option("sink.properties.group_commit", "async_mode")
+        .option("doris.sink.enable-2pc", "false")
+        .option("sink.properties.partial_columns", "false")
+        .save()
+    } finally {
+      // Stop the session even if the write fails, so later tests do not reuse it.
+      spark.stop()
+    }
+  }
+
/**
* correct data in doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]