hudi-agent commented on code in PR #18990:
URL: https://github.com/apache/hudi/pull/18990#discussion_r3408842691


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -146,6 +188,84 @@ object HoodieDatasetBulkInsertHelper
     partitioner.repartitionRecords(updatedDF, targetParallelism)
   }
 
+  /**
+   * Builds the record-key / partition-path projection columns for the non 
auto-keygen path,
+   * dispatching to a fast-path tier when the keygen is amenable to a pure 
Catalyst projection.
+   *
+   * Tier 1: NonpartitionedKeyGenerator with a single record-key field.
+   * Tier 2: SimpleKeyGenerator with a single record-key + single 
partition-path field, when the
+   *         partition formatter flags are reproducible as Catalyst 
expressions. Currently:
+   *         default flags, or hive-style partitioning. URL-encoded and 
slash-separated date
+   *         partitioning fall through to Tier 3 -- the URL escape table 
doesn't have an efficient
+   *         pure-Catalyst equivalent, and slash-separated dates were added in 
1.2.0 and exercise
+   *         a separate formatter branch we'd rather not encode twice.
+   * Tier 3: Anonymous UDF that calls into the supplied 
[[BuiltinKeyGenerator]].
+   */
+  private def buildKeygenColumns(df: DataFrame,
+                                 keyGeneratorClassName: String,
+                                 typedProps: TypedProperties,
+                                 keyGenerator: BuiltinKeyGenerator): 
(org.apache.spark.sql.Column, org.apache.spark.sql.Column) = {
+    val recordKeyFields = keyGenerator.getRecordKeyFieldNames
+    val partitionPathFields = keyGenerator.getPartitionPathFields
+
+    val hiveStyle = 
typedProps.getBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key,
+      
KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.defaultValue.toBoolean)
+    val urlEncode = 
typedProps.getBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key,
+      KeyGeneratorOptions.URL_ENCODE_PARTITIONING.defaultValue.toBoolean)
+    val slashSep = 
typedProps.getBoolean(KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.key,
+      
KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.defaultValue.toBoolean)
+
+    val tier1 = keyGeneratorClassName == 
classOf[NonpartitionedKeyGenerator].getName &&
+      recordKeyFields.size == 1
+
+    val tier2 = keyGeneratorClassName == classOf[SimpleKeyGenerator].getName &&
+      recordKeyFields.size == 1 && partitionPathFields.size == 1 &&
+      !urlEncode && !slashSep

Review Comment:
   🤖 The Tier 2 gate checks the keygen class and formatter flags but not the 
partition column's type. For a `Timestamp` partition column, 
`col(field).cast(StringType)` formats differently than the canonical 
`Timestamp.toString()` that `combinePartitionPath` uses (e.g. Spark's cast 
drops trailing zeros and uses the session timezone, whereas 
`Timestamp.toString()` keeps fractional seconds and uses the JVM default 
timezone). The same concern applies to Tier 1 / 
the `recordKeyCol` cast. The parity test only exercises a `String` partition 
column so this divergence wouldn't be caught — could you confirm the 
type-mismatch case is intentional (matches the pre-#5470 fast path) and maybe 
add a `StringType` guard or test coverage for a Timestamp partition?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -94,44 +96,84 @@ object HoodieDatasetBulkInsertHelper
       val keyGeneratorClassName = 
config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
         "Key-generator class name is required")
 
