This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ae426bc483f [HUDI-5351] Handle populateMetaFields when repartitioning
in sort partitioner (#7411)
ae426bc483f is described below
commit ae426bc483ffb310e99738219e6ecc9cb8336c0c
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Dec 13 10:22:10 2022 +0530
[HUDI-5351] Handle populateMetaFields when repartitioning in sort
partitioner (#7411)
---
.../MultipleSparkJobExecutionStrategy.java | 6 +-
.../BulkInsertInternalPartitionerFactory.java | 26 +++----
...lkInsertInternalPartitionerWithRowsFactory.java | 19 ++---
.../bulkinsert/GlobalSortPartitioner.java | 14 ++++
.../bulkinsert/GlobalSortPartitionerWithRows.java | 14 ++++
...PartitionPathRepartitionAndSortPartitioner.java | 12 +++-
...nPathRepartitionAndSortPartitionerWithRows.java | 12 +++-
.../PartitionPathRepartitionPartitioner.java | 12 +++-
...artitionPathRepartitionPartitionerWithRows.java | 12 +++-
.../PartitionSortPartitionerWithRows.java | 14 ++++
.../bulkinsert/RDDPartitionSortPartitioner.java | 14 ++++
.../TestBulkInsertInternalPartitioner.java | 83 ++++++++++++++--------
.../TestBulkInsertInternalPartitionerForRows.java | 69 ++++++++++++------
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 5 +-
14 files changed, 227 insertions(+), 85 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 074deaa6212..954daaad1e1 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -206,10 +206,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T
extends HoodieRecordPa
throw new UnsupportedOperationException(String.format("Layout
optimization strategy '%s' is not supported", layoutOptStrategy));
}
}).orElse(isRowPartitioner
- ? BulkInsertInternalPartitionerWithRowsFactory.get(
- getWriteConfig().getBulkInsertSortMode(),
getHoodieTable().isPartitioned(), true)
- : BulkInsertInternalPartitionerFactory.get(
- getWriteConfig().getBulkInsertSortMode(),
getHoodieTable().isPartitioned(), true));
+ ? BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig(),
getHoodieTable().isPartitioned(), true)
+ : BulkInsertInternalPartitionerFactory.get(getHoodieTable(),
getWriteConfig(), true));
}
/**
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java
index d961c8f9de3..657c671f523 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java
@@ -30,37 +30,37 @@ import org.apache.hudi.table.HoodieTable;
*/
public abstract class BulkInsertInternalPartitionerFactory {
- public static BulkInsertPartitioner get(HoodieTable table, HoodieWriteConfig
config) {
+ public static BulkInsertPartitioner get(HoodieTable table,
+ HoodieWriteConfig config) {
return get(table, config, false);
}
- public static BulkInsertPartitioner get(
- HoodieTable table, HoodieWriteConfig config, boolean
enforceNumOutputPartitions) {
+ public static BulkInsertPartitioner get(HoodieTable table,
+ HoodieWriteConfig config,
+ boolean enforceNumOutputPartitions) {
if (config.getIndexType().equals(HoodieIndex.IndexType.BUCKET)
&&
config.getBucketIndexEngineType().equals(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING))
{
return new RDDConsistentBucketPartitioner(table);
}
- return get(config.getBulkInsertSortMode(), table.isPartitioned(),
enforceNumOutputPartitions);
- }
-
- public static BulkInsertPartitioner get(BulkInsertSortMode sortMode, boolean
isTablePartitioned) {
- return get(sortMode, isTablePartitioned, false);
+ return get(config, table.isPartitioned(), enforceNumOutputPartitions);
}
- public static BulkInsertPartitioner get(BulkInsertSortMode sortMode,
+ public static BulkInsertPartitioner get(HoodieWriteConfig config,
boolean isTablePartitioned,
boolean enforceNumOutputPartitions) {
+ BulkInsertSortMode sortMode = config.getBulkInsertSortMode();
+
switch (sortMode) {
case NONE:
return new NonSortPartitioner(enforceNumOutputPartitions);
case GLOBAL_SORT:
- return new GlobalSortPartitioner();
+ return new GlobalSortPartitioner(config);
case PARTITION_SORT:
- return new RDDPartitionSortPartitioner();
+ return new RDDPartitionSortPartitioner(config);
case PARTITION_PATH_REPARTITION:
- return new PartitionPathRepartitionPartitioner(isTablePartitioned);
+ return new PartitionPathRepartitionPartitioner(isTablePartitioned,
config);
case PARTITION_PATH_REPARTITION_AND_SORT:
- return new
PartitionPathRepartitionAndSortPartitioner(isTablePartitioned);
+ return new
PartitionPathRepartitionAndSortPartitioner(isTablePartitioned, config);
default:
throw new HoodieException("The bulk insert sort mode \"" +
sortMode.name() + "\" is not supported.");
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java
index 218eae0dc94..07995e50d6a 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java
@@ -18,6 +18,7 @@
package org.apache.hudi.execution.bulkinsert;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.sql.Dataset;
@@ -29,24 +30,26 @@ import org.apache.spark.sql.Row;
*/
public abstract class BulkInsertInternalPartitionerWithRowsFactory {
- public static BulkInsertPartitioner<Dataset<Row>> get(BulkInsertSortMode
sortMode,
+ public static BulkInsertPartitioner<Dataset<Row>> get(HoodieWriteConfig
config,
boolean
isTablePartitioned) {
- return get(sortMode, isTablePartitioned, false);
+ return get(config, isTablePartitioned, false);
}
- public static BulkInsertPartitioner<Dataset<Row>> get(
- BulkInsertSortMode sortMode, boolean isTablePartitioned, boolean
enforceNumOutputPartitions) {
+ public static BulkInsertPartitioner<Dataset<Row>> get(HoodieWriteConfig
config,
+ boolean
isTablePartitioned,
+ boolean
enforceNumOutputPartitions) {
+ BulkInsertSortMode sortMode = config.getBulkInsertSortMode();
switch (sortMode) {
case NONE:
return new NonSortPartitionerWithRows(enforceNumOutputPartitions);
case GLOBAL_SORT:
- return new GlobalSortPartitionerWithRows();
+ return new GlobalSortPartitionerWithRows(config);
case PARTITION_SORT:
- return new PartitionSortPartitionerWithRows();
+ return new PartitionSortPartitionerWithRows(config);
case PARTITION_PATH_REPARTITION:
- return new
PartitionPathRepartitionPartitionerWithRows(isTablePartitioned);
+ return new
PartitionPathRepartitionPartitionerWithRows(isTablePartitioned, config);
case PARTITION_PATH_REPARTITION_AND_SORT:
- return new
PartitionPathRepartitionAndSortPartitionerWithRows(isTablePartitioned);
+ return new
PartitionPathRepartitionAndSortPartitionerWithRows(isTablePartitioned, config);
default:
throw new UnsupportedOperationException("The bulk insert sort mode \""
+ sortMode.name() + "\" is not supported.");
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java
index a184c009a1b..e10d23743da 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java
@@ -20,10 +20,14 @@ package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.api.java.JavaRDD;
+import static
org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.GLOBAL_SORT;
+
/**
* A built-in partitioner that does global sorting for the input records
across partitions
* after repartition for bulk insert operation, corresponding to the
@@ -34,9 +38,19 @@ import org.apache.spark.api.java.JavaRDD;
public class GlobalSortPartitioner<T extends HoodieRecordPayload>
implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
+ private final boolean shouldPopulateMetaFields;
+
+ public GlobalSortPartitioner(HoodieWriteConfig config) {
+ this.shouldPopulateMetaFields = config.populateMetaFields();
+ }
+
@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>>
records,
int
outputSparkPartitions) {
+ if (!shouldPopulateMetaFields) {
+ throw new HoodieException(GLOBAL_SORT.name() + " mode requires
meta-fields to be enabled");
+ }
+
// Now, sort the records and line them up nicely for loading.
return records.sortBy(record -> {
// Let's use "partitionPath + key" as the sort key. Spark, will ensure
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java
index 24bcc0aff0d..1a8e87a3823 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java
@@ -19,19 +19,33 @@
package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
+import static
org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.GLOBAL_SORT;
+
/**
* A built-in partitioner that does global sorting for the input Rows across
partitions after repartition for bulk insert operation, corresponding to the
{@code BulkInsertSortMode.GLOBAL_SORT} mode.
*/
public class GlobalSortPartitionerWithRows implements
BulkInsertPartitioner<Dataset<Row>> {
+ private final boolean shouldPopulateMetaFields;
+
+ public GlobalSortPartitionerWithRows(HoodieWriteConfig config) {
+ this.shouldPopulateMetaFields = config.populateMetaFields();
+ }
+
@Override
public Dataset<Row> repartitionRecords(Dataset<Row> rows, int
outputSparkPartitions) {
+ if (!shouldPopulateMetaFields) {
+ throw new HoodieException(GLOBAL_SORT.name() + " mode requires
meta-fields to be enabled");
+ }
+
// Now, sort the records and line them up nicely for loading.
// Let's use "partitionPath + key" as the sort key.
return
rows.sort(functions.col(HoodieRecord.PARTITION_PATH_METADATA_FIELD),
functions.col(HoodieRecord.RECORD_KEY_METADATA_FIELD))
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitioner.java
index e8e1e2072f5..a47f1f9df43 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitioner.java
@@ -21,12 +21,16 @@ package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
+import static
org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT;
+
/**
* A built-in partitioner that does the following for input records for bulk
insert operation
* <p>
@@ -45,14 +49,20 @@ public class PartitionPathRepartitionAndSortPartitioner<T
extends HoodieRecordPa
implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
private final boolean isTablePartitioned;
+ private final boolean shouldPopulateMetaFields;
- public PartitionPathRepartitionAndSortPartitioner(boolean
isTablePartitioned) {
+ public PartitionPathRepartitionAndSortPartitioner(boolean
isTablePartitioned, HoodieWriteConfig config) {
this.isTablePartitioned = isTablePartitioned;
+ this.shouldPopulateMetaFields = config.populateMetaFields();
}
@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>>
records,
int
outputSparkPartitions) {
+ if (!shouldPopulateMetaFields) {
+ throw new HoodieException(PARTITION_PATH_REPARTITION_AND_SORT.name() + "
mode requires meta-fields to be enabled");
+ }
+
if (isTablePartitioned) {
PartitionPathRDDPartitioner partitioner = new
PartitionPathRDDPartitioner(
(partitionPath) -> (String) partitionPath, outputSparkPartitions);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitionerWithRows.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitionerWithRows.java
index cf3ff1acfa4..ff505624913 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitionerWithRows.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitionerWithRows.java
@@ -20,12 +20,16 @@
package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import static
org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT;
+
/**
* A built-in partitioner that does the following for input rows for bulk
insert operation
* <p>
@@ -41,13 +45,19 @@ import org.apache.spark.sql.Row;
public class PartitionPathRepartitionAndSortPartitionerWithRows implements
BulkInsertPartitioner<Dataset<Row>> {
private final boolean isTablePartitioned;
+ private final boolean shouldPopulateMetaFields;
- public PartitionPathRepartitionAndSortPartitionerWithRows(boolean
isTablePartitioned) {
+ public PartitionPathRepartitionAndSortPartitionerWithRows(boolean
isTablePartitioned, HoodieWriteConfig config) {
this.isTablePartitioned = isTablePartitioned;
+ this.shouldPopulateMetaFields = config.populateMetaFields();
}
@Override
public Dataset<Row> repartitionRecords(Dataset<Row> rows, int
outputSparkPartitions) {
+ if (!shouldPopulateMetaFields) {
+ throw new HoodieException(PARTITION_PATH_REPARTITION_AND_SORT.name() + "
mode requires meta-fields to be enabled");
+ }
+
if (isTablePartitioned) {
return rows.repartition(outputSparkPartitions, new
Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD))
.sortWithinPartitions(new
Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitioner.java
index 5931b565757..393e47a61e3 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitioner.java
@@ -21,12 +21,16 @@ package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
+import static
org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_PATH_REPARTITION;
+
/**
* A built-in partitioner that does the following for input records for bulk
insert operation
* <p>
@@ -44,14 +48,20 @@ public class PartitionPathRepartitionPartitioner<T extends
HoodieRecordPayload>
implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
private final boolean isTablePartitioned;
+ private final boolean shouldPopulateMetaFields;
- public PartitionPathRepartitionPartitioner(boolean isTablePartitioned) {
+ public PartitionPathRepartitionPartitioner(boolean isTablePartitioned,
HoodieWriteConfig config) {
this.isTablePartitioned = isTablePartitioned;
+ this.shouldPopulateMetaFields = config.populateMetaFields();
}
@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>>
records,
int
outputSparkPartitions) {
+ if (!shouldPopulateMetaFields) {
+ throw new HoodieException(PARTITION_PATH_REPARTITION.name() + " mode
requires meta-fields to be enabled");
+ }
+
if (isTablePartitioned) {
PartitionPathRDDPartitioner partitioner = new
PartitionPathRDDPartitioner(
(partitionPath) -> (String) partitionPath, outputSparkPartitions);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitionerWithRows.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitionerWithRows.java
index 62d9edbca84..966648d579c 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitionerWithRows.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitionerWithRows.java
@@ -20,12 +20,16 @@
package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import static
org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_PATH_REPARTITION;
+
/**
* A built-in partitioner that does the following for input rows for bulk
insert operation
* <p>
@@ -40,13 +44,19 @@ import org.apache.spark.sql.Row;
public class PartitionPathRepartitionPartitionerWithRows implements
BulkInsertPartitioner<Dataset<Row>> {
private final boolean isTablePartitioned;
+ private final boolean shouldPopulateMetaFields;
- public PartitionPathRepartitionPartitionerWithRows(boolean
isTablePartitioned) {
+ public PartitionPathRepartitionPartitionerWithRows(boolean
isTablePartitioned, HoodieWriteConfig config) {
this.isTablePartitioned = isTablePartitioned;
+ this.shouldPopulateMetaFields = config.populateMetaFields();
}
@Override
public Dataset<Row> repartitionRecords(Dataset<Row> rows, int
outputSparkPartitions) {
+ if (!shouldPopulateMetaFields) {
+ throw new HoodieException(PARTITION_PATH_REPARTITION.name() + " mode
requires meta-fields to be enabled");
+ }
+
if (isTablePartitioned) {
return rows.repartition(outputSparkPartitions, new
Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java
index b669c338f86..c5c21f14a8e 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java
@@ -19,18 +19,32 @@
package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import static
org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_SORT;
+
/**
* A built-in partitioner that does local sorting for each spark partitions
after coalesce for bulk insert operation, corresponding to the {@code
BulkInsertSortMode.PARTITION_SORT} mode.
*/
public class PartitionSortPartitionerWithRows implements
BulkInsertPartitioner<Dataset<Row>> {
+ private final boolean shouldPopulateMetaFields;
+
+ public PartitionSortPartitionerWithRows(HoodieWriteConfig config) {
+ this.shouldPopulateMetaFields = config.populateMetaFields();
+ }
+
@Override
public Dataset<Row> repartitionRecords(Dataset<Row> rows, int
outputSparkPartitions) {
+ if (!shouldPopulateMetaFields) {
+ throw new HoodieException(PARTITION_SORT.name() + " mode requires
meta-fields to be enabled");
+ }
+
return
rows.coalesce(outputSparkPartitions).sortWithinPartitions(HoodieRecord.PARTITION_PATH_METADATA_FIELD,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java
index 9526ad58564..b5dc83cc2fa 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java
@@ -20,6 +20,8 @@ package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.api.java.JavaRDD;
@@ -30,6 +32,8 @@ import java.util.List;
import scala.Tuple2;
+import static
org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_SORT;
+
/**
* A built-in partitioner that does local sorting for each RDD partition
* after coalesce for bulk insert operation, corresponding to the
@@ -40,9 +44,19 @@ import scala.Tuple2;
public class RDDPartitionSortPartitioner<T extends HoodieRecordPayload>
implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
+ private final boolean shouldPopulateMetaFields;
+
+ public RDDPartitionSortPartitioner(HoodieWriteConfig config) {
+ this.shouldPopulateMetaFields = config.populateMetaFields();
+ }
+
@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>>
records,
int
outputSparkPartitions) {
+ if (!shouldPopulateMetaFields) {
+ throw new HoodieException(PARTITION_SORT.name() + " mode requires
meta-fields to be enabled");
+ }
+
return records.coalesce(outputSparkPartitions)
.mapToPair(record ->
new Tuple2<>(
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
index 7bc64b54457..11420c180eb 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.testutils.HoodieClientTestBase;
@@ -48,6 +49,7 @@ import java.util.stream.Stream;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase
implements Serializable {
private static final Comparator<HoodieRecord<? extends HoodieRecordPayload>>
KEY_COMPARATOR =
@@ -83,16 +85,21 @@ public class TestBulkInsertInternalPartitioner extends
HoodieClientTestBase impl
// boolean isTablePartitioned,
// boolean enforceNumOutputPartitions,
// boolean isGloballySorted,
- // boolean isLocallySorted
+ // boolean isLocallySorted,
+ // boolean populateMetaFields
Object[][] data = new Object[][] {
- {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true},
- {BulkInsertSortMode.PARTITION_SORT, true, true, false, true},
- {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false,
false},
- {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false,
false},
- {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true,
false, false},
- {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true,
false, false},
- {BulkInsertSortMode.NONE, true, true, false, false},
- {BulkInsertSortMode.NONE, true, false, false, false}
+ {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true, true},
+ {BulkInsertSortMode.PARTITION_SORT, true, true, false, true, true},
+ {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false,
false, true},
+ {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false,
false, true},
+ {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true,
false, false, true},
+ {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true,
false, false, true},
+ {BulkInsertSortMode.NONE, true, true, false, false, true},
+ {BulkInsertSortMode.NONE, true, false, false, false, true},
+ {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true, false},
+ {BulkInsertSortMode.PARTITION_SORT, true, true, false, true, false},
+ {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false,
false, false},
+ {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true,
false, false, false}
};
return Stream.of(data).map(Arguments::of);
}
@@ -109,7 +116,8 @@ public class TestBulkInsertInternalPartitioner extends
HoodieClientTestBase impl
boolean
enforceNumOutputPartitions,
boolean isGloballySorted,
boolean isLocallySorted,
- Map<String, Long>
expectedPartitionNumRecords) {
+ Map<String, Long>
expectedPartitionNumRecords,
+ boolean populateMetaFields) {
testBulkInsertInternalPartitioner(
partitioner,
records,
@@ -117,7 +125,8 @@ public class TestBulkInsertInternalPartitioner extends
HoodieClientTestBase impl
isGloballySorted,
isLocallySorted,
expectedPartitionNumRecords,
- Option.empty());
+ Option.empty(),
+ populateMetaFields);
}
private void testBulkInsertInternalPartitioner(BulkInsertPartitioner
partitioner,
@@ -126,8 +135,13 @@ public class TestBulkInsertInternalPartitioner extends
HoodieClientTestBase impl
boolean isGloballySorted,
boolean isLocallySorted,
Map<String, Long>
expectedPartitionNumRecords,
-
Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator) {
+
Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator,
+ boolean populateMetaFields) {
int numPartitions = 2;
+ if (!populateMetaFields) {
+ assertThrows(HoodieException.class, () ->
partitioner.repartitionRecords(records, numPartitions));
+ return;
+ }
JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> actualRecords =
(JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>)
partitioner.repartitionRecords(records, numPartitions);
assertEquals(
@@ -163,25 +177,35 @@ public class TestBulkInsertInternalPartitioner extends
HoodieClientTestBase impl
boolean isTablePartitioned,
boolean
enforceNumOutputPartitions,
boolean isGloballySorted,
- boolean isLocallySorted) {
+ boolean isLocallySorted,
+ boolean populateMetaFields) {
JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
JavaRDD<HoodieRecord> records2 =
generateTripleTestRecordsForBulkInsert(jsc);
+
+ HoodieWriteConfig config = HoodieWriteConfig
+ .newBuilder()
+ .withPath("/")
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+ .withBulkInsertSortMode(sortMode.name())
+ .withPopulateMetaFields(populateMetaFields)
+ .build();
+
testBulkInsertInternalPartitioner(
- BulkInsertInternalPartitionerFactory.get(
- sortMode, isTablePartitioned, enforceNumOutputPartitions),
+ BulkInsertInternalPartitionerFactory.get(config, isTablePartitioned,
enforceNumOutputPartitions),
records1,
enforceNumOutputPartitions,
isGloballySorted,
isLocallySorted,
- generateExpectedPartitionNumRecords(records1));
+ generateExpectedPartitionNumRecords(records1),
+ populateMetaFields);
testBulkInsertInternalPartitioner(
- BulkInsertInternalPartitionerFactory.get(
- sortMode, isTablePartitioned, enforceNumOutputPartitions),
+ BulkInsertInternalPartitionerFactory.get(config, isTablePartitioned,
enforceNumOutputPartitions),
records2,
enforceNumOutputPartitions,
isGloballySorted,
isLocallySorted,
- generateExpectedPartitionNumRecords(records2));
+ generateExpectedPartitionNumRecords(records2),
+ populateMetaFields);
}
@Test
@@ -193,22 +217,21 @@ public class TestBulkInsertInternalPartitioner extends
HoodieClientTestBase impl
JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
JavaRDD<HoodieRecord> records2 =
generateTripleTestRecordsForBulkInsert(jsc);
testBulkInsertInternalPartitioner(new
RDDCustomColumnsSortPartitioner(sortColumns,
HoodieTestDataGenerator.AVRO_SCHEMA, false),
- records1, true, true, true,
generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
+ records1, true, true, true,
generateExpectedPartitionNumRecords(records1), Option.of(columnComparator),
true);
testBulkInsertInternalPartitioner(new
RDDCustomColumnsSortPartitioner(sortColumns,
HoodieTestDataGenerator.AVRO_SCHEMA, false),
- records2, true, true, true,
generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
+ records2, true, true, true,
generateExpectedPartitionNumRecords(records2), Option.of(columnComparator),
true);
HoodieWriteConfig config = HoodieWriteConfig
- .newBuilder()
- .withPath("/")
- .withSchema(TRIP_EXAMPLE_SCHEMA)
-
.withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
- .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
- .build();
+ .newBuilder()
+ .withPath("/")
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+
.withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
+ .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
+ .build();
testBulkInsertInternalPartitioner(new
RDDCustomColumnsSortPartitioner(config),
- records1, true, true, true,
generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
+ records1, true, true, true,
generateExpectedPartitionNumRecords(records1), Option.of(columnComparator),
true);
testBulkInsertInternalPartitioner(new
RDDCustomColumnsSortPartitioner(config),
- records2, true, true, true,
generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
-
+ records2, true, true, true,
generateExpectedPartitionNumRecords(records2), Option.of(columnComparator),
true);
}
private Comparator<HoodieRecord<? extends HoodieRecordPayload>>
getCustomColumnComparator(Schema schema, String[] sortColumns) {
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
index de827f7a450..6332e00ba67 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.SparkDatasetTestUtils;
@@ -44,7 +45,9 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
/**
* Unit tests {@link BulkInsertPartitioner}s with Rows.
@@ -53,6 +56,7 @@ public class TestBulkInsertInternalPartitionerForRows extends
HoodieClientTestHa
private static final Comparator<Row> KEY_COMPARATOR =
Comparator.comparing(o ->
(o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" +
o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)));
+
@BeforeEach
public void setUp() throws Exception {
initSparkContexts("TestBulkInsertInternalPartitionerForRows");
@@ -71,16 +75,21 @@ public class TestBulkInsertInternalPartitionerForRows
extends HoodieClientTestHa
// boolean isTablePartitioned,
// boolean enforceNumOutputPartitions,
// boolean isGloballySorted,
- // boolean isLocallySorted
+ // boolean isLocallySorted,
+ // boolean populateMetaFields
Object[][] data = new Object[][] {
- {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true},
- {BulkInsertSortMode.PARTITION_SORT, true, true, false, true},
- {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false,
false},
- {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false,
false},
- {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true,
false, false},
- {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true,
false, false},
- {BulkInsertSortMode.NONE, true, true, false, false},
- {BulkInsertSortMode.NONE, true, false, false, false}
+ {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true, true},
+ {BulkInsertSortMode.PARTITION_SORT, true, true, false, true, true},
+ {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false,
false, true},
+ {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false,
false, true},
+ {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true,
false, false, true},
+ {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true,
false, false, true},
+ {BulkInsertSortMode.NONE, true, true, false, false, true},
+ {BulkInsertSortMode.NONE, true, false, false, false, true},
+ {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true, false},
+ {BulkInsertSortMode.PARTITION_SORT, true, true, false, true, false},
+ {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false,
false, false},
+ {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true,
false, false, false},
};
return Stream.of(data).map(Arguments::of);
}
@@ -91,28 +100,37 @@ public class TestBulkInsertInternalPartitionerForRows
extends HoodieClientTestHa
boolean isTablePartitioned,
boolean
enforceNumOutputPartitions,
boolean isGloballySorted,
- boolean isLocallySorted)
- throws Exception {
+ boolean isLocallySorted,
+ boolean populateMetaFields) {
Dataset<Row> records1 = generateTestRecords();
Dataset<Row> records2 = generateTestRecords();
+
+ HoodieWriteConfig config = HoodieWriteConfig
+ .newBuilder()
+ .withPath("/")
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+ .withBulkInsertSortMode(sortMode.name())
+ .withPopulateMetaFields(populateMetaFields)
+ .build();
+
testBulkInsertInternalPartitioner(
- BulkInsertInternalPartitionerWithRowsFactory.get(
- sortMode, isTablePartitioned, enforceNumOutputPartitions),
+ BulkInsertInternalPartitionerWithRowsFactory.get(config,
isTablePartitioned, enforceNumOutputPartitions),
records1,
enforceNumOutputPartitions,
isGloballySorted,
isLocallySorted,
generateExpectedPartitionNumRecords(records1),
- Option.empty());
+ Option.empty(),
+ populateMetaFields);
testBulkInsertInternalPartitioner(
- BulkInsertInternalPartitionerWithRowsFactory.get(
- sortMode, isTablePartitioned, enforceNumOutputPartitions),
+ BulkInsertInternalPartitionerWithRowsFactory.get(config,
isTablePartitioned, enforceNumOutputPartitions),
records2,
enforceNumOutputPartitions,
isGloballySorted,
isLocallySorted,
generateExpectedPartitionNumRecords(records2),
- Option.empty());
+ Option.empty(),
+ populateMetaFields);
}
@Test
@@ -124,9 +142,9 @@ public class TestBulkInsertInternalPartitionerForRows
extends HoodieClientTestHa
Comparator<Row> comparator = getCustomColumnComparator(sortColumns);
testBulkInsertInternalPartitioner(new
RowCustomColumnsSortPartitioner(sortColumns),
- records1, true, false, true,
generateExpectedPartitionNumRecords(records1), Option.of(comparator));
+ records1, true, false, true,
generateExpectedPartitionNumRecords(records1), Option.of(comparator), true);
testBulkInsertInternalPartitioner(new
RowCustomColumnsSortPartitioner(sortColumns),
- records2, true, false, true,
generateExpectedPartitionNumRecords(records2), Option.of(comparator));
+ records2, true, false, true,
generateExpectedPartitionNumRecords(records2), Option.of(comparator), true);
HoodieWriteConfig config = HoodieWriteConfig
.newBuilder()
@@ -135,9 +153,9 @@ public class TestBulkInsertInternalPartitionerForRows
extends HoodieClientTestHa
.withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
.build();
testBulkInsertInternalPartitioner(new
RowCustomColumnsSortPartitioner(config),
- records1, true, false, true,
generateExpectedPartitionNumRecords(records1), Option.of(comparator));
+ records1, true, false, true,
generateExpectedPartitionNumRecords(records1), Option.of(comparator), true);
testBulkInsertInternalPartitioner(new
RowCustomColumnsSortPartitioner(config),
- records2, true, false, true,
generateExpectedPartitionNumRecords(records2), Option.of(comparator));
+ records2, true, false, true,
generateExpectedPartitionNumRecords(records2), Option.of(comparator), true);
}
private void testBulkInsertInternalPartitioner(BulkInsertPartitioner
partitioner,
@@ -146,8 +164,13 @@ public class TestBulkInsertInternalPartitionerForRows
extends HoodieClientTestHa
boolean isGloballySorted,
boolean isLocallySorted,
Map<String, Long>
expectedPartitionNumRecords,
- Option<Comparator<Row>>
comparator) {
+ Option<Comparator<Row>>
comparator,
+ boolean populateMetaFields) {
int numPartitions = 2;
+ if (!populateMetaFields) {
+ assertThrows(HoodieException.class, () ->
partitioner.repartitionRecords(rows, numPartitions));
+ return;
+ }
Dataset<Row> actualRecords = (Dataset<Row>)
partitioner.repartitionRecords(rows, numPartitions);
assertEquals(
enforceNumOutputPartitions ? numPartitions :
rows.rdd().getNumPartitions(),
@@ -198,7 +221,7 @@ public class TestBulkInsertInternalPartitionerForRows
extends HoodieClientTestHa
private void verifyRowsAscendingOrder(List<Row> records,
Option<Comparator<Row>> comparator) {
List<Row> expectedRecords = new ArrayList<>(records);
- Collections.sort(expectedRecords,comparator.orElse(KEY_COMPARATOR));
+ Collections.sort(expectedRecords, comparator.orElse(KEY_COMPARATOR));
assertEquals(expectedRecords, records);
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f0ede2b2b82..06f85fca022 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -735,8 +735,7 @@ object HoodieSparkSqlWriter {
if (userDefinedBulkInsertPartitionerOpt.isPresent) {
userDefinedBulkInsertPartitionerOpt.get
} else {
- BulkInsertInternalPartitionerWithRowsFactory.get(
- writeConfig.getBulkInsertSortMode, isTablePartitioned)
+ BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig,
isTablePartitioned)
}
} else {
// Sort modes are not yet supported when meta fields are disabled
@@ -981,7 +980,7 @@ object HoodieSparkSqlWriter {
}
private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
- tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String],
HoodieConfig) = {
+ tableConfig: HoodieTableConfig,
mode: SaveMode): (Map[String, String], HoodieConfig) = {
val translatedOptions =
DataSourceWriteOptions.translateSqlOptions(optParams)
val mergedParams = mutable.Map.empty ++
HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions)
if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)