This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 1726b8285781b6cf5445dcf28ce5966aed012de9
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Aug 14 07:30:04 2023 -0700

    [HUDI-6214] Enabling compaction by default for batch writes with MOR table (#8718)

    Supports a better out-of-the-box user experience: if a user does not
    explicitly enable inline compaction with spark-datasource or spark-sql
    writes, inline compaction is enabled by default for MOR tables. If the
    user explicitly disables it, that setting is not overridden.

    ---------

    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../main/java/org/apache/hudi/DataSourceUtils.java | 12 ++++-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  3 ++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 53 +++++++++++++--------
 .../org/apache/hudi/HoodieStreamingSink.scala      | 19 ++++----
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |  4 +-
 .../apache/hudi/functional/TestMORDataSource.scala |  2 +
 .../hudi/functional/TestMORDataSourceStorage.scala | 54 +++++++++++++++++++++-
 .../sql/hudi/TestAlterTableDropPartition.scala     | 10 +++-
 .../spark/sql/hudi/TestCompactionTable.scala       |  8 ++++
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  |  4 ++
 .../apache/spark/sql/hudi/TestUpdateTable.scala    | 18 ++++++++
 .../hudi/procedure/TestClusteringProcedure.scala   |  5 ++
 .../hudi/procedure/TestCompactionProcedure.scala   | 16 +++--
 13 files changed, 168 insertions(+), 40 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 93aeef1671f..a088982138b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -173,8 +173,16 @@ public class DataSourceUtils {
   public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath,
                                                      String tblName, Map<String, String> parameters) {
     boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key()));
