This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d4f8241ea53 [HUDI-6214] Enabling compaction by default for batch writes with MOR table (#8718)
d4f8241ea53 is described below
commit d4f8241ea538dfdd5f01afdd132efce4316e26b9
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Aug 14 07:30:04 2023 -0700
[HUDI-6214] Enabling compaction by default for batch writes with MOR table (#8718)

Supports a better out-of-the-box user experience: if a user does not explicitly
enable inline compaction with spark-datasource or spark-sql writes, inline
compaction is now enabled by default. If the user explicitly overrides and
disables it, no override happens.
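
For example, a plain MOR batch write like the following sketch (table name,
path, and field names are illustrative, not from this commit) now gets
hoodie.compact.inline=true injected automatically:

    df.write.format("hudi")
      .option("hoodie.table.name", "trips")
      .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .mode(SaveMode.Append)
      .save("/tmp/trips_mor")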
---------
Co-authored-by: Sagar Sumit <[email protected]>
---
.../main/java/org/apache/hudi/DataSourceUtils.java | 12 ++++-
.../scala/org/apache/hudi/DataSourceOptions.scala | 3 ++
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 53 +++++++++++++--------
.../org/apache/hudi/HoodieStreamingSink.scala | 19 ++++----
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 4 +-
.../apache/hudi/functional/TestMORDataSource.scala | 2 +
.../hudi/functional/TestMORDataSourceStorage.scala | 54 +++++++++++++++++++++-
.../sql/hudi/TestAlterTableDropPartition.scala | 10 +++-
.../spark/sql/hudi/TestCompactionTable.scala | 8 ++++
.../org/apache/spark/sql/hudi/TestSpark3DDL.scala | 4 ++
.../apache/spark/sql/hudi/TestUpdateTable.scala | 18 ++++++++
.../hudi/procedure/TestClusteringProcedure.scala | 5 ++
.../hudi/procedure/TestCompactionProcedure.scala | 16 +++++--
13 files changed, 168 insertions(+), 40 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 93aeef1671f..a088982138b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -173,8 +173,16 @@ public class DataSourceUtils {
  public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath,
                                                     String tblName,
                                                     Map<String, String> parameters) {
    boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key()));
-   boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE().key())
-       .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
+   boolean inlineCompact = false;
+   if (parameters.containsKey(HoodieCompactionConfig.INLINE_COMPACT.key())) {
+     // if inline is set, fetch the value from it.
+     inlineCompact = Boolean.parseBoolean(parameters.get(HoodieCompactionConfig.INLINE_COMPACT.key()));
+   }
+   // if inline is false, derive the value from asyncCompact and table type
+   if (!inlineCompact) {
+     inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE().key())
+         .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
+   }
    // insert/bulk-insert combining to be true, if filtering for duplicates
    boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS().key()));
    HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
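
Because the Scala writer only injects the default when the user has not set
hoodie.compact.inline or the async-compaction option themselves, an explicit
opt-out is honored end to end. A minimal sketch (the DataFrame and base path
are illustrative):

    // explicitly disabling inline compaction; the new default is not applied
    df.write.format("hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      .option(HoodieCompactionConfig.INLINE_COMPACT.key, "false")
      .mode(SaveMode.Append)
      .save(basePath)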
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 82074cbacf3..ddc9d55e50c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -971,6 +971,9 @@ object DataSourceOptionsHelper {
    if (!params.contains(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key()) && tableConfig.getPayloadClass != null) {
      missingWriteConfigs ++= Map(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> tableConfig.getPayloadClass)
    }
+   if (!params.contains(DataSourceWriteOptions.TABLE_TYPE.key())) {
+     missingWriteConfigs ++= Map(DataSourceWriteOptions.TABLE_TYPE.key() -> tableConfig.getTableType.name())
+   }
    missingWriteConfigs.toMap
  }
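
With the table type now backfilled from the table's own config, a follow-up
append does not need to repeat it for the compaction default to apply. A
sketch (updatesDf and writeOpts are illustrative placeholders):

    // TABLE_TYPE is resolved from hoodie.properties, so this append on an
    // existing MOR table still picks up the inline-compaction default
    updatesDf.write.format("hudi")
      .options(writeOpts) // no explicit TABLE_TYPE here
      .mode(SaveMode.Append)
      .save(basePath)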
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 45ef82acd10..1387b3e2205 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -38,6 +38,7 @@ import org.apache.hudi.common.config._
import org.apache.hudi.common.engine.HoodieEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
@@ -46,7 +47,7 @@ import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
-import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
import org.apache.hudi.index.HoodieIndex
@@ -79,6 +80,11 @@ import scala.collection.mutable
object HoodieSparkSqlWriter {
+  case class StreamingWriteParams(hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
+                                  asyncCompactionTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = Option.empty,
+                                  asyncClusteringTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = Option.empty,
+                                  extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty)
+
/**
 * Controls whether incoming batch's schema's nullability constraints should be canonicalized
 * relative to the table's schema. For ex, in case field A is marked as null-able in table's schema, but is marked
@@ -114,11 +120,8 @@ object HoodieSparkSqlWriter {
mode: SaveMode,
optParams: Map[String, String],
sourceDf: DataFrame,
-            hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
-            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty,
-            asyncCompactionTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = Option.empty,
-            asyncClusteringTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = Option.empty,
-            extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty):
+            streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
+            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
  (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
    assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
@@ -130,7 +133,7 @@ object HoodieSparkSqlWriter {
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
    tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
-   var tableConfig = getHoodieTableConfig(sparkContext, path, mode, hoodieTableConfigOpt)
+   var tableConfig = getHoodieTableConfig(sparkContext, path, mode, streamingWritesParamsOpt.map(
+     _.hoodieTableConfigOpt).orElse(Option.apply(Option.empty)).get)
    // get params w/o injecting default and validate
    val paramsWithoutDefaults = HoodieWriterUtils.getParamsWithAlternatives(optParams)
    val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(paramsWithoutDefaults)
@@ -141,8 +144,10 @@ object HoodieSparkSqlWriter {
validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite);
+   asyncCompactionTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.isDefined).orElse(Some(false)).get
+   asyncClusteringTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.isDefined).orElse(Some(false)).get
    // re-use table configs and inject defaults.
-   val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
+   val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode, streamingWritesParamsOpt.isDefined)
    val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
    val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
@@ -151,8 +156,6 @@ object HoodieSparkSqlWriter {
    assert(!StringUtils.isNullOrEmpty(hoodieConfig.getString(HoodieWriteConfig.TBL_NAME)), s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
-   asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
-   asyncClusteringTriggerFnDefined = asyncClusteringTriggerFn.isDefined
    sparkContext.getConf.getOption("spark.serializer") match {
      case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
      case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
@@ -165,7 +168,7 @@ object HoodieSparkSqlWriter {
    val preppedWriteOperation = canDoPreppedWrites(hoodieConfig, parameters, operation, sourceDf)
    val jsc = new JavaSparkContext(sparkContext)
-   if (asyncCompactionTriggerFn.isDefined) {
+   if (streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.isDefined).orElse(Some(false)).get) {
      if (jsc.getConf.getOption(SparkConfigs.SPARK_SCHEDULER_ALLOCATION_FILE_KEY).isDefined) {
        jsc.setLocalProperty("spark.scheduler.pool", SparkConfigs.SPARK_DATASOURCE_WRITER_POOL_NAME)
      }
}
@@ -280,10 +283,10 @@ object HoodieSparkSqlWriter {
.asInstanceOf[SparkRDDWriteClient[_]]
    if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
-     asyncCompactionTriggerFn.get.apply(client)
+     streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.get.apply(client))
    }
    if (isAsyncClusteringEnabled(client, parameters)) {
-     asyncClusteringTriggerFn.get.apply(client)
+     streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.get.apply(client))
}
// Issue deletes
@@ -360,11 +363,11 @@ object HoodieSparkSqlWriter {
}
    if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
-     asyncCompactionTriggerFn.get.apply(client)
+     streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.get.apply(client))
    }
    if (isAsyncClusteringEnabled(client, parameters)) {
-     asyncClusteringTriggerFn.get.apply(client)
+     streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.get.apply(client))
}
// Short-circuit if bulk_insert via row is enabled.
@@ -376,7 +379,7 @@ object HoodieSparkSqlWriter {
// scalastyle:on
val writeConfig = client.getConfig
-     if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
+     if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
        throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.")
}
// Convert to RDD[HoodieRecord]
@@ -402,7 +405,8 @@ object HoodieSparkSqlWriter {
val (writeSuccessful, compactionInstant, clusteringInstant) =
commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
writeResult, parameters, writeClient, tableConfig, jsc,
-           TableInstantInfo(basePath, instantTime, commitActionType, operation), extraPreCommitFn)
+           TableInstantInfo(basePath, instantTime, commitActionType, operation), streamingWritesParamsOpt.map(_.extraPreCommitFn)
+           .orElse(Option.apply(Option.empty)).get)
        (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
} finally {
@@ -724,6 +728,7 @@ object HoodieSparkSqlWriter {
optParams: Map[String, String],
df: DataFrame,
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
+                streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
                 hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty): Boolean = {
    assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
@@ -736,7 +741,7 @@ object HoodieSparkSqlWriter {
    val tableConfig = getHoodieTableConfig(sparkContext, path, mode, hoodieTableConfigOpt)
    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
-   val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
+   val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode, streamingWritesParamsOpt.isDefined)
    val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
@@ -1075,7 +1080,7 @@ object HoodieSparkSqlWriter {
log.info(s"Config.inlineCompactionEnabled ?
${client.getConfig.inlineCompactionEnabled}")
(asyncCompactionTriggerFnDefined &&
!client.getConfig.inlineCompactionEnabled
&& parameters.get(ASYNC_COMPACT_ENABLE.key).exists(r => r.toBoolean)
- && tableConfig.getTableType == HoodieTableType.MERGE_ON_READ)
+ && tableConfig.getTableType == MERGE_ON_READ)
}
private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[_],
@@ -1107,7 +1112,8 @@ object HoodieSparkSqlWriter {
}
private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
-                                           tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = {
+                                           tableConfig: HoodieTableConfig, mode: SaveMode,
+                                           isStreamingWrite: Boolean): (Map[String, String], HoodieConfig) = {
    val translatedOptions = DataSourceWriteOptions.mayBeDerivePartitionPath(optParams)
    var translatedOptsWithMappedTableConfig = mutable.Map.empty ++ translatedOptions.toMap
if (tableConfig != null && mode != SaveMode.Overwrite) {
@@ -1135,6 +1141,13 @@ object HoodieSparkSqlWriter {
// enable merge allow duplicates when operation type is insert
      mergedParams.put(HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key(), "true")
    }
+   // enable inline compaction for batch writes if applicable
+   if (!isStreamingWrite
+     && mergedParams.getOrElse(DataSourceWriteOptions.TABLE_TYPE.key(), COPY_ON_WRITE.name()) == MERGE_ON_READ.name()
+     && !optParams.containsKey(HoodieCompactionConfig.INLINE_COMPACT.key())
+     && !optParams.containsKey(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key)) {
+ mergedParams.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true")
+ }
val params = mergedParams.toMap
(params, HoodieWriterUtils.convertMapToHoodieConfig(params))
}
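
The spark-sql path goes through the same predicate, so sessions that need the
previous behavior can opt out with session-level confs, exactly as the updated
tests below do:

    // opt out of the new default for the whole session
    spark.sql("set hoodie.compact.inline=false")
    spark.sql("set hoodie.compact.schedule.inline=false")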
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 5667c8870d3..6606bc69eec 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -17,6 +17,7 @@
package org.apache.hudi
import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.StreamingWriteParams
import org.apache.hudi.HoodieStreamingSink.SINK_CHECKPOINT_KEY
import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
import org.apache.hudi.client.SparkRDDWriteClient
@@ -27,7 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.ValidationUtils.checkArgument
-import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, ConfigUtils, JsonUtils, StringUtils}
+import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, ConfigUtils}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
import org.apache.hudi.exception.{HoodieCorruptedDataException, HoodieException, TableNotFoundException}
@@ -127,14 +128,14 @@ class HoodieStreamingSink(sqlContext: SQLContext,
retry(retryCnt, retryIntervalMs)(
Try(
HoodieSparkSqlWriter.write(
-         sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient,
-         if (disableCompaction) None else Some(triggerAsyncCompactor), Some(triggerAsyncClustering),
-         extraPreCommitFn = Some(new BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata] {
-           override def accept(metaClient: HoodieTableMetaClient, newCommitMetadata: HoodieCommitMetadata): Unit = {
-             val identifier = options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), STREAMING_CHECKPOINT_IDENTIFIER.defaultValue())
-             newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, CommitUtils.getCheckpointValueAsString(identifier, String.valueOf(batchId)))
-           }
-         }))
+         sqlContext, mode, updatedOptions, data, Some(StreamingWriteParams(hoodieTableConfig,
+           if (disableCompaction) None else Some(triggerAsyncCompactor), Some(triggerAsyncClustering),
+           extraPreCommitFn = Some(new BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata] {
+             override def accept(metaClient: HoodieTableMetaClient, newCommitMetadata: HoodieCommitMetadata): Unit = {
+               val identifier = options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), STREAMING_CHECKPOINT_IDENTIFIER.defaultValue())
+               newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, CommitUtils.getCheckpointValueAsString(identifier, String.valueOf(batchId)))
+             }
+           }))), writeClient)
      )
      match {
        case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 6781c229f6f..7f89817a7f8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -607,13 +607,13 @@ class TestHoodieSparkSqlWriter {
mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
      HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Append, fooTableModifier, spark.emptyDataFrame, Option.empty,
-       Option(client))
+       Option.empty, Option(client))
    // Verify that HoodieWriteClient is closed correctly
    verify(client, times(1)).close()
    val ignoreResult = HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Ignore, fooTableModifier, spark.emptyDataFrame, Option.empty,
- Option(client))
+ Option.empty, Option(client))
assertFalse(ignoreResult)
verify(client, times(2)).close()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 2a722f24ed3..2ea66fa3f07 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -1225,6 +1225,8 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
    thirdDf.write.format("hudi")
      .options(writeOpts)
+     // need to disable inline compaction for this test to avoid the compaction instant being completed
+ .option(HoodieCompactionConfig.INLINE_COMPACT.key, "false")
.mode(SaveMode.Append).save(tablePath)
// Read-optimized query on MOR
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 534ee322eb9..a1b4f3e307e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -21,6 +21,7 @@ package org.apache.hudi.functional
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.util.StringUtils
@@ -32,13 +33,12 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, lit}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
import scala.collection.JavaConversions._
-
@Tag("functional")
class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
@@ -129,4 +129,54 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
    assertEquals(100, hudiSnapshotDF3.count())
    assertEquals(updatedVerificationVal, hudiSnapshotDF3.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
}
+
+ @Test
+ def testMergeOnReadStorageDefaultCompaction(): Unit = {
+ val preCombineField = "fare"
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+ "hoodie.delete.shuffle.parallelism" -> "1",
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition_path",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+ )
+
+ var options: Map[String, String] = commonOpts
+   options += (DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> preCombineField)
+   val dataGen = new HoodieTestDataGenerator(0xDEEF)
+   val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+   // Bulk Insert Operation
+   val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+   val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+   inputDF1.write.format("org.apache.hudi")
+     .options(options)
+     .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+     .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+ val hudiDF1 = spark.read.format("org.apache.hudi")
+ .load(basePath)
+
+ assertEquals(100, hudiDF1.count())
+
+ // upsert
+ for ( a <- 1 to 5) {
+     val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
+     val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+ inputDF2.write.format("org.apache.hudi")
+ .options(options)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ }
+ // compaction should have been completed
+   val metaClient = HoodieTableMetaClient.builder.setConf(fs.getConf).setBasePath(basePath)
+     .setLoadActiveTimelineOnLoad(true).build
+   assertEquals(1, metaClient.getActiveTimeline.getCommitTimeline.countInstants())
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index 6a97c532147..2261e83f7f9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -552,7 +552,10 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
| partitioned by(ts)
| location '$basePath'
| """.stripMargin)
-   // Create 5 deltacommits to ensure that it is > default `hoodie.compact.inline.max.delta.commits`
+   // disable automatic inline compaction to test with pending compaction instants
+   spark.sql("set hoodie.compact.inline=false")
+   spark.sql("set hoodie.compact.schedule.inline=false")
+   // Create 5 deltacommits to ensure that it is >= default `hoodie.compact.inline.max.delta.commits`
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
@@ -596,7 +599,10 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
| partitioned by(ts)
| location '$basePath'
| """.stripMargin)
-   // Create 5 deltacommits to ensure that it is > default `hoodie.compact.inline.max.delta.commits`
+   // disable automatic inline compaction to test with pending compaction instants
+   spark.sql("set hoodie.compact.inline=false")
+   spark.sql("set hoodie.compact.schedule.inline=false")
+   // Create 5 deltacommits to ensure that it is >= default `hoodie.compact.inline.max.delta.commits`
// Write everything into the same FileGroup but into separate blocks
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
index ea9588419b3..568e3569725 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
@@ -38,6 +38,10 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
| )
""".stripMargin)
spark.sql("set hoodie.parquet.max.file.size = 10000")
+ // disable automatic inline compaction
+ spark.sql("set hoodie.compact.inline=false")
+ spark.sql("set hoodie.compact.schedule.inline=false")
+
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
@@ -89,6 +93,10 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
| )
""".stripMargin)
spark.sql("set hoodie.parquet.max.file.size = 10000")
+ // disable automatic inline compaction
+ spark.sql("set hoodie.compact.inline=false")
+ spark.sql("set hoodie.compact.schedule.inline=false")
+
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 0b2b01cbec9..77df8d08418 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -235,6 +235,10 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
+ // disable automatic inline compaction
+ spark.sql("set hoodie.compact.inline=false")
+ spark.sql("set hoodie.compact.schedule.inline=false")
+
if (HoodieSparkUtils.gteqSpark3_1) {
spark.sql("set hoodie.schema.on.read.enable=true")
spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
index f244167d142..0c2c34ae6d9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
@@ -19,6 +19,9 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
import org.apache.hudi.HoodieSparkUtils.isSpark2
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.junit.jupiter.api.Assertions.assertEquals
class TestUpdateTable extends HoodieSparkSqlTestBase {
@@ -109,6 +112,21 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 40.0, 1000)
)
+
+ // verify default compaction w/ MOR
+ if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
+ spark.sql(s"update $tableName set price = price * 2 where id = 1")
+ spark.sql(s"update $tableName set price = price * 2 where id = 1")
+ spark.sql(s"update $tableName set price = price * 2 where id = 1")
+ // verify compaction is complete
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(spark.sparkContext.hadoopConfiguration)
+ .setBasePath(tmp.getCanonicalPath + "/" + tableName)
+ .build()
+
+       assertEquals(metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getLeft.getAction, "commit")
+ }
+
}
})
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 8da368039d5..85829e378a6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -60,6 +60,11 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
| partitioned by(ts)
| location '$basePath'
""".stripMargin)
+   // disable automatic inline compaction so that HoodieDataSourceHelpers.allCompletedCommitsCompactions
+   // does not count compaction instants
+ spark.sql("set hoodie.compact.inline=false")
+ spark.sql("set hoodie.compact.schedule.inline=false")
+
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
index 02e9406cdde..fcbdc8df5d7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
@@ -45,6 +45,10 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
| )
""".stripMargin)
spark.sql("set hoodie.parquet.max.file.size = 10000")
+ // disable automatic inline compaction
+ spark.sql("set hoodie.compact.inline=false")
+ spark.sql("set hoodie.compact.schedule.inline=false")
+
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
@@ -125,6 +129,10 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
| )
""".stripMargin)
spark.sql("set hoodie.parquet.max.file.size = 10000")
+ // disable automatic inline compaction
+ spark.sql("set hoodie.compact.inline=false")
+ spark.sql("set hoodie.compact.schedule.inline=false")
+
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
@@ -192,12 +200,14 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
- | preCombineField = 'ts',
- | hoodie.compact.inline ='true',
- | hoodie.compact.inline.max.delta.commits ='2'
+ | preCombineField = 'ts'
| )
| location '${tmp.getCanonicalPath}/$tableName1'
""".stripMargin)
+ // set inline compaction
+ spark.sql("set hoodie.compact.inline=true")
+ spark.sql("set hoodie.compact.inline.max.delta.commits=2")
+
spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
spark.sql(s"update $tableName1 set name = 'a2' where id = 1")
spark.sql(s"update $tableName1 set name = 'a3' where id = 1")