This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 45fe88a [improvement]Add an option to set the partition size of the
final write stage (#60)
45fe88a is described below
commit 45fe88a3520c62791c87760cef423a61575108c6
Author: lexluo09 <[email protected]>
AuthorDate: Tue Dec 20 14:14:10 2022 +0800
[improvement]Add an option to set the partition size of the final write
stage (#60)
---
.../org/apache/doris/spark/cfg/ConfigurationOptions.java | 10 ++++++++++
.../src/main/java/org/apache/doris/spark/cfg/Settings.java | 6 +++++-
.../org/apache/doris/spark/sql/DorisSourceProvider.scala | 12 +++++++++---
.../org/apache/doris/spark/sql/DorisStreamLoadSink.scala | 11 +++++++++--
4 files changed, 33 insertions(+), 6 deletions(-)
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 8cc4477..5ef9f19 100644
---
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -75,4 +75,14 @@ public interface ConfigurationOptions {
String DORIS_MAX_FILTER_RATIO = "doris.max.filter.ratio";
String STREAM_LOAD_PROP_PREFIX = "doris.sink.properties.";
+
+ String DORIS_SINK_TASK_PARTITION_SIZE = "doris.sink.task.partition.size";
+
+ /**
+     * Sets the partition size of the Doris sink task. If you set a small
coalesce size and there is no intervening action operation, the upstream
computation may run with the same reduced parallelism.
+     * To avoid this, you can enable the repartition operation instead. This
adds a shuffle step, but it means the current upstream partitions will still be
executed in parallel.
+ */
+ String DORIS_SINK_TASK_USE_REPARTITION = "doris.sink.task.use.repartition";
+
+ boolean DORIS_SINK_TASK_USE_REPARTITION_DEFAULT = false;
}
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
index 23f0cd7..d2e845a 100644
---
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
@@ -47,7 +47,11 @@ public abstract class Settings {
return value;
}
- public int getIntegerProperty(String name, int defaultValue) {
+ public Integer getIntegerProperty(String name) {
+ return getIntegerProperty(name, null);
+ }
+
+ public Integer getIntegerProperty(String name, Integer defaultValue) {
try {
if (getProperty(name) != null) {
return Integer.parseInt(getProperty(name));
diff --git
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 31bd1aa..2922d63 100644
---
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -29,9 +29,8 @@ import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.slf4j.{Logger, LoggerFactory}
import java.io.IOException
import java.util
-
import org.apache.doris.spark.rest.RestService
-
+import java.util.Objects
import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.util.control.Breaks
@@ -64,11 +63,18 @@ private[sql] class DorisSourceProvider extends
DataSourceRegister
val maxRowCount =
sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE,
ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
val maxRetryTimes =
sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES,
ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
+ val sinkTaskPartitionSize =
sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
+ val sinkTaskUseRepartition =
sparkSettings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION,
ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
logger.info(s"maxRowCount ${maxRowCount}")
logger.info(s"maxRetryTimes ${maxRetryTimes}")
- data.rdd.foreachPartition(partition => {
+ var resultRdd = data.rdd
+ if (Objects.nonNull(sinkTaskPartitionSize)) {
+ resultRdd = if (sinkTaskUseRepartition)
resultRdd.repartition(sinkTaskPartitionSize) else
resultRdd.coalesce(sinkTaskPartitionSize)
+ }
+
+ resultRdd.foreachPartition(partition => {
val rowsBuffer: util.List[util.List[Object]] = new
util.ArrayList[util.List[Object]](maxRowCount)
partition.foreach(row => {
val line: util.List[Object] = new util.ArrayList[Object]()
diff --git
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index afc5f31..130ce21 100644
---
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.slf4j.{Logger, LoggerFactory}
import java.io.IOException
-
+import org.apache.doris.spark.rest.RestService
+import java.util.Objects
import scala.util.control.Breaks
private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings:
SparkSettings) extends Sink with Serializable {
@@ -34,6 +35,8 @@ private[sql] class DorisStreamLoadSink(sqlContext:
SQLContext, settings: SparkSe
@volatile private var latestBatchId = -1L
val maxRowCount: Int =
settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE,
ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
val maxRetryTimes: Int =
settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES,
ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
+ val sinkTaskPartitionSize =
settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
+ val sinkTaskUseRepartition =
settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION,
ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
val dorisStreamLoader: DorisStreamLoad =
CachedDorisStreamLoadClient.getOrCreate(settings)
override def addBatch(batchId: Long, data: DataFrame): Unit = {
@@ -47,8 +50,12 @@ private[sql] class DorisStreamLoadSink(sqlContext:
SQLContext, settings: SparkSe
def write(queryExecution: QueryExecution): Unit = {
val schema = queryExecution.analyzed.output
+ var resultRdd = queryExecution.toRdd
+ if (Objects.nonNull(sinkTaskPartitionSize)) {
+ resultRdd = if (sinkTaskUseRepartition)
resultRdd.repartition(sinkTaskPartitionSize) else
resultRdd.coalesce(sinkTaskPartitionSize)
+ }
// write for each partition
- queryExecution.toRdd.foreachPartition(iter => {
+ resultRdd.foreachPartition(iter => {
val objectMapper = new ObjectMapper()
val rowArray = objectMapper.createArrayNode()
iter.foreach(row => {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]