This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c7cf93  [fix] Fix data duplication issue when retrying (#201)
5c7cf93 is described below

commit 5c7cf93b3310c52e4f64a25f9f03b1838a7d6b87
Author: gnehil <[email protected]>
AuthorDate: Wed May 8 10:18:40 2024 +0800

    [fix] Fix data duplication issue when retrying (#201)
---
 .../org/apache/doris/spark/writer/DorisWriter.scala     | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)
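
A likely explanation for the duplication: Spark frequently reuses the same
mutable InternalRow (usually an UnsafeRow) across iterator.next() calls,
updating it in place for every record. Buffering that reference for a later
retry therefore leaves every slot of the buffer pointing at one object that
holds whatever row was read last, and the replayed batch comes out as
duplicated data. The patch below pins BatchIterator to InternalRow and
buffers a copy of each row instead. A minimal, self-contained sketch of the
effect (illustrative code, not part of the connector; requires
spark-catalyst on the classpath):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

import scala.collection.mutable.ArrayBuffer

object RowReuseSketch extends App {
  // Simulate Spark's row reuse: one mutable row instance, mutated per record.
  def reusingIterator(): Iterator[InternalRow] = {
    val reused = new GenericInternalRow(1)
    (1 to 3).iterator.map { i => reused.update(0, i.toLong); reused }
  }

  // Buffering references: every slot points at the same object -> prints "3, 3, 3"
  val noCopy = ArrayBuffer.empty[InternalRow]
  reusingIterator().foreach(noCopy += _)
  println(noCopy.map(_.getLong(0)).mkString(", "))

  // Buffering copies, as the fix does with iterator.next.copy -> prints "1, 2, 3"
  val withCopy = ArrayBuffer.empty[InternalRow]
  reusingIterator().foreach(row => withCopy += row.copy())
  println(withCopy.map(_.getLong(0)).mkString(", "))
}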

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index 98e5510..9886b52 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.CollectionAccumulator
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.io.IOException
 import java.time.Duration
 import java.util.Objects
 import scala.collection.JavaConverters._
@@ -94,7 +93,7 @@ class DorisWriter(settings: SparkSettings,
 
     resultRdd.foreachPartition(iterator => {
       while (iterator.hasNext) {
-        val batchIterator = new BatchIterator[InternalRow](iterator, batchSize, maxRetryTimes > 0)
+        val batchIterator = new BatchIterator(iterator, batchSize, maxRetryTimes > 0)
         val retry = Utils.retry[Option[CommitMessage], Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) _
         retry(loadFunc(batchIterator, schema))(batchIterator.reset()) match {
           case Success(msg) =>
@@ -138,11 +137,11 @@ class DorisWriter(settings: SparkSettings,
    * @param iterator         parent iterator
    * @param batchSize        batch size
    * @param batchRetryEnable whether enable batch retry
-   * @tparam T data type
    */
-  private class BatchIterator[T](iterator: Iterator[T], batchSize: Int, batchRetryEnable: Boolean) extends Iterator[T] {
+  private class BatchIterator(iterator: Iterator[InternalRow], batchSize: Int, batchRetryEnable: Boolean) extends Iterator[InternalRow] {
 
-    private val buffer: ArrayBuffer[T] = if (batchRetryEnable) new ArrayBuffer[T](batchSize) else ArrayBuffer.empty[T]
+    private val buffer: ArrayBuffer[InternalRow] =
+      if (batchRetryEnable) new ArrayBuffer[InternalRow](batchSize) else ArrayBuffer.empty[InternalRow]
 
     private var recordCount = 0
 
@@ -160,7 +159,7 @@ class DorisWriter(settings: SparkSettings,
       }
     }
 
-    override def next(): T = {
+    override def next(): InternalRow = {
       recordCount += 1
       if (batchRetryEnable) {
         if (isReset) {
@@ -194,7 +193,7 @@ class DorisWriter(settings: SparkSettings,
       }
     }
 
-    private def readBuffer(): T = {
+    private def readBuffer(): InternalRow = {
       if (recordCount == buffer.size) {
         logger.debug("read buffer end, recordCount:{}, bufferSize: {}", 
recordCount, buffer.size)
         isReset = false
@@ -202,8 +201,8 @@ class DorisWriter(settings: SparkSettings,
       buffer(recordCount - 1)
     }
 
-    private def writeBufferAndReturn(): T = {
-      val elem = iterator.next
+    private def writeBufferAndReturn(): InternalRow = {
+      val elem = iterator.next.copy
       buffer += elem
       elem
     }
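
For context, a simplified sketch of the buffered-replay pattern the writer
relies on (ReplayableBatch and retry below are illustrative stand-ins, not the
connector's BatchIterator or Utils.retry): a batch is buffered while it is
first consumed, and if the load attempt fails, reset() rewinds the iterator so
the same rows are replayed on the next attempt instead of being lost or
re-read from the source.

import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

class ReplayableBatch[T](source: Iterator[T], batchSize: Int) extends Iterator[T] {
  private val buffer = ArrayBuffer.empty[T]
  private var pos = 0

  override def hasNext: Boolean =
    pos < buffer.size || (buffer.size < batchSize && source.hasNext)

  override def next(): T = {
    val elem =
      if (pos < buffer.size) buffer(pos)                 // replaying after reset()
      else { val e = source.next(); buffer += e; e }     // real InternalRows need .copy() here
    pos += 1
    elem
  }

  def reset(): Unit = pos = 0                            // rewind to replay the buffered batch
}

object RetrySketch extends App {
  // Stand-in for a retry helper: run body, and on failure run onRetry
  // (e.g. batch.reset()) before trying again, up to maxRetries times.
  def retry[T](maxRetries: Int)(body: => T)(onRetry: => Unit): Try[T] = {
    var result = Try(body)
    var remaining = maxRetries
    while (result.isFailure && remaining > 0) {
      onRetry
      result = Try(body)
      remaining -= 1
    }
    result
  }

  var attempts = 0
  val batch = new ReplayableBatch(Iterator(1, 2, 3), batchSize = 3)
  def load(b: ReplayableBatch[Int]): Int = {
    attempts += 1
    val sum = b.sum                                      // consume the whole batch
    if (attempts == 1) sys.error("simulated stream load failure")
    sum
  }

  retry(maxRetries = 2)(load(batch))(batch.reset()) match {
    case Success(sum) => println(s"loaded after $attempts attempts, sum = $sum")  // sum = 6
    case Failure(e)   => println(s"gave up: ${e.getMessage}")
  }
}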


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