-      val prependedRdd: RDD[InternalRow] = {
-        injectSQLConf(df.queryExecution.toRdd.mapPartitions { iter =>
-          val typedProps = TypedProperties.copy(config.getProps)
-          if (autoGenerateRecordKeys) {
+      if (autoGenerateRecordKeys) {
+        // Auto-keygen needs per-task partitionId from TaskContext and is 
stateful (rowId++),
+        // so it can't be expressed as a driver-side UDF closure. Keep the RDD 
path here.
+        // TODO: a custom Catalyst Expression could let this path avoid the 
toRdd round-trip too.
+        val prependedRdd: RDD[InternalRow] = {
+          injectSQLConf(df.queryExecution.toRdd.mapPartitions { iter =>
+            val typedProps = TypedProperties.copy(config.getProps)
             
typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId()))
             
typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, 
instantTime)
-          }
-          val sparkKeyGenerator =
-            
ReflectionUtils.loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName),
 typedProps)
-              .asInstanceOf[BuiltinKeyGenerator]
-              val keyGenerator: BuiltinKeyGenerator = if 
(autoGenerateRecordKeys) {
-                new AutoRecordGenWrapperKeyGenerator(typedProps, 
sparkKeyGenerator).asInstanceOf[BuiltinKeyGenerator]
-              } else {
-                sparkKeyGenerator
-              }
-
-          iter.map { row =>
-            // auto generate record keys if needed
-            val metaFields = new Array[UTF8String](5)
-            metaFields(2) = keyGenerator.getRecordKey(row, schema)
-            metaFields(3) = keyGenerator.getPartitionPath(row, schema)
-            metaFields(0) = UTF8String.EMPTY_UTF8
-            metaFields(1) = UTF8String.EMPTY_UTF8
-            metaFields(4) = UTF8String.EMPTY_UTF8
-
-            // TODO use mutable row, avoid re-allocating
-            sparkAdapter.createInternalRow(metaFields, row, false)
-          }
-        }, SQLConf.get)
-      }
+            val sparkKeyGenerator =
+              
ReflectionUtils.loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName),
 typedProps)
+                .asInstanceOf[BuiltinKeyGenerator]
+            val keyGenerator: BuiltinKeyGenerator =
+              new AutoRecordGenWrapperKeyGenerator(typedProps, 
sparkKeyGenerator).asInstanceOf[BuiltinKeyGenerator]
+
+            iter.map { row =>
+              val metaFields = new Array[UTF8String](5)
+              metaFields(2) = keyGenerator.getRecordKey(row, schema)
+              metaFields(3) = keyGenerator.getPartitionPath(row, schema)
+              metaFields(0) = UTF8String.EMPTY_UTF8
+              metaFields(1) = UTF8String.EMPTY_UTF8
+              metaFields(4) = UTF8String.EMPTY_UTF8
+              sparkAdapter.createInternalRow(metaFields, row, false)
+            }
+          }, SQLConf.get)
+        }
+
+        val dedupedRdd = if (config.shouldCombineBeforeInsert) {
+          dedupeRows(prependedRdd, updatedSchema, 
tableConfig.getOrderingFields.asScala.toList, 
SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism)
+        } else {
+          prependedRdd
+        }
 
-      val dedupedRdd = if (config.shouldCombineBeforeInsert) {
-        dedupeRows(prependedRdd, updatedSchema, 
tableConfig.getOrderingFields.asScala.toList, 
SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism)
+        sparkAdapter.getUnsafeUtils.createDataFrameFromRDD(df.sparkSession, 
dedupedRdd, updatedSchema)
       } else {
-        prependedRdd
-      }
+        // Non auto-keygen path. Compute meta columns via Catalyst expressions 
where possible:
+        //  - Tier 1 (Nonpartitioned, single record-key field): 
col(rk).cast(String) + lit("")
+        //  - Tier 2 (Simple, default partition formatter flags): 
col(rk).cast(String) + col(pp).cast(String)
+        //  - Tier 3 (everything else): anonymous UDF invoking 
BuiltinKeyGenerator on Row
+        //
+        // Tier 1/2 are pure column projections, so Catalyst codegens them and 
we avoid the
+        // toRdd.mapPartitions round-trip. Tier 3 keeps a UDF over the 
DataFrame so the projection
+        // remains in Catalyst; the UDF is anonymous (not registered on the 
SparkSession) so
+        // nothing leaks across writes.
+        //
+        // Tier 1/2 do not reproduce the canonical keygen's null/empty 
record-key validation:
+        // a null record-key value passes through as SQL NULL instead of 
throwing HoodieKeyException.
+        // This matches the behaviour of the pre-existing fast paths in this 
helper prior to the
+        // RDD-based rewrite. Tier 3 retains the strict canonical behaviour.

Review Comment:
   🤖 The comment notes that Tier 1/2 pass NULL record-keys through instead of 
throwing, but the divergence also covers empty-string record keys — 
`SimpleKeyGenerator.getRecordKey(Row)` calls `requireNonNullNonEmptyKey`, which 
throws on both null and empty. It is worth mentioning this explicitly so future 
readers don't assume empty strings still validate.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -146,6 +188,84 @@ object HoodieDatasetBulkInsertHelper
     partitioner.repartitionRecords(updatedDF, targetParallelism)
   }
 
