This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 1726b8285781b6cf5445dcf28ce5966aed012de9
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Aug 14 07:30:04 2023 -0700

    [HUDI-6214] Enabling compaction by default for batch writes with MOR table (#8718)
    
    Supports a better out-of-the-box user experience. If a user does not
    explicitly configure inline compaction for spark-datasource or spark-sql
    writes to a MOR table, inline compaction is enabled by default. If the
    user explicitly disables it, that choice is honored and no override
    happens.
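    
    As an illustration, a minimal sketch of the new behavior on the
    spark-datasource path (a hedged example, assuming a spark-shell session
    with an input DataFrame `df`; the table name and base path below are
    hypothetical):
    
        import org.apache.spark.sql.SaveMode
    
        // Batch write to a MOR table: the user sets neither
        // hoodie.compact.inline nor hoodie.datasource.compaction.async.enable,
        // so the writer now defaults hoodie.compact.inline to true.
        df.write.format("hudi")
          .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
          .option("hoodie.table.name", "trips_mor")  // hypothetical table name
          .mode(SaveMode.Append)
          .save("/tmp/trips_mor")                    // hypothetical base path
    
        // Explicit user choice is honored: with inline compaction explicitly
        // disabled, no override happens.
        df.write.format("hudi")
          .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
          .option("hoodie.compact.inline", "false")
          .option("hoodie.table.name", "trips_mor")
          .mode(SaveMode.Append)
          .save("/tmp/trips_mor")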
    
    ---------
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../main/java/org/apache/hudi/DataSourceUtils.java | 12 ++++-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  3 ++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 53 +++++++++++++--------
 .../org/apache/hudi/HoodieStreamingSink.scala      | 19 ++++----
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |  4 +-
 .../apache/hudi/functional/TestMORDataSource.scala |  2 +
 .../hudi/functional/TestMORDataSourceStorage.scala | 54 +++++++++++++++++++++-
 .../sql/hudi/TestAlterTableDropPartition.scala     | 10 +++-
 .../spark/sql/hudi/TestCompactionTable.scala       |  8 ++++
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  |  4 ++
 .../apache/spark/sql/hudi/TestUpdateTable.scala    | 18 ++++++++
 .../hudi/procedure/TestClusteringProcedure.scala   |  5 ++
 .../hudi/procedure/TestCompactionProcedure.scala   | 16 +++++--
 13 files changed, 168 insertions(+), 40 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 93aeef1671f..a088982138b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -173,8 +173,16 @@ public class DataSourceUtils {
   public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath,
                                                      String tblName, Map<String, String> parameters) {
     boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key()));
-    boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE().key())
-        .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
+    boolean inlineCompact = false;
+    if (parameters.containsKey(HoodieCompactionConfig.INLINE_COMPACT.key())) {
+      // if inline is set, fetch the value from it.
+      inlineCompact = Boolean.parseBoolean(parameters.get(HoodieCompactionConfig.INLINE_COMPACT.key()));
+    }
+    // if inline is false, derive the value from asyncCompact and table type
+    if (!inlineCompact) {
+      inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE().key())
+          .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
+    }
     // insert/bulk-insert combining to be true, if filtering for duplicates
     boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS().key()));
     HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 82074cbacf3..ddc9d55e50c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -971,6 +971,9 @@ object DataSourceOptionsHelper {
     if (!params.contains(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key()) && tableConfig.getPayloadClass != null) {
       missingWriteConfigs ++= Map(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> tableConfig.getPayloadClass)
     }
+    if (!params.contains(DataSourceWriteOptions.TABLE_TYPE.key())) {
+      missingWriteConfigs ++= Map(DataSourceWriteOptions.TABLE_TYPE.key() -> tableConfig.getTableType.name())
+    }
     missingWriteConfigs.toMap
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 45ef82acd10..1387b3e2205 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -38,6 +38,7 @@ import org.apache.hudi.common.config._
 import org.apache.hudi.common.engine.HoodieEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
@@ -46,7 +47,7 @@ import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
 import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
 import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
-import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
 import org.apache.hudi.index.HoodieIndex
@@ -79,6 +80,11 @@ import scala.collection.mutable
 
 object HoodieSparkSqlWriter {
 
+  case class StreamingWriteParams(hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
+                                  asyncCompactionTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = Option.empty,
+                                  asyncClusteringTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = Option.empty,
+                                  extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty)
+
   /**
    * Controls whether incoming batch's schema's nullability constraints should be canonicalized
    * relative to the table's schema. For ex, in case field A is marked as null-able in table's schema, but is marked
@@ -114,11 +120,8 @@ object HoodieSparkSqlWriter {
             mode: SaveMode,
             optParams: Map[String, String],
             sourceDf: DataFrame,
-            hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
-            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty,
-            asyncCompactionTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = Option.empty,
-            asyncClusteringTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = Option.empty,
-            extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty):
+            streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
+            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
   (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
 
     assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
@@ -130,7 +133,7 @@ object HoodieSparkSqlWriter {
 
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
-    var tableConfig = getHoodieTableConfig(sparkContext, path, mode, hoodieTableConfigOpt)
+    var tableConfig = getHoodieTableConfig(sparkContext, path, mode, streamingWritesParamsOpt.map(_.hoodieTableConfigOpt).orElse(Option.apply(Option.empty)).get)
     // get params w/o injecting default and validate
     val paramsWithoutDefaults = HoodieWriterUtils.getParamsWithAlternatives(optParams)
     val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(paramsWithoutDefaults)
@@ -141,8 +144,10 @@ object HoodieSparkSqlWriter {
     validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
     validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite);
 
+    asyncCompactionTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.isDefined).orElse(Some(false)).get
+    asyncClusteringTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.isDefined).orElse(Some(false)).get
     // re-use table configs and inject defaults.
-    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
+    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode, streamingWritesParamsOpt.isDefined)
     val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
     val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
       s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
@@ -151,8 +156,6 @@ object HoodieSparkSqlWriter {
     assert(!StringUtils.isNullOrEmpty(hoodieConfig.getString(HoodieWriteConfig.TBL_NAME)),
       s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
 
-    asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
-    asyncClusteringTriggerFnDefined = asyncClusteringTriggerFn.isDefined
     sparkContext.getConf.getOption("spark.serializer") match {
       case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
       case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
@@ -165,7 +168,7 @@ object HoodieSparkSqlWriter {
     val preppedWriteOperation = canDoPreppedWrites(hoodieConfig, parameters, operation, sourceDf)
 
     val jsc = new JavaSparkContext(sparkContext)
-    if (asyncCompactionTriggerFn.isDefined) {
+    if (streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.isDefined).orElse(Some(false)).get) {
       if (jsc.getConf.getOption(SparkConfigs.SPARK_SCHEDULER_ALLOCATION_FILE_KEY).isDefined) {
         jsc.setLocalProperty("spark.scheduler.pool", SparkConfigs.SPARK_DATASOURCE_WRITER_POOL_NAME)
       }
@@ -280,10 +283,10 @@ object HoodieSparkSqlWriter {
               .asInstanceOf[SparkRDDWriteClient[_]]
 
             if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
-              asyncCompactionTriggerFn.get.apply(client)
+              streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.get.apply(client))
             }
             if (isAsyncClusteringEnabled(client, parameters)) {
-              asyncClusteringTriggerFn.get.apply(client)
+              streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.get.apply(client))
             }
 
             // Issue deletes
@@ -360,11 +363,11 @@ object HoodieSparkSqlWriter {
             }
 
             if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
-              asyncCompactionTriggerFn.get.apply(client)
+              streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.get.apply(client))
             }
 
             if (isAsyncClusteringEnabled(client, parameters)) {
-              asyncClusteringTriggerFn.get.apply(client)
+              streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.get.apply(client))
             }
 
             // Short-circuit if bulk_insert via row is enabled.
@@ -376,7 +379,7 @@ object HoodieSparkSqlWriter {
             // scalastyle:on
 
             val writeConfig = client.getConfig
-            if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
+            if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
               throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.")
             }
             // Convert to RDD[HoodieRecord]
@@ -402,7 +405,8 @@ object HoodieSparkSqlWriter {
         val (writeSuccessful, compactionInstant, clusteringInstant) =
           commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
             writeResult, parameters, writeClient, tableConfig, jsc,
-            TableInstantInfo(basePath, instantTime, commitActionType, operation), extraPreCommitFn)
+            TableInstantInfo(basePath, instantTime, commitActionType, operation), streamingWritesParamsOpt.map(_.extraPreCommitFn)
+              .orElse(Option.apply(Option.empty)).get)
 
         (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
       } finally {
@@ -724,6 +728,7 @@ object HoodieSparkSqlWriter {
                 optParams: Map[String, String],
                 df: DataFrame,
                 hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
+                streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
                 hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty): Boolean = {
 
     assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
@@ -736,7 +741,7 @@ object HoodieSparkSqlWriter {
     val tableConfig = getHoodieTableConfig(sparkContext, path, mode, hoodieTableConfigOpt)
     validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
 
-    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
+    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode, streamingWritesParamsOpt.isDefined)
     val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
     val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
     val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
@@ -1075,7 +1080,7 @@ object HoodieSparkSqlWriter {
     log.info(s"Config.inlineCompactionEnabled ? ${client.getConfig.inlineCompactionEnabled}")
     (asyncCompactionTriggerFnDefined && !client.getConfig.inlineCompactionEnabled
       && parameters.get(ASYNC_COMPACT_ENABLE.key).exists(r => r.toBoolean)
-      && tableConfig.getTableType == HoodieTableType.MERGE_ON_READ)
+      && tableConfig.getTableType == MERGE_ON_READ)
   }
 
   private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[_],
@@ -1107,7 +1112,8 @@ object HoodieSparkSqlWriter {
   }
 
   private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
-                                            tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = {
+                                            tableConfig: HoodieTableConfig, mode: SaveMode,
+                                            isStreamingWrite: Boolean): (Map[String, String], HoodieConfig) = {
     val translatedOptions = DataSourceWriteOptions.mayBeDerivePartitionPath(optParams)
     var translatedOptsWithMappedTableConfig = mutable.Map.empty ++ translatedOptions.toMap
     if (tableConfig != null && mode != SaveMode.Overwrite) {
@@ -1135,6 +1141,13 @@ object HoodieSparkSqlWriter {
       // enable merge allow duplicates when operation type is insert
       mergedParams.put(HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key(), "true")
     }
+    // enable inline compaction for batch writes if applicable
+    if (!isStreamingWrite
+      && mergedParams.getOrElse(DataSourceWriteOptions.TABLE_TYPE.key(), COPY_ON_WRITE.name()) == MERGE_ON_READ.name()
+      && !optParams.containsKey(HoodieCompactionConfig.INLINE_COMPACT.key())
+      && !optParams.containsKey(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key)) {
+      mergedParams.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true")
+    }
     val params = mergedParams.toMap
     (params, HoodieWriterUtils.convertMapToHoodieConfig(params))
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 5667c8870d3..6606bc69eec 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -17,6 +17,7 @@
 package org.apache.hudi
 
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.StreamingWriteParams
 import org.apache.hudi.HoodieStreamingSink.SINK_CHECKPOINT_KEY
 import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
 import org.apache.hudi.client.SparkRDDWriteClient
@@ -27,7 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
-import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, ConfigUtils, JsonUtils, StringUtils}
+import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, ConfigUtils}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.exception.{HoodieCorruptedDataException, HoodieException, TableNotFoundException}
@@ -127,14 +128,14 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
-          sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient,
-          if (disableCompaction) None else Some(triggerAsyncCompactor), Some(triggerAsyncClustering),
-          extraPreCommitFn = Some(new BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata] {
-            override def accept(metaClient: HoodieTableMetaClient, newCommitMetadata: HoodieCommitMetadata): Unit = {
-              val identifier = options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), STREAMING_CHECKPOINT_IDENTIFIER.defaultValue())
-              newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, CommitUtils.getCheckpointValueAsString(identifier, String.valueOf(batchId)))
-            }
-          }))
+          sqlContext, mode, updatedOptions, data, Some(StreamingWriteParams(hoodieTableConfig,
+            if (disableCompaction) None else Some(triggerAsyncCompactor), Some(triggerAsyncClustering),
+            extraPreCommitFn = Some(new BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata] {
+              override def accept(metaClient: HoodieTableMetaClient, newCommitMetadata: HoodieCommitMetadata): Unit = {
+                val identifier = options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), STREAMING_CHECKPOINT_IDENTIFIER.defaultValue())
+                newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, CommitUtils.getCheckpointValueAsString(identifier, String.valueOf(batchId)))
+              }
+            }))), writeClient)
       )
       match {
        case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 6781c229f6f..7f89817a7f8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -607,13 +607,13 @@ class TestHoodieSparkSqlWriter {
         mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
 
       HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Append, fooTableModifier, spark.emptyDataFrame, Option.empty,
-        Option(client))
+        Option.empty, Option(client))
 
       // Verify that HoodieWriteClient is closed correctly
       verify(client, times(1)).close()
 
       val ignoreResult = HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Ignore, fooTableModifier, spark.emptyDataFrame, Option.empty,
-        Option(client))
+        Option.empty, Option(client))
       assertFalse(ignoreResult)
       verify(client, times(2)).close()
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 2a722f24ed3..2ea66fa3f07 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -1225,6 +1225,8 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
 
     thirdDf.write.format("hudi")
       .options(writeOpts)
+      // need to disable inline compaction for this test to avoid the compaction instant being completed
+      .option(HoodieCompactionConfig.INLINE_COMPACT.key, "false")
       .mode(SaveMode.Append).save(tablePath)
 
     // Read-optimized query on MOR
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 534ee322eb9..a1b4f3e307e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -21,6 +21,7 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.util.StringUtils
@@ -32,13 +33,12 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, lit}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
 import scala.collection.JavaConversions._
 
-
 @Tag("functional")
 class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
 
@@ -129,4 +129,54 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
     assertEquals(100, hudiSnapshotDF3.count())
     assertEquals(updatedVerificationVal, hudiSnapshotDF3.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
   }
+
+  @Test
+  def testMergeOnReadStorageDefaultCompaction(): Unit = {
+    val preCombineField = "fare"
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+      "hoodie.delete.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition_path",
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+    )
+
+    var options: Map[String, String] = commonOpts
+    options += (DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> preCombineField)
+    val dataGen = new HoodieTestDataGenerator(0xDEEF)
+    val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+    // Bulk Insert Operation
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(options)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+    val hudiDF1 = spark.read.format("org.apache.hudi")
+      .load(basePath)
+
+    assertEquals(100, hudiDF1.count())
+
+    // upsert
+    for ( a <- 1 to 5) {
+      val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
+      val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+      inputDF2.write.format("org.apache.hudi")
+        .options(options)
+        .mode(SaveMode.Append)
+        .save(basePath)
+    }
+    // compaction should have been completed
+    val metaClient = HoodieTableMetaClient.builder.setConf(fs.getConf).setBasePath(basePath)
+      .setLoadActiveTimelineOnLoad(true).build
+    assertEquals(1, metaClient.getActiveTimeline.getCommitTimeline.countInstants())
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index 6a97c532147..2261e83f7f9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -552,7 +552,10 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
            | partitioned by(ts)
            | location '$basePath'
            | """.stripMargin)
-      // Create 5 deltacommits to ensure that it is > default `hoodie.compact.inline.max.delta.commits`
+      // disable automatic inline compaction to test with pending compaction instants
+      spark.sql("set hoodie.compact.inline=false")
+      spark.sql("set hoodie.compact.schedule.inline=false")
+      // Create 5 deltacommits to ensure that it is >= default `hoodie.compact.inline.max.delta.commits`
       spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
       spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
       spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
@@ -596,7 +599,10 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
            | partitioned by(ts)
            | location '$basePath'
            | """.stripMargin)
-      // Create 5 deltacommits to ensure that it is > default `hoodie.compact.inline.max.delta.commits`
+      // disable automatic inline compaction to test with pending compaction instants
+      spark.sql("set hoodie.compact.inline=false")
+      spark.sql("set hoodie.compact.schedule.inline=false")
+      // Create 5 deltacommits to ensure that it is >= default `hoodie.compact.inline.max.delta.commits`
       // Write everything into the same FileGroup but into separate blocks
       spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
       spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
index ea9588419b3..568e3569725 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
@@ -38,6 +38,10 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
            | )
        """.stripMargin)
       spark.sql("set hoodie.parquet.max.file.size = 10000")
+      // disable automatic inline compaction
+      spark.sql("set hoodie.compact.inline=false")
+      spark.sql("set hoodie.compact.schedule.inline=false")
+
       spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
       spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
       spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
@@ -89,6 +93,10 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
            | )
        """.stripMargin)
       spark.sql("set hoodie.parquet.max.file.size = 10000")
+      // disable automatic inline compaction
+      spark.sql("set hoodie.compact.inline=false")
+      spark.sql("set hoodie.compact.schedule.inline=false")
+
       spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
       spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
       spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 0b2b01cbec9..77df8d08418 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -235,6 +235,10 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
       Seq("cow", "mor").foreach { tableType =>
         val tableName = generateTableName
         val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
+        // disable automatic inline compaction
+        spark.sql("set hoodie.compact.inline=false")
+        spark.sql("set hoodie.compact.schedule.inline=false")
+
         if (HoodieSparkUtils.gteqSpark3_1) {
           spark.sql("set hoodie.schema.on.read.enable=true")
           spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
index f244167d142..0c2c34ae6d9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
@@ -19,6 +19,9 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
 import org.apache.hudi.HoodieSparkUtils.isSpark2
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.junit.jupiter.api.Assertions.assertEquals
 
 class TestUpdateTable extends HoodieSparkSqlTestBase {
 
@@ -109,6 +112,21 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
         checkAnswer(s"select id, name, price, ts from $tableName")(
           Seq(1, "a1", 40.0, 1000)
         )
+
+        // verify default compaction w/ MOR
+        if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
+          spark.sql(s"update $tableName set price = price * 2 where id = 1")
+          spark.sql(s"update $tableName set price = price * 2 where id = 1")
+          spark.sql(s"update $tableName set price = price * 2 where id = 1")
+          // verify compaction is complete
+          val metaClient = HoodieTableMetaClient.builder()
+            .setConf(spark.sparkContext.hadoopConfiguration)
+            .setBasePath(tmp.getCanonicalPath + "/" + tableName)
+            .build()
+
+          assertEquals(metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getLeft.getAction, "commit")
+        }
+
       }
     })
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 8da368039d5..85829e378a6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -60,6 +60,11 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
              | partitioned by(ts)
              | location '$basePath'
        """.stripMargin)
+        // disable automatic inline compaction so that HoodieDataSourceHelpers.allCompletedCommitsCompactions
+        // does not count compaction instants
+        spark.sql("set hoodie.compact.inline=false")
+        spark.sql("set hoodie.compact.schedule.inline=false")
+
         spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
         spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
         spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
index 02e9406cdde..fcbdc8df5d7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
@@ -45,6 +45,10 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
            | )
        """.stripMargin)
       spark.sql("set hoodie.parquet.max.file.size = 10000")
+      // disable automatic inline compaction
+      spark.sql("set hoodie.compact.inline=false")
+      spark.sql("set hoodie.compact.schedule.inline=false")
+
       spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
       spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
       spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
@@ -125,6 +129,10 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
            | )
        """.stripMargin)
       spark.sql("set hoodie.parquet.max.file.size = 10000")
+      // disable automatic inline compaction
+      spark.sql("set hoodie.compact.inline=false")
+      spark.sql("set hoodie.compact.schedule.inline=false")
+
       spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
       spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
       spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
@@ -192,12 +200,14 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
            | tblproperties (
            |  type = 'mor',
            |  primaryKey = 'id',
-           |  preCombineField = 'ts',
-           |  hoodie.compact.inline ='true',
-           |  hoodie.compact.inline.max.delta.commits ='2'
+           |  preCombineField = 'ts'
            | )
            | location '${tmp.getCanonicalPath}/$tableName1'
        """.stripMargin)
+      // set inline compaction
+      spark.sql("set hoodie.compact.inline=true")
+      spark.sql("set hoodie.compact.inline.max.delta.commits=2")
+
       spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
       spark.sql(s"update $tableName1 set name = 'a2' where id = 1")
       spark.sql(s"update $tableName1 set name = 'a3' where id = 1")

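A minimal standalone sketch of the compaction-resolution precedence that
DataSourceUtils.createHoodieConfig now applies (plain Scala for illustration,
not the Hudi API; the string keys mirror the config constants used in the
diff above):

    // Resolution order, mirroring the Java change in createHoodieConfig:
    // 1. an explicit hoodie.compact.inline value from the caller is read first;
    // 2. if inline compaction is still false, it is derived from the async
    //    compaction flag and the table type: enabled for MERGE_ON_READ tables
    //    when async compaction is not requested.
    def resolveInlineCompact(params: Map[String, String]): Boolean = {
      val asyncCompact = params.get("hoodie.datasource.compaction.async.enable").exists(_.toBoolean)
      var inlineCompact = params.get("hoodie.compact.inline").exists(_.toBoolean)
      if (!inlineCompact) {
        inlineCompact = !asyncCompact &&
          params.get("hoodie.datasource.write.table.type").contains("MERGE_ON_READ")
      }
      inlineCompact
    }

    // e.g. a MOR table with async compaction explicitly disabled resolves to true:
    // resolveInlineCompact(Map(
    //   "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
    //   "hoodie.datasource.compaction.async.enable" -> "false"))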
