This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 53bfbd1  [Bug] Fix data duplication caused by repartition (#191)
53bfbd1 is described below

commit 53bfbd12ecbbdf54a1b6f081697e2d483d369b39
Author: wudi <[email protected]>
AuthorDate: Mon Mar 11 14:51:55 2024 +0800

    [Bug] Fix data duplication caused by repartition (#191)
---
 .../scala/org/apache/doris/spark/writer/DorisWriter.scala     | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index bd8e9f7..98e5510 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -84,14 +84,15 @@ class DorisWriter(settings: SparkSettings,
     if (enable2PC && !isStreaming) {
       dataFrame.sparkSession.sparkContext.addSparkListener(new DorisTransactionListener(txnAcc, txnHandler))
     }
-
-    var resultRdd = dataFrame.queryExecution.toRdd
-    val schema = dataFrame.schema
+    var resultDataFrame = dataFrame
     if (Objects.nonNull(sinkTaskPartitionSize)) {
-      resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
+      resultDataFrame = if (sinkTaskUseRepartition) dataFrame.repartition(sinkTaskPartitionSize) else dataFrame.coalesce(sinkTaskPartitionSize)
     }
-    resultRdd.foreachPartition(iterator => {
 
+    val resultRdd = resultDataFrame.queryExecution.toRdd
+    val schema = resultDataFrame.schema
+
+    resultRdd.foreachPartition(iterator => {
       while (iterator.hasNext) {
         val batchIterator = new BatchIterator[InternalRow](iterator, batchSize, maxRetryTimes > 0)
         val retry = Utils.retry[Option[CommitMessage], Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) _


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to