hudi-agent commented on code in PR #18990:
URL: https://github.com/apache/hudi/pull/18990#discussion_r3408842691
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -146,6 +188,84 @@ object HoodieDatasetBulkInsertHelper
partitioner.repartitionRecords(updatedDF, targetParallelism)
}
+ /**
+ * Builds the record-key / partition-path projection columns for the non
auto-keygen path,
+ * dispatching to a fast-path tier when the keygen is amenable to a pure
Catalyst projection.
+ *
+ * Tier 1: NonpartitionedKeyGenerator with a single record-key field.
+ * Tier 2: SimpleKeyGenerator with a single record-key + single
partition-path field, when the
+ * partition formatter flags are reproducible as Catalyst
expressions. Currently:
+ * default flags, or hive-style partitioning. URL-encoded and
slash-separated date
+ * partitioning fall through to Tier 3 -- the URL escape table
doesn't have an efficient
+ * pure-Catalyst equivalent, and slash-separated dates were added in
1.2.0 and exercise
+ * a separate formatter branch we'd rather not encode twice.
+ * Tier 3: Anonymous UDF that calls into the supplied
[[BuiltinKeyGenerator]].
+ */
+ private def buildKeygenColumns(df: DataFrame,
+ keyGeneratorClassName: String,
+ typedProps: TypedProperties,
+ keyGenerator: BuiltinKeyGenerator):
(org.apache.spark.sql.Column, org.apache.spark.sql.Column) = {
+ val recordKeyFields = keyGenerator.getRecordKeyFieldNames
+ val partitionPathFields = keyGenerator.getPartitionPathFields
+
+ val hiveStyle =
typedProps.getBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key,
+
KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.defaultValue.toBoolean)
+ val urlEncode =
typedProps.getBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key,
+ KeyGeneratorOptions.URL_ENCODE_PARTITIONING.defaultValue.toBoolean)
+ val slashSep =
typedProps.getBoolean(KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.key,
+
KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.defaultValue.toBoolean)
+
+ val tier1 = keyGeneratorClassName ==
classOf[NonpartitionedKeyGenerator].getName &&
+ recordKeyFields.size == 1
+
+ val tier2 = keyGeneratorClassName == classOf[SimpleKeyGenerator].getName &&
+ recordKeyFields.size == 1 && partitionPathFields.size == 1 &&
+ !urlEncode && !slashSep
Review Comment:
🤖 The Tier 2 gate checks the keygen class and formatter flags but not the
partition column's type. For a `Timestamp` partition column,
`col(field).cast(StringType)` formats differently than the canonical
`Timestamp.toString()` that `combinePartitionPath` uses (e.g. Spark's cast
drops trailing zeros and uses the session timezone, whereas `Timestamp.toString`
keeps fractional seconds and uses the JVM default timezone). The same concern
applies to Tier 1 / the `recordKeyCol` cast. The parity test only exercises a `String` partition
column so this divergence wouldn't be caught — could you confirm the
type-mismatch case is intentional (matches the pre-#5470 fast path) and maybe
add a `StringType` guard or test coverage for a Timestamp partition?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -94,44 +96,84 @@ object HoodieDatasetBulkInsertHelper
val keyGeneratorClassName =
config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
"Key-generator class name is required")
- val prependedRdd: RDD[InternalRow] = {
- injectSQLConf(df.queryExecution.toRdd.mapPartitions { iter =>
- val typedProps = TypedProperties.copy(config.getProps)
- if (autoGenerateRecordKeys) {
+ if (autoGenerateRecordKeys) {
+ // Auto-keygen needs per-task partitionId from TaskContext and is
stateful (rowId++),
+ // so it can't be expressed as a driver-side UDF closure. Keep the RDD
path here.
+ // TODO: a custom Catalyst Expression could let this path avoid the
toRdd round-trip too.
+ val prependedRdd: RDD[InternalRow] = {
+ injectSQLConf(df.queryExecution.toRdd.mapPartitions { iter =>
+ val typedProps = TypedProperties.copy(config.getProps)
typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG,
String.valueOf(TaskContext.getPartitionId()))
typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG,
instantTime)
- }
- val sparkKeyGenerator =
-
ReflectionUtils.loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName),
typedProps)
- .asInstanceOf[BuiltinKeyGenerator]
- val keyGenerator: BuiltinKeyGenerator = if
(autoGenerateRecordKeys) {
- new AutoRecordGenWrapperKeyGenerator(typedProps,
sparkKeyGenerator).asInstanceOf[BuiltinKeyGenerator]
- } else {
- sparkKeyGenerator
- }
-
- iter.map { row =>
- // auto generate record keys if needed
- val metaFields = new Array[UTF8String](5)
- metaFields(2) = keyGenerator.getRecordKey(row, schema)
- metaFields(3) = keyGenerator.getPartitionPath(row, schema)
- metaFields(0) = UTF8String.EMPTY_UTF8
- metaFields(1) = UTF8String.EMPTY_UTF8
- metaFields(4) = UTF8String.EMPTY_UTF8
-
- // TODO use mutable row, avoid re-allocating
- sparkAdapter.createInternalRow(metaFields, row, false)
- }
- }, SQLConf.get)
- }
+ val sparkKeyGenerator =
+
ReflectionUtils.loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName),
typedProps)
+ .asInstanceOf[BuiltinKeyGenerator]
+ val keyGenerator: BuiltinKeyGenerator =
+ new AutoRecordGenWrapperKeyGenerator(typedProps,
sparkKeyGenerator).asInstanceOf[BuiltinKeyGenerator]
+
+ iter.map { row =>
+ val metaFields = new Array[UTF8String](5)
+ metaFields(2) = keyGenerator.getRecordKey(row, schema)
+ metaFields(3) = keyGenerator.getPartitionPath(row, schema)
+ metaFields(0) = UTF8String.EMPTY_UTF8
+ metaFields(1) = UTF8String.EMPTY_UTF8
+ metaFields(4) = UTF8String.EMPTY_UTF8
+ sparkAdapter.createInternalRow(metaFields, row, false)
+ }
+ }, SQLConf.get)
+ }
+
+ val dedupedRdd = if (config.shouldCombineBeforeInsert) {
+ dedupeRows(prependedRdd, updatedSchema,
tableConfig.getOrderingFields.asScala.toList,
SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism)
+ } else {
+ prependedRdd
+ }
- val dedupedRdd = if (config.shouldCombineBeforeInsert) {
- dedupeRows(prependedRdd, updatedSchema,
tableConfig.getOrderingFields.asScala.toList,
SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism)
+ sparkAdapter.getUnsafeUtils.createDataFrameFromRDD(df.sparkSession,
dedupedRdd, updatedSchema)
} else {
- prependedRdd
- }
+ // Non auto-keygen path. Compute meta columns via Catalyst expressions
where possible:
+ // - Tier 1 (Nonpartitioned, single record-key field):
col(rk).cast(String) + lit("")
+ // - Tier 2 (Simple, default partition formatter flags):
col(rk).cast(String) + col(pp).cast(String)
+ // - Tier 3 (everything else): anonymous UDF invoking
BuiltinKeyGenerator on Row
+ //
+ // Tier 1/2 are pure column projections, so Catalyst codegens them and
we avoid the
+ // toRdd.mapPartitions round-trip. Tier 3 keeps a UDF over the
DataFrame so the projection
+ // remains in Catalyst; the UDF is anonymous (not registered on the
SparkSession) so
+ // nothing leaks across writes.
+ //
+ // Tier 1/2 do not reproduce the canonical keygen's null/empty
record-key validation:
+ // a null record-key value passes through as SQL NULL instead of
throwing HoodieKeyException.
+ // This matches the behaviour of the pre-existing fast paths in this
helper prior to the
+ // RDD-based rewrite. Tier 3 retains the strict canonical behaviour.
Review Comment:
🤖 The comment notes that Tier 1/2 pass NULL record-keys through instead of
throwing, but the divergence also covers empty-string record keys —
`SimpleKeyGenerator.getRecordKey(Row)` calls `requireNonNullNonEmptyKey` which
throws on both null and empty. Worth mentioning explicitly so future readers
don't assume empty strings still validate.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -146,6 +188,84 @@ object HoodieDatasetBulkInsertHelper
partitioner.repartitionRecords(updatedDF, targetParallelism)
}
+ /**
+ * Builds the record-key / partition-path projection columns for the non
auto-keygen path,
+ * dispatching to a fast-path tier when the keygen is amenable to a pure
Catalyst projection.
+ *
+ * Tier 1: NonpartitionedKeyGenerator with a single record-key field.
+ * Tier 2: SimpleKeyGenerator with a single record-key + single
partition-path field, when the
+ * partition formatter flags are reproducible as Catalyst
expressions. Currently:
+ * default flags, or hive-style partitioning. URL-encoded and
slash-separated date
+ * partitioning fall through to Tier 3 -- the URL escape table
doesn't have an efficient
+ * pure-Catalyst equivalent, and slash-separated dates were added in
1.2.0 and exercise
+ * a separate formatter branch we'd rather not encode twice.
+ * Tier 3: Anonymous UDF that calls into the supplied
[[BuiltinKeyGenerator]].
+ */
+ private def buildKeygenColumns(df: DataFrame,
+ keyGeneratorClassName: String,
+ typedProps: TypedProperties,
+ keyGenerator: BuiltinKeyGenerator):
(org.apache.spark.sql.Column, org.apache.spark.sql.Column) = {
+ val recordKeyFields = keyGenerator.getRecordKeyFieldNames
+ val partitionPathFields = keyGenerator.getPartitionPathFields
+
+ val hiveStyle =
typedProps.getBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key,
+
KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.defaultValue.toBoolean)
+ val urlEncode =
typedProps.getBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key,
+ KeyGeneratorOptions.URL_ENCODE_PARTITIONING.defaultValue.toBoolean)
+ val slashSep =
typedProps.getBoolean(KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.key,
+
KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.defaultValue.toBoolean)
+
+ val tier1 = keyGeneratorClassName ==
classOf[NonpartitionedKeyGenerator].getName &&
Review Comment:
🤖 The class-name comparison uses the user-supplied `keyGeneratorClassName`
directly, so users who configured the Avro variant (`SimpleAvroKeyGenerator` /
`NonpartitionedAvroKeyGenerator`) won't hit Tier 1/2 even though the factory
converts them to the Spark variant at load time. The gate could instead compare against
`HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName)`
here. This is not a correctness issue — Tier 3 still works for those — just a missed
optimization for that config path.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java:
##########
@@ -382,6 +390,274 @@ public void testBulkInsertParallelismParam() {
sqlContext.sparkContext().removeSparkListener(stageCheckBulkParallelismListener);
}
+ private static Stream<Arguments> provideKeyGenParityArgs() {
+ // Covers every keygen + partition-formatter combo the dispatcher in
+ // HoodieDatasetBulkInsertHelper#buildKeygenColumns can land on.
SimpleKeyGenerator cases
+ // exercise hive-style / url-encode / slash-separated date flags. Tier 2
covers only the
+ // {default, hive-style} subset for SimpleKeyGen; url-encode and
slash-separated push to Tier 3.
+ return Stream.of(
+ // Tier 1
+ Arguments.of("nonpartitioned-tier1",
NonpartitionedKeyGenerator.class.getName(), "_row_key", "", false, false,
false),
+ // Tier 2
+ Arguments.of("simple-default-tier2",
SimpleKeyGenerator.class.getName(), "_row_key", "partition", false, false,
false),
+ Arguments.of("simple-hive-tier2", SimpleKeyGenerator.class.getName(),
"_row_key", "partition", true, false, false),
+ // Tier 3 fallbacks: Simple under url-encode / slash-sep, plus every
other keygen class.
+ Arguments.of("simple-slash-tier3", SimpleKeyGenerator.class.getName(),
"_row_key", "partition", false, true, false),
+ Arguments.of("simple-hive-slash-tier3",
SimpleKeyGenerator.class.getName(), "_row_key", "partition", true, true, false),
+ Arguments.of("simple-url-tier3", SimpleKeyGenerator.class.getName(),
"_row_key", "partition", false, false, true),
+ Arguments.of("simple-hive-url-tier3",
SimpleKeyGenerator.class.getName(), "_row_key", "partition", true, false, true),
+ Arguments.of("complex-single-tier3",
ComplexKeyGenerator.class.getName(), "_row_key", "partition", false, false,
false),
+ Arguments.of("complex-multi-tier3",
ComplexKeyGenerator.class.getName(), "_row_key,ts",
"partition,_hoodie_is_deleted", false, false, false),
+ Arguments.of("timestamp-based-tier3",
TimestampBasedKeyGenerator.class.getName(), "_row_key", "ts", false, false,
false),
+ Arguments.of("custom-tier3", CustomKeyGenerator.class.getName(),
"_row_key", "partition:SIMPLE", false, false, false));
+ }
+
+ /**
+ * Asserts that record-key / partition-path values produced by {@link
HoodieDatasetBulkInsertHelper}
+ * match those produced by the canonical Avro path ({@link
BuiltinKeyGenerator#getRecordKey(GenericRecord)}).
+ *
+ * <p>The Avro path is the ground truth shared by read- and write-side
keygen invocations, so parity
+ * against it (rather than RDD-vs-UDF parity) is what actually matters for
correctness.
+ */
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("provideKeyGenParityArgs")
+ public void testKeyGenParityAgainstAvroGroundTruth(String label,
+ String keyGenClass,
+ String recordKeyFields,
+ String
partitionPathFields,
+ boolean
hiveStylePartitioning,
+ boolean
slashSepPartitioning,
+ boolean
urlEncodePartitioning) {
+ Map<String, String> props = new HashMap<>();
+ props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
keyGenClass);
+ props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKeyFields);
+ props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(),
partitionPathFields);
+ props.put(HoodieWriteConfig.TBL_NAME.key(), label + "_parity_table");
+ props.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(),
String.valueOf(hiveStylePartitioning));
+ props.put(KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.key(),
String.valueOf(slashSepPartitioning));
+ props.put(KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key(),
String.valueOf(urlEncodePartitioning));
+ if (keyGenClass.equals(TimestampBasedKeyGenerator.class.getName())) {
+ props.put(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key(),
"EPOCHMILLISECONDS");
+
props.put(TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_DATE_FORMAT.key(),
"yyyy-MM-dd");
+ props.put(TimestampKeyGeneratorConfig.TIMESTAMP_TIMEZONE_FORMAT.key(),
"UTC");
+ }
+ HoodieWriteConfig config =
getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build();
+
+ List<Row> rows = DataSourceTestUtils.generateRandomRows(20);
+ Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
+ Dataset<Row> result =
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
+ new HoodieTableConfig(), new NonSortPartitionerWithRows(),
"000000001");
+
+ int recordKeyIdx =
result.schema().fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ int partitionPathIdx =
result.schema().fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+ int rowKeyIdx = result.schema().fieldIndex("_row_key");
+
+ Map<String, String[]> actual = new HashMap<>();
+ for (Row r : result.collectAsList()) {
+ actual.put(r.getString(rowKeyIdx),
+ new String[] {r.getString(recordKeyIdx),
r.getString(partitionPathIdx)});
+ }
+
+ TypedProperties keyGenProps = new TypedProperties();
+ keyGenProps.putAll(props);
+ BuiltinKeyGenerator groundTruthKeyGen =
+ (BuiltinKeyGenerator)
org.apache.hudi.common.util.ReflectionUtils.loadClass(keyGenClass, keyGenProps);
+ scala.Function1<Row, GenericRecord> toAvro =
AvroConversionUtils.createConverterToAvro(
+ structType, "trip", "example.schema");
+
+ assertEquals(rows.size(), actual.size(), "Row count mismatch — possible
duplicate record keys");
+ for (Row inputRow : rows) {
+ GenericRecord avro = toAvro.apply(inputRow);
+ String expectedRecordKey = groundTruthKeyGen.getRecordKey(avro);
+ String expectedPartitionPath = groundTruthKeyGen.getPartitionPath(avro);
+ String[] observed = actual.get(inputRow.getString(0));
+ assertEquals(expectedRecordKey, observed[0],
+ "record key mismatch for keygen=" + label + " row=" +
inputRow.getString(0));
+ assertEquals(expectedPartitionPath, observed[1],
+ "partition path mismatch for keygen=" + label + " row=" +
inputRow.getString(0));
+ }
+ }
+
+ /**
+ * Tier 1 / Tier 2 fast paths use {@code col(field).cast(String)} for the
record key. Confirms that
+ * a non-string record-key column (e.g. {@code ts: long}) is materialised as
the string form of the
+ * underlying value, matching the canonical keygen output.
+ */
+ @Test
+ public void testFastPathCastsNonStringRecordKey() {
+ // Tier 1: Nonpartitioned, single record-key field. Using `ts` (long) as
the record key.
+ Map<String, String> nonpartitionedProps = new HashMap<>();
+
nonpartitionedProps.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
NonpartitionedKeyGenerator.class.getName());
+ nonpartitionedProps.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(),
"ts");
+
nonpartitionedProps.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "");
+ nonpartitionedProps.put(HoodieWriteConfig.TBL_NAME.key(),
"nonpartitioned_cast_tbl");
+ assertFastPathRecordKeyCast(nonpartitionedProps, /* expectedPartitionPath
*/ "");
+
+ // Tier 2: Simple, single record-key + partition-path. Using `ts` (long)
as the record key.
+ Map<String, String> simpleProps = new HashMap<>();
+ simpleProps.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
SimpleKeyGenerator.class.getName());
+ simpleProps.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "ts");
+ simpleProps.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(),
"partition");
+ simpleProps.put(HoodieWriteConfig.TBL_NAME.key(), "simple_cast_tbl");
+ assertFastPathRecordKeyCast(simpleProps, /* expectedPartitionPath */ null);
+ }
+
+ private void assertFastPathRecordKeyCast(Map<String, String> props, String
expectedPartitionPath) {
+ HoodieWriteConfig config =
getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build();
+ List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
+ Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
+ Dataset<Row> result =
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
+ new HoodieTableConfig(), new NonSortPartitionerWithRows(),
"000000001");
+
+ int recordKeyIdx =
result.schema().fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ int partitionPathIdx =
result.schema().fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+ int tsIdx = result.schema().fieldIndex("ts");
+ int partitionIdx = result.schema().fieldIndex("partition");
+
+ for (Row r : result.collectAsList()) {
+ Object tsVal = r.get(tsIdx);
+ assertEquals(String.valueOf(tsVal), r.getString(recordKeyIdx),
+ "record key should be string form of ts column");
+ String expected = expectedPartitionPath != null ? expectedPartitionPath
: String.valueOf(r.get(partitionIdx));
+ assertEquals(expected, r.getString(partitionPathIdx),
+ "partition path mismatch");
+ }
+ }
+
+ /**
+ * Sanity check that {@link HoodieDatasetBulkInsertHelper} composes the
Catalyst plan from
+ * pure column projections for Tier 1 and Tier 2 paths — no UDFs, no toRdd
round-trip — so the
+ * fast paths actually benefit from Catalyst codegen. We verify this by
inspecting the resulting
+ * Dataset's logical plan for absence of {@code ScalaUDF}/{@code
UserDefinedFunction} nodes.
+ */
+ @Test
+ public void testFastPathAvoidsUdf() {
+ // Tier 1
+ Map<String, String> tier1Props = new HashMap<>();
+ tier1Props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
NonpartitionedKeyGenerator.class.getName());
+ tier1Props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+ tier1Props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "");
+ tier1Props.put(HoodieWriteConfig.TBL_NAME.key(), "tier1_plan_tbl");
+ assertNoScalaUdfInPlan(tier1Props, "tier1");
+
+ // Tier 2
+ Map<String, String> tier2Props = new HashMap<>();
+ tier2Props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
SimpleKeyGenerator.class.getName());
+ tier2Props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+ tier2Props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(),
"partition");
+ tier2Props.put(HoodieWriteConfig.TBL_NAME.key(), "tier2_plan_tbl");
+ assertNoScalaUdfInPlan(tier2Props, "tier2");
+ }
+
+ private void assertNoScalaUdfInPlan(Map<String, String> props, String label)
{
+ HoodieWriteConfig config =
getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build();
+ List<Row> rows = DataSourceTestUtils.generateRandomRows(5);
+ Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
+ Dataset<Row> result =
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
+ new HoodieTableConfig(), new NonSortPartitionerWithRows(),
"000000001");
+ String plan = result.queryExecution().analyzed().toString();
+ assertTrue(!plan.toLowerCase().contains("scalaudf"),
Review Comment:
🤖 nit: `assertTrue(!condition)` reads a bit awkwardly here —
`assertFalse(plan.toLowerCase().contains("scalaudf"), ...)` is more idiomatic
and produces a slightly clearer failure message.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]