This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e76dd102bca [HUDI-4631] Adding retries to spark datasource writes on
conflict failures (#6854)
e76dd102bca is described below
commit e76dd102bcaf8aec5a932e7277ccdbfd73ce1a32
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Aug 28 07:17:45 2023 -0400
[HUDI-4631] Adding retries to spark datasource writes on conflict failures
(#6854)
Added retry functionality to Spark datasource writes to automatically retry
in case of conflict failures.
User experience with multi-writers will be improved by these automatic
retries.
---------
Co-authored-by: Sagar Sumit <[email protected]>
---
.../org/apache/hudi/config/HoodieLockConfig.java | 16 ++++--
.../org/apache/hudi/config/HoodieWriteConfig.java | 6 ++
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 40 +++++++++++--
.../apache/hudi/functional/TestCOWDataSource.scala | 66 +++++++++++++++++++++-
4 files changed, 116 insertions(+), 12 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
index 1d5b09629e4..b24aecf46c1 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
@@ -217,16 +217,24 @@ public class HoodieLockConfig extends HoodieConfig {
.withDocumentation("Lock provider class name, this should be subclass of
"
+ "org.apache.hudi.client.transaction.ConflictResolutionStrategy");
- /** @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME}
and its methods instead */
+ /**
+ * @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME}
and its methods instead
+ */
@Deprecated
public static final String WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP =
WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key();
- /** @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME}
and its methods instead */
+ /**
+ * @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME}
and its methods instead
+ */
@Deprecated
public static final String DEFAULT_WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS
= WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.defaultValue();
- /** @deprecated Use {@link #LOCK_PROVIDER_CLASS_NAME} and its methods
instead */
+ /**
+ * @deprecated Use {@link #LOCK_PROVIDER_CLASS_NAME} and its methods instead
+ */
@Deprecated
public static final String LOCK_PROVIDER_CLASS_PROP =
LOCK_PROVIDER_CLASS_NAME.key();
- /** @deprecated Use {@link #LOCK_PROVIDER_CLASS_NAME} and its methods
instead */
+ /**
+ * @deprecated Use {@link #LOCK_PROVIDER_CLASS_NAME} and its methods instead
+ */
@Deprecated
public static final String DEFAULT_LOCK_PROVIDER_CLASS =
LOCK_PROVIDER_CLASS_NAME.defaultValue();
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index ba94d80d674..01b8fa55948 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -558,6 +558,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.defaultValue(WriteConcurrencyMode.SINGLE_WRITER.name())
.withDocumentation(WriteConcurrencyMode.class);
+ public static final ConfigProperty<Integer> NUM_RETRIES_ON_CONFLICT_FAILURES
= ConfigProperty
+ .key("hoodie.write.num.retries.on.conflict.failures")
+ .defaultValue(0)
+ .sinceVersion("0.13.0")
+ .withDocumentation("Maximum number of times to retry a batch on conflict
failure.");
+
public static final ConfigProperty<String> WRITE_SCHEMA_OVERRIDE =
ConfigProperty
.key("hoodie.write.schema")
.noDefaultValue()
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index e98d72d8284..57baba29c92 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -21,7 +21,7 @@ import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hudi.AutoRecordKeyGenerationUtils.{isAutoGenerateRecordKeys,
mayBeValidateParamsForAutoGenerationOfRecordKeys}
+import
org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType,
convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
import
org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
import
org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty
@@ -48,17 +48,15 @@ import org.apache.hudi.common.util.{CommitUtils,
StringUtils, Option => HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH,
INDEX_CLASS_NAME}
import
org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig,
HoodieWriteConfig}
-import org.apache.hudi.exception.{HoodieException,
SchemaCompatibilityException}
+import org.apache.hudi.exception.{HoodieException,
HoodieWriteConflictException, SchemaCompatibilityException}
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
-import org.apache.hudi.index.HoodieIndex
-import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import
org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileNullability
import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils,
SerDeHelper}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName
-import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils,
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.keygen.{BaseKeyGenerator,
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.metrics.Metrics
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
@@ -122,6 +120,38 @@ object HoodieSparkSqlWriter {
sourceDf: DataFrame,
streamingWritesParamsOpt: Option[StreamingWriteParams] =
Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
+
+ (Boolean, HOption[String], HOption[String], HOption[String],
SparkRDDWriteClient[_], HoodieTableConfig) = {
+ var succeeded = false
+ var counter = 0
+ val maxRetry: Integer =
Integer.parseInt(optParams.getOrElse(HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.key(),
HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.defaultValue().toString))
+ var toReturn: (Boolean, HOption[String], HOption[String], HOption[String],
SparkRDDWriteClient[_], HoodieTableConfig) = null
+
+ while (counter <= maxRetry && !succeeded) {
+ try {
+ toReturn = writeInternal(sqlContext, mode, optParams, sourceDf,
streamingWritesParamsOpt, hoodieWriteClient)
+ log.warn(s"Succeeded with attempt no $counter")
+ succeeded = true
+ } catch {
+ case e: HoodieWriteConflictException =>
+ val writeConcurrencyMode =
optParams.getOrElse(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
+ if
(writeConcurrencyMode.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())
&& counter < maxRetry) {
+ counter += 1
+ log.warn(s"Conflict found. Retrying again for attempt no $counter")
+ } else {
+ throw e
+ }
+ }
+ }
+ toReturn
+ }
+
+ def writeInternal(sqlContext: SQLContext,
+ mode: SaveMode,
+ optParams: Map[String, String],
+ sourceDf: DataFrame,
+ streamingWritesParamsOpt: Option[StreamingWriteParams] =
Option.empty,
+ hoodieWriteClient: Option[SparkRDDWriteClient[_]] =
Option.empty):
(Boolean, HOption[String], HOption[String], HOption[String],
SparkRDDWriteClient[_], HoodieTableConfig) = {
assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)),
"'path' must be set")
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index bb36b9cdd27..104996d5c4f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -23,11 +23,11 @@ import
org.apache.hudi.DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.QuickstartUtils.{convertToStringList,
getQuickstartWriteConfigs}
import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
import
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
import org.apache.hudi.common.table.timeline.{HoodieInstant, TimelineUtils}
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -59,6 +59,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
import java.sql.{Date, Timestamp}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.function.Consumer
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -555,11 +556,70 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
assertEquals(snapshotDF2.count(), 80)
}
+ /**
+ * Test retries on conflict failures.
+ */
+ @ParameterizedTest
+ @ValueSource(ints = Array(0, 2))
+ def testCopyOnWriteConcurrentUpdates(numRetries: Integer): Unit = {
+ initTestDataGenerator()
+ val records1 = recordsToStrings(dataGen.generateInserts("000",
1000)).toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+ inputDF1.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option("hoodie.write.concurrency.mode",
"optimistic_concurrency_control")
+ .option("hoodie.cleaner.policy.failed.writes", "LAZY")
+ .option("hoodie.write.lock.provider",
"org.apache.hudi.client.transaction.lock.InProcessLockProvider")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val snapshotDF1 = spark.read.format("org.apache.hudi")
+ .load(basePath + "/*/*/*/*")
+ assertEquals(1000, snapshotDF1.count())
+
+ val countDownLatch = new CountDownLatch(2)
+ for (x <- 1 to 2) {
+ val thread = new Thread(new UpdateThread(dataGen, spark, commonOpts,
basePath, x + "00", countDownLatch, numRetries))
+ thread.setName((x + "00_THREAD").toString())
+ thread.start()
+ }
+ countDownLatch.await(1, TimeUnit.MINUTES)
+
+ val snapshotDF2 = spark.read.format("org.apache.hudi")
+ .load(basePath + "/*/*/*/*")
+ if (numRetries > 0) {
+ assertEquals(snapshotDF2.count(), 3000)
+ assertEquals(HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
"000").size(), 3)
+ } else {
+ // only one among two threads will succeed and hence 2000
+ assertEquals(snapshotDF2.count(), 2000)
+ assertEquals(HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
"000").size(), 2)
+ }
+ }
+
+ class UpdateThread(dataGen: HoodieTestDataGenerator, spark: SparkSession,
commonOpts: Map[String, String], basePath: String,
+ instantTime: String, countDownLatch: CountDownLatch,
numRetries: Integer = 0) extends Runnable {
+ override def run() {
+ val updateRecs =
recordsToStrings(dataGen.generateUniqueUpdates(instantTime, 500)).toList
+ val insertRecs = recordsToStrings(dataGen.generateInserts(instantTime,
1000)).toList
+ val updateDf =
spark.read.json(spark.sparkContext.parallelize(updateRecs, 2))
+ val insertDf =
spark.read.json(spark.sparkContext.parallelize(insertRecs, 2))
+ updateDf.union(insertDf).write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option("hoodie.write.concurrency.mode",
"optimistic_concurrency_control")
+ .option("hoodie.cleaner.policy.failed.writes", "LAZY")
+ .option("hoodie.write.lock.provider",
"org.apache.hudi.client.transaction.lock.InProcessLockProvider")
+ .option(HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.key(),
numRetries.toString)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ countDownLatch.countDown()
+ }
+ }
+
@ParameterizedTest
@EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO",
"SPARK"))
def testOverWriteModeUseReplaceAction(recordType: HoodieRecordType): Unit = {
val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
-
val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")