This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 5c7cf93 [fix] Fix data duplication issue when retrying (#201)
5c7cf93 is described below
commit 5c7cf93b3310c52e4f64a25f9f03b1838a7d6b87
Author: gnehil <[email protected]>
AuthorDate: Wed May 8 10:18:40 2024 +0800
[fix] Fix data duplication issue when retrying (#201)
---
.../org/apache/doris/spark/writer/DorisWriter.scala | 17 ++++++++---------
1 file changed, 8 insertions(+), 9 deletions(-)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index 98e5510..9886b52 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.util.CollectionAccumulator
import org.slf4j.{Logger, LoggerFactory}
-import java.io.IOException
import java.time.Duration
import java.util.Objects
import scala.collection.JavaConverters._
@@ -94,7 +93,7 @@ class DorisWriter(settings: SparkSettings,
resultRdd.foreachPartition(iterator => {
while (iterator.hasNext) {
- val batchIterator = new BatchIterator[InternalRow](iterator, batchSize, maxRetryTimes > 0)
+ val batchIterator = new BatchIterator(iterator, batchSize, maxRetryTimes > 0)
val retry = Utils.retry[Option[CommitMessage], Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) _
retry(loadFunc(batchIterator, schema))(batchIterator.reset()) match {
case Success(msg) =>
@@ -138,11 +137,11 @@ class DorisWriter(settings: SparkSettings,
* @param iterator parent iterator
* @param batchSize batch size
* @param batchRetryEnable whether enable batch retry
- * @tparam T data type
*/
- private class BatchIterator[T](iterator: Iterator[T], batchSize: Int, batchRetryEnable: Boolean) extends Iterator[T] {
+ private class BatchIterator(iterator: Iterator[InternalRow], batchSize: Int, batchRetryEnable: Boolean) extends Iterator[InternalRow] {
- private val buffer: ArrayBuffer[T] = if (batchRetryEnable) new ArrayBuffer[T](batchSize) else ArrayBuffer.empty[T]
+ private val buffer: ArrayBuffer[InternalRow] =
+ if (batchRetryEnable) new ArrayBuffer[InternalRow](batchSize) else ArrayBuffer.empty[InternalRow]
private var recordCount = 0
@@ -160,7 +159,7 @@ class DorisWriter(settings: SparkSettings,
}
}
- override def next(): T = {
+ override def next(): InternalRow = {
recordCount += 1
if (batchRetryEnable) {
if (isReset) {
@@ -194,7 +193,7 @@ class DorisWriter(settings: SparkSettings,
}
}
- private def readBuffer(): T = {
+ private def readBuffer(): InternalRow = {
if (recordCount == buffer.size) {
logger.debug("read buffer end, recordCount:{}, bufferSize: {}", recordCount, buffer.size)
isReset = false
@@ -202,8 +201,8 @@ class DorisWriter(settings: SparkSettings,
buffer(recordCount - 1)
}
- private def writeBufferAndReturn(): T = {
- val elem = iterator.next
+ private def writeBufferAndReturn(): InternalRow = {
+ val elem = iterator.next.copy
buffer += elem
elem
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]