-    boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE().key())
-        .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
+    boolean inlineCompact = false;
+    if (parameters.containsKey(HoodieCompactionConfig.INLINE_COMPACT.key())) {
+      // if inline is set, fetch the value from it.
+      inlineCompact = Boolean.parseBoolean(parameters.get(HoodieCompactionConfig.INLINE_COMPACT.key()));
+    }
+    // if inline is false, derive the value from asyncCompact and table type
+    if (!inlineCompact) {
+      inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE().key())
+          .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
+    }
     // insert/bulk-insert combining to be true, if filtering for duplicates
     boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS().key()));
     HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 82074cbacf3..ddc9d55e50c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -971,6 +971,9 @@ object DataSourceOptionsHelper {
     if (!params.contains(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key()) && tableConfig.getPayloadClass != null) {
       missingWriteConfigs ++= Map(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> tableConfig.getPayloadClass)
     }
+    if (!params.contains(DataSourceWriteOptions.TABLE_TYPE.key())) {
+      missingWriteConfigs ++= Map(DataSourceWriteOptions.TABLE_TYPE.key() -> tableConfig.getTableType.name())
+    }
     missingWriteConfigs.toMap
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 45ef82acd10..1387b3e2205 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -38,6 +38,7 @@ import org.apache.hudi.common.config._
 import org.apache.hudi.common.engine.HoodieEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
@@ -46,7 +47,7 @@ import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
 import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
 import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
-import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
 import org.apache.hudi.index.HoodieIndex
@@ -79,6 +80,11 @@ import scala.collection.mutable

 object HoodieSparkSqlWriter {

+  case class StreamingWriteParams(hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
+                                  asyncCompactionTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = Option.empty,
+                                  asyncClusteringTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = Option.empty,
+                                  extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty)
+
   /**
    * Controls whether incoming batch's schema's nullability constraints should be canonicalized
    * relative to the table's schema. For ex, in case field A is marked as null-able in table's schema, but is marked
@@ -114,11 +120,8 @@ object HoodieSparkSqlWriter {
             mode: SaveMode,
             optParams: Map[String, String],
             sourceDf: DataFrame,
-            hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
-            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty,
-            asyncCompactionTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = Option.empty,
-            asyncClusteringTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = Option.empty,
-            extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty):
+            streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
+            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
   (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
     assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
@@ -130,7 +133,7 @@ object HoodieSparkSqlWriter {
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
-    var tableConfig = getHoodieTableConfig(sparkContext, path, mode, hoodieTableConfigOpt)
+    var tableConfig = getHoodieTableConfig(sparkContext, path, mode, streamingWritesParamsOpt.map( _.hoodieTableConfigOpt).orElse(Option.apply(Option.empty)).get)
     // get params w/o injecting default and validate
     val paramsWithoutDefaults = HoodieWriterUtils.getParamsWithAlternatives(optParams)
     val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(paramsWithoutDefaults)
@@ -141,8 +144,10 @@ object HoodieSparkSqlWriter {
     validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
     validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite);
+    asyncCompactionTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.isDefined).orElse(Some(false)).get
+    asyncClusteringTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.isDefined).orElse(Some(false)).get
     // re-use table configs and inject defaults.
-    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
+    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode, streamingWritesParamsOpt.isDefined)
     val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
     val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
       s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
@@ -151,8 +156,6 @@ object HoodieSparkSqlWriter {
     assert(!StringUtils.isNullOrEmpty(hoodieConfig.getString(HoodieWriteConfig.TBL_NAME)),
       s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
-    asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
-    asyncClusteringTriggerFnDefined = asyncClusteringTriggerFn.isDefined
     sparkContext.getConf.getOption("spark.serializer") match {
       case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
       case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
     }
@@ -165,7 +168,7 @@
     val preppedWriteOperation = canDoPreppedWrites(hoodieConfig, parameters, operation, sourceDf)
     val jsc = new JavaSparkContext(sparkContext)
-    if (asyncCompactionTriggerFn.isDefined) {
+    if (streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.isDefined).orElse(Some(false)).get) {
       if (jsc.getConf.getOption(SparkConfigs.SPARK_SCHEDULER_ALLOCATION_FILE_KEY).isDefined) {
         jsc.setLocalProperty("spark.scheduler.pool", SparkConfigs.SPARK_DATASOURCE_WRITER_POOL_NAME)
       }
@@ -280,10 +283,10 @@
             .asInstanceOf[SparkRDDWriteClient[_]]
           if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
-            asyncCompactionTriggerFn.get.apply(client)
+            streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.get.apply(client))
           }
           if (isAsyncClusteringEnabled(client, parameters)) {
-            asyncClusteringTriggerFn.get.apply(client)
+            streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.get.apply(client))
           }
           // Issue deletes
@@ -360,11 +363,11 @@
           }
           if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
-            asyncCompactionTriggerFn.get.apply(client)
+            streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.get.apply(client))
           }
           if (isAsyncClusteringEnabled(client, parameters)) {
-            asyncClusteringTriggerFn.get.apply(client)
+            streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.get.apply(client))
           }
           // Short-circuit if bulk_insert via row is enabled.
@@ -376,7 +379,7 @@
         // scalastyle:on
         val writeConfig = client.getConfig
-        if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
+        if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
           throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.")
         }
         // Convert to RDD[HoodieRecord]
@@ -402,7 +405,8 @@
             val (writeSuccessful, compactionInstant, clusteringInstant) =
               commitAndPerformPostOperations(sqlContext.sparkSession, df.schema, writeResult, parameters, writeClient, tableConfig, jsc,
-                TableInstantInfo(basePath, instantTime, commitActionType, operation), extraPreCommitFn)
+                TableInstantInfo(basePath, instantTime, commitActionType, operation), streamingWritesParamsOpt.map(_.extraPreCommitFn)
+                  .orElse(Option.apply(Option.empty)).get)
             (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
           } finally {
@@ -724,6 +728,7 @@
                 optParams: Map[String, String],
                 df: DataFrame,
                 hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
+                streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
                 hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty): Boolean = {
     assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
@@ -736,7 +741,7 @@
     val tableConfig = getHoodieTableConfig(sparkContext, path, mode, hoodieTableConfigOpt)
     validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
-    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
+    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode, streamingWritesParamsOpt.isDefined)
     val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
     val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
     val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
@@ -1075,7 +1080,7 @@
     log.info(s"Config.inlineCompactionEnabled ? ${client.getConfig.inlineCompactionEnabled}")
     (asyncCompactionTriggerFnDefined && !client.getConfig.inlineCompactionEnabled
       && parameters.get(ASYNC_COMPACT_ENABLE.key).exists(r => r.toBoolean)
-      && tableConfig.getTableType == HoodieTableType.MERGE_ON_READ)
+      && tableConfig.getTableType == MERGE_ON_READ)
   }
   private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[_],
@@ -1107,7 +1112,8 @@
   }
   private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
-                                            tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = {
+                                            tableConfig: HoodieTableConfig, mode: SaveMode,
+                                            isStreamingWrite: Boolean): (Map[String, String], HoodieConfig) = {
     val translatedOptions = DataSourceWriteOptions.mayBeDerivePartitionPath(optParams)
     var translatedOptsWithMappedTableConfig = mutable.Map.empty ++ translatedOptions.toMap
     if (tableConfig != null && mode != SaveMode.Overwrite) {
@@ -1135,6 +1141,13 @@
       // enable merge allow duplicates when operation type is insert
       mergedParams.put(HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key(), "true")
     }
+    // enable inline compaction for batch writes if applicable
+    if (!isStreamingWrite
+      && mergedParams.getOrElse(DataSourceWriteOptions.TABLE_TYPE.key(), COPY_ON_WRITE.name()) == MERGE_ON_READ.name()
+      && !optParams.containsKey(HoodieCompactionConfig.INLINE_COMPACT.key())
+      && !optParams.containsKey(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key)) {
+      mergedParams.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true")
+    }
     val params = mergedParams.toMap
     (params, HoodieWriterUtils.convertMapToHoodieConfig(params))
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 5667c8870d3..6606bc69eec 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -17,6 +17,7 @@
 package org.apache.hudi
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.StreamingWriteParams
 import org.apache.hudi.HoodieStreamingSink.SINK_CHECKPOINT_KEY
 import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
 import org.apache.hudi.client.SparkRDDWriteClient
@@ -27,7 +28,7 @@
 import org.apache.hudi.common.table.timeline.HoodieInstant.State
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
-import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, ConfigUtils, JsonUtils, StringUtils}
+import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, ConfigUtils}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.exception.{HoodieCorruptedDataException, HoodieException, TableNotFoundException}
@@ -127,14 +128,14 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
-          sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient,
-          if (disableCompaction) None else Some(triggerAsyncCompactor), Some(triggerAsyncClustering),
-          extraPreCommitFn = Some(new BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata] {
-            override def accept(metaClient: HoodieTableMetaClient, newCommitMetadata: HoodieCommitMetadata): Unit = {
-              val identifier = options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), STREAMING_CHECKPOINT_IDENTIFIER.defaultValue())
-              newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, CommitUtils.getCheckpointValueAsString(identifier, String.valueOf(batchId)))
-            }
-          }))
+          sqlContext, mode, updatedOptions, data, Some(StreamingWriteParams(hoodieTableConfig,
+            if (disableCompaction) None else Some(triggerAsyncCompactor), Some(triggerAsyncClustering),
+            extraPreCommitFn = Some(new BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata] {
+              override def accept(metaClient: HoodieTableMetaClient, newCommitMetadata: HoodieCommitMetadata): Unit = {
+                val identifier = options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), STREAMING_CHECKPOINT_IDENTIFIER.defaultValue())
+                newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, CommitUtils.getCheckpointValueAsString(identifier, String.valueOf(batchId)))
+              }
+            }))), writeClient)
       ) match {
         case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 6781c229f6f..7f89817a7f8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -607,13 +607,13 @@ class TestHoodieSparkSqlWriter {
       mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
     HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Append, fooTableModifier, spark.emptyDataFrame, Option.empty,
-      Option(client))
+      Option.empty, Option(client))
     // Verify that HoodieWriteClient is closed correctly
     verify(client, times(1)).close()
     val ignoreResult = HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Ignore, fooTableModifier, spark.emptyDataFrame, Option.empty,
-      Option(client))
+      Option.empty, Option(client))
     assertFalse(ignoreResult)
     verify(client, times(2)).close()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 2a722f24ed3..2ea66fa3f07 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -1225,6 +1225,8 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
     thirdDf.write.format("hudi")
       .options(writeOpts)
+      // need to disable inline compaction for this test to avoid the compaction instant being completed
+      .option(HoodieCompactionConfig.INLINE_COMPACT.key, "false")
       .mode(SaveMode.Append).save(tablePath)
     // Read-optimized query on MOR
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 534ee322eb9..a1b4f3e307e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -21,6 +21,7 @@ package org.apache.hudi.functional
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.util.StringUtils
@@ -32,13 +33,12 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, lit}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 import scala.collection.JavaConversions._
-
 @Tag("functional")
 class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
@@ -129,4 +129,54 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
     assertEquals(100, hudiSnapshotDF3.count())
     assertEquals(updatedVerificationVal, hudiSnapshotDF3.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
   }
+
+  @Test
+  def testMergeOnReadStorageDefaultCompaction(): Unit = {
+    val preCombineField = "fare"
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+      "hoodie.delete.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition_path",
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+    )
+
+    var options: Map[String, String] = commonOpts
+    options += (DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> preCombineField)
+    val dataGen = new HoodieTestDataGenerator(0xDEEF)
+    val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+    // Bulk Insert Operation
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(options)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+    val hudiDF1 = spark.read.format("org.apache.hudi")
+      .load(basePath)
+
+    assertEquals(100, hudiDF1.count())
+
+    // upsert
+    for ( a <- 1 to 5) {
+      val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
+      val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+      inputDF2.write.format("org.apache.hudi")
+        .options(options)
+        .mode(SaveMode.Append)
+        .save(basePath)
+    }
+    // compaction should have been completed
+    val metaClient = HoodieTableMetaClient.builder.setConf(fs.getConf).setBasePath(basePath)
+      .setLoadActiveTimelineOnLoad(true).build
+    assertEquals(1, metaClient.getActiveTimeline.getCommitTimeline.countInstants())
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index 6a97c532147..2261e83f7f9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -552,7 +552,10 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
          | partitioned by(ts)
          | location '$basePath'
          | """.stripMargin)
-    // Create 5 deltacommits to ensure that it is > default `hoodie.compact.inline.max.delta.commits`
+    // disable automatic inline compaction to test with pending compaction instants
+    spark.sql("set hoodie.compact.inline=false")
+    spark.sql("set hoodie.compact.schedule.inline=false")
+    // Create 5 deltacommits to ensure that it is >= default `hoodie.compact.inline.max.delta.commits`
     spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
     spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
     spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
@@ -596,7 +599,10 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
          | partitioned by(ts)
          | location '$basePath'
          | """.stripMargin)
-    // Create 5 deltacommits to ensure that it is > default `hoodie.compact.inline.max.delta.commits`
+    // disable automatic inline compaction to test with pending compaction instants
+    spark.sql("set hoodie.compact.inline=false")
+    spark.sql("set hoodie.compact.schedule.inline=false")
+    // Create 5 deltacommits to ensure that it is >= default `hoodie.compact.inline.max.delta.commits`
     // Write everything into the same FileGroup but into separate blocks
     spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
     spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
index ea9588419b3..568e3569725 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
@@ -38,6 +38,10 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
          | )
       """.stripMargin)
     spark.sql("set hoodie.parquet.max.file.size = 10000")
+    // disable automatic inline compaction
+    spark.sql("set hoodie.compact.inline=false")
+    spark.sql("set hoodie.compact.schedule.inline=false")
+
     spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
     spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
     spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
@@ -89,6 +93,10 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
          | )
       """.stripMargin)
     spark.sql("set hoodie.parquet.max.file.size = 10000")
+    // disable automatic inline compaction
+    spark.sql("set hoodie.compact.inline=false")
+    spark.sql("set hoodie.compact.schedule.inline=false")
+
     spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
     spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
     spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 0b2b01cbec9..77df8d08418 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -235,6 +235,10 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
     Seq("cow", "mor").foreach { tableType =>
       val tableName = generateTableName
       val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
+      // disable automatic inline compaction
+      spark.sql("set hoodie.compact.inline=false")
+      spark.sql("set hoodie.compact.schedule.inline=false")
+
       if (HoodieSparkUtils.gteqSpark3_1) {
         spark.sql("set hoodie.schema.on.read.enable=true")
         spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
index f244167d142..0c2c34ae6d9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
@@ -19,6 +19,9 @@ package org.apache.spark.sql.hudi
 import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
 import org.apache.hudi.HoodieSparkUtils.isSpark2
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.junit.jupiter.api.Assertions.assertEquals
 class TestUpdateTable extends HoodieSparkSqlTestBase {
@@ -109,6 +112,21 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
         checkAnswer(s"select id, name, price, ts from $tableName")(
           Seq(1, "a1", 40.0, 1000)
         )
+
+        // verify default compaction w/ MOR
+        if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
+          spark.sql(s"update $tableName set price = price * 2 where id = 1")
+          spark.sql(s"update $tableName set price = price * 2 where id = 1")
+          spark.sql(s"update $tableName set price = price * 2 where id = 1")
+          // verify compaction is complete
+          val metaClient = HoodieTableMetaClient.builder()
+            .setConf(spark.sparkContext.hadoopConfiguration)
+            .setBasePath(tmp.getCanonicalPath + "/" + tableName)
+            .build()
+
+          assertEquals(metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getLeft.getAction, "commit")
+        }
+
       }
     })
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 8da368039d5..85829e378a6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -60,6 +60,11 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
          | partitioned by(ts)
          | location '$basePath'
       """.stripMargin)
+      // disable automatic inline compaction so that HoodieDataSourceHelpers.allCompletedCommitsCompactions
+      // does not count compaction instants
+      spark.sql("set hoodie.compact.inline=false")
+      spark.sql("set hoodie.compact.schedule.inline=false")
+
       spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
       spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala index 02e9406cdde..fcbdc8df5d7 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala @@ -45,6 +45,10 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase { | ) """.stripMargin) spark.sql("set hoodie.parquet.max.file.size = 10000") + // disable automatic inline compaction + spark.sql("set hoodie.compact.inline=false") + spark.sql("set hoodie.compact.schedule.inline=false") + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)") spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)") @@ -125,6 +129,10 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase { | ) """.stripMargin) spark.sql("set hoodie.parquet.max.file.size = 10000") + // disable automatic inline compaction + spark.sql("set hoodie.compact.inline=false") + spark.sql("set hoodie.compact.schedule.inline=false") + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)") spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)") @@ -192,12 +200,14 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase { | tblproperties ( | type = 'mor', | primaryKey = 'id', - | preCombineField = 'ts', - | hoodie.compact.inline ='true', - | hoodie.compact.inline.max.delta.commits ='2' + | preCombineField = 'ts' | ) | location '${tmp.getCanonicalPath}/$tableName1' """.stripMargin) + // set inline compaction + spark.sql("set hoodie.compact.inline=true") + spark.sql("set hoodie.compact.inline.max.delta.commits=2") + spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)") spark.sql(s"update $tableName1 set name = 'a2' where id = 1") spark.sql(s"update $tableName1 set name = 'a3' where id = 1")
