This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 53bfbd1 [Bug] Fix data duplication caused by repartition (#191)
53bfbd1 is described below
commit 53bfbd12ecbbdf54a1b6f081697e2d483d369b39
Author: wudi <[email protected]>
AuthorDate: Mon Mar 11 14:51:55 2024 +0800
[Bug] Fix data duplication caused by repartition (#191)
---
.../scala/org/apache/doris/spark/writer/DorisWriter.scala | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index bd8e9f7..98e5510 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -84,14 +84,15 @@ class DorisWriter(settings: SparkSettings,
if (enable2PC && !isStreaming) {
dataFrame.sparkSession.sparkContext.addSparkListener(new DorisTransactionListener(txnAcc, txnHandler))
}
-
- var resultRdd = dataFrame.queryExecution.toRdd
- val schema = dataFrame.schema
+ var resultDataFrame = dataFrame
if (Objects.nonNull(sinkTaskPartitionSize)) {
-      resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
+      resultDataFrame = if (sinkTaskUseRepartition) dataFrame.repartition(sinkTaskPartitionSize) else dataFrame.coalesce(sinkTaskPartitionSize)
}
- resultRdd.foreachPartition(iterator => {
+ val resultRdd = resultDataFrame.queryExecution.toRdd
+ val schema = resultDataFrame.schema
+
+ resultRdd.foreachPartition(iterator => {
while (iterator.hasNext) {
val batchIterator = new BatchIterator[InternalRow](iterator, batchSize, maxRetryTimes > 0)
val retry = Utils.retry[Option[CommitMessage], Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) _
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]