+  /**
+   * Builds the record-key / partition-path projection columns for the non 
auto-keygen path,
+   * dispatching to a fast-path tier when the keygen is amenable to a pure 
Catalyst projection.
+   *
+   * Tier 1: NonpartitionedKeyGenerator with a single record-key field.
+   * Tier 2: SimpleKeyGenerator with a single record-key + single 
partition-path field, when the
+   *         partition formatter flags are reproducible as Catalyst 
expressions. Currently:
+   *         default flags, or hive-style partitioning. URL-encoded and 
slash-separated date
+   *         partitioning fall through to Tier 3 -- the URL escape table 
doesn't have an efficient
+   *         pure-Catalyst equivalent, and slash-separated dates were added in 
1.2.0 and exercise
+   *         a separate formatter branch we'd rather not encode twice.
+   * Tier 3: Anonymous UDF that calls into the supplied 
[[BuiltinKeyGenerator]].
+   */
+  private def buildKeygenColumns(df: DataFrame,
+                                 keyGeneratorClassName: String,
+                                 typedProps: TypedProperties,
+                                 keyGenerator: BuiltinKeyGenerator): 
(org.apache.spark.sql.Column, org.apache.spark.sql.Column) = {
+    val recordKeyFields = keyGenerator.getRecordKeyFieldNames
+    val partitionPathFields = keyGenerator.getPartitionPathFields
+
+    val hiveStyle = 
typedProps.getBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key,
+      
KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.defaultValue.toBoolean)
+    val urlEncode = 
typedProps.getBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key,
+      KeyGeneratorOptions.URL_ENCODE_PARTITIONING.defaultValue.toBoolean)
+    val slashSep = 
typedProps.getBoolean(KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.key,
+      
KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.defaultValue.toBoolean)
+
+    val tier1 = keyGeneratorClassName == 
classOf[NonpartitionedKeyGenerator].getName &&

Review Comment:
   🤖 The class-name comparison uses the user-supplied `keyGeneratorClassName` 
directly, so users who configured the Avro variant (`SimpleAvroKeyGenerator` / 
`NonpartitionedAvroKeyGenerator`) won't hit Tier 1/2 even though the factory 
converts them to the Spark variant at load time. The check could instead compare against 
`HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName)`. 
This is not a correctness issue — Tier 3 still works for those — just a missed 
optimization for that config path.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java:
##########
@@ -382,6 +390,274 @@ public void testBulkInsertParallelismParam() {
     
sqlContext.sparkContext().removeSparkListener(stageCheckBulkParallelismListener);
   }
 
