This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 3789128 [improvement] add retry interval when load fails (#307)
3789128 is described below
commit 3789128a60cd13152ba3ac38227748be94276442
Author: wudi <[email protected]>
AuthorDate: Tue Apr 8 21:56:59 2025 +0800
[improvement] add retry interval when load fails (#307)
---
.../main/java/org/apache/doris/spark/config/DorisOptions.java | 1 +
.../src/main/scala/org/apache/doris/spark/sql/Utils.scala | 1 -
.../org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala | 4 ++--
.../main/scala/org/apache/doris/spark/writer/DorisWriter.scala | 9 ++++++---
.../scala/org/apache/doris/spark/write/DorisDataWriter.scala | 4 +++-
5 files changed, 12 insertions(+), 7 deletions(-)
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 4319688..a8d29ae 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -68,6 +68,7 @@ public class DorisOptions {
public static final ConfigOption<Integer> DORIS_SINK_BATCH_SIZE =
ConfigOptions.name("doris.sink.batch.size").intType().defaultValue(100000).withDescription("");
public static final ConfigOption<Integer> DORIS_SINK_MAX_RETRIES =
ConfigOptions.name("doris.sink.max-retries").intType().defaultValue(0).withDescription("");
+ public static final ConfigOption<Integer> DORIS_SINK_RETRY_INTERVAL_MS =
ConfigOptions.name("doris.sink.retry.interval.ms").intType().defaultValue(10000).withDescription("The
interval at which the Spark connector tries to load the batch of data again
after load fails.");
public static final ConfigOption<String> DORIS_MAX_FILTER_RATIO =
ConfigOptions.name("doris.max.filter.ratio").stringType().withoutDefaultValue().withDescription("");
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/Utils.scala
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index 3135281..7bb3961 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -189,7 +189,6 @@ private[spark] object Utils {
val result = Try(f)
result match {
case Success(result) =>
- LockSupport.parkNanos(interval.toNanos)
Success(result)
case Failure(exception: T) if retryTimes > 0 =>
logger.warn(s"Execution failed caused by: ", exception)
diff --git
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
index 7c1d48f..dd73f53 100644
---
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
+++
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
@@ -70,9 +70,9 @@ class DorisWriterFailoverITCase extends
AbstractContainerTestBase {
| "fenodes"="${getFenodes}",
| "user"="${getDorisUsername}",
| "password"="${getDorisPassword}",
- | "doris.sink.batch.interval.ms"="1000",
+ | "doris.sink.retry.interval.ms"="10000",
| "doris.sink.batch.size"="1",
- | "doris.sink.max-retries"="100",
+ | "doris.sink.max-retries"="3",
| "doris.sink.enable-2pc"="false"
|)
|""".stripMargin)
diff --git
a/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
b/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index d9d312c..52b1b13 100644
---
a/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++
b/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.CollectionAccumulator
import org.slf4j.{Logger, LoggerFactory}
import java.time.Duration
+import java.util.concurrent.locks.LockSupport
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success}
@@ -45,10 +46,11 @@ class DorisWriter(config: DorisConfig,
private val maxRetryTimes: Int =
config.getValue(DorisOptions.DORIS_SINK_MAX_RETRIES)
private val batchSize: Int =
config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE)
- private val batchInterValMs: Int =
config.getValue(DorisOptions.DORIS_SINK_BATCH_INTERVAL_MS)
+ private val batchIntervalMs: Int =
config.getValue(DorisOptions.DORIS_SINK_BATCH_INTERVAL_MS)
+ private val retryIntervalMs: Int =
config.getValue(DorisOptions.DORIS_SINK_RETRY_INTERVAL_MS)
if (maxRetryTimes > 0) {
- logger.info(s"batch retry enabled, size is $batchSize, interval is
$batchInterValMs")
+ logger.info(s"batch retry enabled, size is $batchSize, retry interval is
$retryIntervalMs")
}
private val enable2PC: Boolean =
config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC)
@@ -87,7 +89,7 @@ class DorisWriter(config: DorisConfig,
resultRdd.foreachPartition(iterator => {
while (iterator.hasNext) {
val batchIterator = new BatchIterator(iterator, batchSize,
maxRetryTimes > 0)
- val retry = Utils.retry[Option[CommitMessage],
Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) _
+ val retry = Utils.retry[Option[CommitMessage],
Exception](maxRetryTimes, Duration.ofMillis(retryIntervalMs.toLong), logger) _
retry(loadFunc(batchIterator, schema))(batchIterator.reset()) match {
case Success(msg) =>
if (enable2PC) handleLoadSuccess(msg, txnAcc)
@@ -97,6 +99,7 @@ class DorisWriter(config: DorisConfig,
batchIterator.close()
throw e
}
+ LockSupport.parkNanos(Duration.ofMillis(batchIntervalMs).toNanos)
}
})
diff --git
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
index 6628e9a..b6edf3c 100644
---
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
+++
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
@@ -45,6 +45,8 @@ class DorisDataWriter(config: DorisConfig, schema:
StructType, partitionId: Int,
private val retries = config.getValue(DorisOptions.DORIS_SINK_MAX_RETRIES)
+ private val retryIntervalMs =
config.getValue(DorisOptions.DORIS_SINK_RETRY_INTERVAL_MS)
+
private val twoPhaseCommitEnabled =
config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC)
private val committedMessages = mutable.Buffer[String]()
@@ -81,7 +83,7 @@ class DorisDataWriter(config: DorisConfig, schema:
StructType, partitionId: Int,
@throws[Exception]
private def loadBatchWithRetries(record: InternalRow): Unit = {
var isRetrying = false
- Retry.exec[Unit, Exception](retries,
Duration.ofMillis(batchIntervalMs.toLong), log) {
+ Retry.exec[Unit, Exception](retries,
Duration.ofMillis(retryIntervalMs.toLong), log) {
if (isRetrying) {
// retrying, reload data from buffer
do {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]