+  private static Stream<Arguments> provideKeyGenParityArgs() {
+    // Covers every keygen + partition-formatter combo the dispatcher in
+    // HoodieDatasetBulkInsertHelper#buildKeygenColumns can land on. 
SimpleKeyGenerator cases
+    // exercise hive-style / url-encode / slash-separated date flags. Tier 2 
covers only the
+    // {default, hive-style} subset for SimpleKeyGen; url-encode and 
slash-separated push to Tier 3.
+    return Stream.of(
+        // Tier 1
+        Arguments.of("nonpartitioned-tier1", 
NonpartitionedKeyGenerator.class.getName(), "_row_key", "", false, false, 
false),
+        // Tier 2
+        Arguments.of("simple-default-tier2", 
SimpleKeyGenerator.class.getName(), "_row_key", "partition", false, false, 
false),
+        Arguments.of("simple-hive-tier2", SimpleKeyGenerator.class.getName(), 
"_row_key", "partition", true, false, false),
+        // Tier 3 fallbacks: Simple under url-encode / slash-sep, plus every 
other keygen class.
+        Arguments.of("simple-slash-tier3", SimpleKeyGenerator.class.getName(), 
"_row_key", "partition", false, true, false),
+        Arguments.of("simple-hive-slash-tier3", 
SimpleKeyGenerator.class.getName(), "_row_key", "partition", true, true, false),
+        Arguments.of("simple-url-tier3", SimpleKeyGenerator.class.getName(), 
"_row_key", "partition", false, false, true),
+        Arguments.of("simple-hive-url-tier3", 
SimpleKeyGenerator.class.getName(), "_row_key", "partition", true, false, true),
+        Arguments.of("complex-single-tier3", 
ComplexKeyGenerator.class.getName(), "_row_key", "partition", false, false, 
false),
+        Arguments.of("complex-multi-tier3", 
ComplexKeyGenerator.class.getName(), "_row_key,ts", 
"partition,_hoodie_is_deleted", false, false, false),
+        Arguments.of("timestamp-based-tier3", 
TimestampBasedKeyGenerator.class.getName(), "_row_key", "ts", false, false, 
false),
+        Arguments.of("custom-tier3", CustomKeyGenerator.class.getName(), 
"_row_key", "partition:SIMPLE", false, false, false));
+  }
+
+  /**
+   * Asserts that record-key / partition-path values produced by {@link 
HoodieDatasetBulkInsertHelper}
+   * match those produced by the canonical Avro path ({@link 
BuiltinKeyGenerator#getRecordKey(GenericRecord)}).
+   *
+   * <p>The Avro path is the ground truth shared by read- and write-side 
keygen invocations, so parity
+   * against it (rather than RDD-vs-UDF parity) is what actually matters for 
correctness.
+   */
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("provideKeyGenParityArgs")
+  public void testKeyGenParityAgainstAvroGroundTruth(String label,
+                                                     String keyGenClass,
+                                                     String recordKeyFields,
+                                                     String 
partitionPathFields,
+                                                     boolean 
hiveStylePartitioning,
+                                                     boolean 
slashSepPartitioning,
+                                                     boolean 
urlEncodePartitioning) {
+    Map<String, String> props = new HashMap<>();
+    props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), 
keyGenClass);
+    props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKeyFields);
+    props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), 
partitionPathFields);
+    props.put(HoodieWriteConfig.TBL_NAME.key(), label + "_parity_table");
+    props.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), 
String.valueOf(hiveStylePartitioning));
+    props.put(KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.key(), 
String.valueOf(slashSepPartitioning));
+    props.put(KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key(), 
String.valueOf(urlEncodePartitioning));
+    if (keyGenClass.equals(TimestampBasedKeyGenerator.class.getName())) {
+      props.put(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key(), 
"EPOCHMILLISECONDS");
+      
props.put(TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_DATE_FORMAT.key(), 
"yyyy-MM-dd");
+      props.put(TimestampKeyGeneratorConfig.TIMESTAMP_TIMEZONE_FORMAT.key(), 
"UTC");
+    }
+    HoodieWriteConfig config = 
getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build();
+
+    List<Row> rows = DataSourceTestUtils.generateRandomRows(20);
+    Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
+    Dataset<Row> result = 
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
+        new HoodieTableConfig(), new NonSortPartitionerWithRows(), 
"000000001");
+
+    int recordKeyIdx = 
result.schema().fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    int partitionPathIdx = 
result.schema().fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+    int rowKeyIdx = result.schema().fieldIndex("_row_key");
+
+    Map<String, String[]> actual = new HashMap<>();
+    for (Row r : result.collectAsList()) {
+      actual.put(r.getString(rowKeyIdx),
+          new String[] {r.getString(recordKeyIdx), 
r.getString(partitionPathIdx)});
+    }
+
+    TypedProperties keyGenProps = new TypedProperties();
+    keyGenProps.putAll(props);
+    BuiltinKeyGenerator groundTruthKeyGen =
+        (BuiltinKeyGenerator) 
org.apache.hudi.common.util.ReflectionUtils.loadClass(keyGenClass, keyGenProps);
+    scala.Function1<Row, GenericRecord> toAvro = 
AvroConversionUtils.createConverterToAvro(
+        structType, "trip", "example.schema");
+
+    assertEquals(rows.size(), actual.size(), "Row count mismatch — possible 
duplicate record keys");
+    for (Row inputRow : rows) {
+      GenericRecord avro = toAvro.apply(inputRow);
+      String expectedRecordKey = groundTruthKeyGen.getRecordKey(avro);
+      String expectedPartitionPath = groundTruthKeyGen.getPartitionPath(avro);
+      String[] observed = actual.get(inputRow.getString(0));
+      assertEquals(expectedRecordKey, observed[0],
+          "record key mismatch for keygen=" + label + " row=" + 
inputRow.getString(0));
+      assertEquals(expectedPartitionPath, observed[1],
+          "partition path mismatch for keygen=" + label + " row=" + 
inputRow.getString(0));
+    }
+  }
+
+  /**
+   * Tier 1 / Tier 2 fast paths use {@code col(field).cast(String)} for the 
record key. Confirms that
+   * a non-string record-key column (e.g. {@code ts: long}) is materialised as 
the string form of the
+   * underlying value, matching the canonical keygen output.
+   */
+  @Test
+  public void testFastPathCastsNonStringRecordKey() {
+    // Tier 1: Nonpartitioned, single record-key field. Using `ts` (long) as 
the record key.
+    Map<String, String> nonpartitionedProps = new HashMap<>();
+    
nonpartitionedProps.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), 
NonpartitionedKeyGenerator.class.getName());
+    nonpartitionedProps.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), 
"ts");
+    
nonpartitionedProps.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "");
+    nonpartitionedProps.put(HoodieWriteConfig.TBL_NAME.key(), 
"nonpartitioned_cast_tbl");
+    assertFastPathRecordKeyCast(nonpartitionedProps, /* expectedPartitionPath 
*/ "");
+
+    // Tier 2: Simple, single record-key + partition-path. Using `ts` (long) 
as the record key.
+    Map<String, String> simpleProps = new HashMap<>();
+    simpleProps.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), 
SimpleKeyGenerator.class.getName());
+    simpleProps.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "ts");
+    simpleProps.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), 
"partition");
+    simpleProps.put(HoodieWriteConfig.TBL_NAME.key(), "simple_cast_tbl");
+    assertFastPathRecordKeyCast(simpleProps, /* expectedPartitionPath */ null);
+  }
+
+  private void assertFastPathRecordKeyCast(Map<String, String> props, String 
expectedPartitionPath) {
+    HoodieWriteConfig config = 
getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build();
+    List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
+    Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
+    Dataset<Row> result = 
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
+        new HoodieTableConfig(), new NonSortPartitionerWithRows(), 
"000000001");
+
+    int recordKeyIdx = 
result.schema().fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    int partitionPathIdx = 
result.schema().fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+    int tsIdx = result.schema().fieldIndex("ts");
+    int partitionIdx = result.schema().fieldIndex("partition");
+
+    for (Row r : result.collectAsList()) {
+      Object tsVal = r.get(tsIdx);
+      assertEquals(String.valueOf(tsVal), r.getString(recordKeyIdx),
+          "record key should be string form of ts column");
+      String expected = expectedPartitionPath != null ? expectedPartitionPath 
: String.valueOf(r.get(partitionIdx));
+      assertEquals(expected, r.getString(partitionPathIdx),
+          "partition path mismatch");
+    }
+  }
+
+  /**
+   * Sanity check that {@link HoodieDatasetBulkInsertHelper} composes the 
Catalyst plan from
+   * pure column projections for Tier 1 and Tier 2 paths — no UDFs, no toRdd 
round-trip — so the
+   * fast paths actually benefit from Catalyst codegen. We verify this by 
inspecting the resulting
+   * Dataset's logical plan for absence of {@code ScalaUDF}/{@code 
UserDefinedFunction} nodes.
+   */
+  @Test
+  public void testFastPathAvoidsUdf() {
+    // Tier 1
+    Map<String, String> tier1Props = new HashMap<>();
+    tier1Props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), 
NonpartitionedKeyGenerator.class.getName());
+    tier1Props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+    tier1Props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "");
+    tier1Props.put(HoodieWriteConfig.TBL_NAME.key(), "tier1_plan_tbl");
+    assertNoScalaUdfInPlan(tier1Props, "tier1");
+
+    // Tier 2
+    Map<String, String> tier2Props = new HashMap<>();
+    tier2Props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), 
SimpleKeyGenerator.class.getName());
+    tier2Props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+    tier2Props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), 
"partition");
+    tier2Props.put(HoodieWriteConfig.TBL_NAME.key(), "tier2_plan_tbl");
+    assertNoScalaUdfInPlan(tier2Props, "tier2");
+  }
+
+  private void assertNoScalaUdfInPlan(Map<String, String> props, String label) 
{
+    HoodieWriteConfig config = 
getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build();
+    List<Row> rows = DataSourceTestUtils.generateRandomRows(5);
+    Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
+    Dataset<Row> result = 
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
+        new HoodieTableConfig(), new NonSortPartitionerWithRows(), 
"000000001");
+    String plan = result.queryExecution().analyzed().toString();
+    assertTrue(!plan.toLowerCase().contains("scalaudf"),

Review Comment:
   🤖 nit: `assertTrue(!condition)` reads a bit awkwardly here — 
`assertFalse(plan.toLowerCase().contains("scalaudf"), ...)` is more idiomatic 
and produces a slightly clearer failure message.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to