nsivabalan opened a new issue, #18989:
URL: https://github.com/apache/hudi/issues/18989
### Problem
`HoodieDatasetBulkInsertHelper.prepareForBulkInsert` invokes the configured
`KeyGenerator` for every row via `df.queryExecution.toRdd.mapPartitions { iter
=> iter.map { row => keyGenerator.getRecordKey(row, schema);
keyGenerator.getPartitionPath(row, schema); ... } }`. This forces an RDD
round-trip and a reflection-based keygen invocation on every record, even
for the keygens (NonpartitionedKeyGenerator, SimpleKeyGenerator) whose
record-key and partition-path values can be sourced directly from input columns.
Before the original row-writer PR
([#5470](https://github.com/apache/hudi/pull/5470)) consolidated the path,
these two keygens were handled with `withColumn(col(field).cast(String))`
projections — pure Catalyst, free codegen, no row materialization. Collapsing
the dispatch onto an RDD `mapPartitions` regressed bulk-insert throughput for
the common SimpleKeyGen and NonpartitionedKeyGen cases. Related: #18969 (same
module, drop-partition-columns hot path).
### Proposed fix
Restore tiered dispatch in `prepareForBulkInsert`:
- **Tier 1 — `NonpartitionedKeyGenerator`** (single record-key field): emit
`col(rk).cast(String)` + `lit("")` as Catalyst columns.
- **Tier 2 — `SimpleKeyGenerator`** (single record-key + single
partition-path, URL-encoding off, slash-separated dates off): emit
`col(rk).cast(String)` and a partition-path expression mirroring
`PartitionPathFormatterBase#combine`, including the `handleEmpty ->
__HIVE_DEFAULT_PARTITION__` substitution and hive-style `field=` prefixing.
- **Tier 3 — everything else** (multi-field keys, ComplexKeyGenerator,
TimestampBased, Custom, Simple with URL-encode or slash-sep): anonymous
`functions.udf` over a struct of input columns calling the canonical
`BuiltinKeyGenerator.getRecordKey(Row)` / `getPartitionPath(Row)` — i.e. the
same Avro-aligned formatter the read side and Tier 3 callers already share.
UDFs are not registered against the SparkSession (no leak across writes).
- **Auto record key generation** keeps the existing RDD path; it needs
`TaskContext.partitionId` and a stateful per-task counter, which can't be
expressed cleanly as a driver-side closure.
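The dispatch rules in the list above can be sketched as a small decision function. This is only an illustration under stated assumptions: `KeyGenTierSketch`, `Tier`, and `selectTier` are hypothetical names, the key generator is identified by its simple class name for brevity, and the real `prepareForBulkInsert` may derive these inputs differently.

```java
import java.util.List;

/** Hypothetical sketch of the proposed tier selection; not actual Hudi code. */
final class KeyGenTierSketch {

    enum Tier { AUTO_KEYGEN_RDD, TIER1_NONPARTITIONED, TIER2_SIMPLE, TIER3_UDF }

    /**
     * Picks the cheapest path that can still reproduce the key generator's output.
     * All parameter names are assumptions made for this sketch.
     */
    static Tier selectTier(String keyGenSimpleName,
                           List<String> recordKeyFields,
                           List<String> partitionPathFields,
                           boolean urlEncode,
                           boolean slashSeparatedDates,
                           boolean autoGenerateRecordKeys) {
        // Auto record key generation keeps the existing RDD path.
        if (autoGenerateRecordKeys) {
            return Tier.AUTO_KEYGEN_RDD;
        }
        boolean singleRecordKey = recordKeyFields.size() == 1;

        // Tier 1: non-partitioned table with a single record-key field.
        if ("NonpartitionedKeyGenerator".equals(keyGenSimpleName) && singleRecordKey) {
            return Tier.TIER1_NONPARTITIONED;
        }
        // Tier 2: simple keygen, one key field, one partition field,
        // URL encoding off and slash-separated dates off.
        if ("SimpleKeyGenerator".equals(keyGenSimpleName)
                && singleRecordKey
                && partitionPathFields.size() == 1
                && !urlEncode
                && !slashSeparatedDates) {
            return Tier.TIER2_SIMPLE;
        }
        // Tier 3: everything else goes through the UDF over the canonical keygen.
        return Tier.TIER3_UDF;
    }
}
```

Ordering the checks from most to least specialized means any configuration the fast paths do not explicitly recognize falls through to Tier 3, so a missed case costs throughput rather than correctness.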
The Tier 3 UDF goes through `getRecordKey(Row)` / `getPartitionPath(Row)`,
which use the `String` formatter (`combinePartitionPath`), so all three
formatter flags (hive-style, URL encode, slash-separated dates) are honored for
the keygens that fall through. The Tier 2 fast-path encodes only the
default+hive-style flag combinations (URL encoding has no efficient
pure-Catalyst equivalent, and the 1.2.0+ slash-separated branch in the
formatter exercises a separate code path we'd rather not encode twice).
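For reference, the partition-path semantics that the Tier 2 expression has to reproduce can be written as a plain Java function. This is a minimal sketch, assuming the empty-value handling and hive-style prefixing behave as described above; `SimplePartitionPathSketch` and `format` are hypothetical names, and the actual fast path would express the same logic with Catalyst columns rather than a Java method.

```java
/** Hypothetical mirror of the Tier 2 partition-path rules; not actual Hudi code. */
final class SimplePartitionPathSketch {

    // Placeholder substituted for null or empty partition values.
    static final String DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__";

    /**
     * @param field     partition-path field name (used only for hive-style output)
     * @param value     partition value already cast to a string, may be null
     * @param hiveStyle whether to prefix the value with "field="
     */
    static String format(String field, String value, boolean hiveStyle) {
        // handleEmpty: null and empty strings both map to the default partition.
        String handled = (value == null || value.isEmpty()) ? DEFAULT_PARTITION : value;
        // Hive-style partitioning prefixes the value with the field name.
        return hiveStyle ? field + "=" + handled : handled;
    }
}
```

URL encoding and slash-separated dates are deliberately absent here, matching the statement that those flag combinations are left to Tier 3.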
### Test coverage
New tests added to `TestHoodieDatasetBulkInsertHelper`:
- **Parity against canonical Avro keygen**, parameterized across 11 cases —
every supported keygen class plus the SimpleKeyGen flag combinations: default,
hive-style only, slash-sep (Tier 3), hive+slash (Tier 3), URL-encode (Tier 3),
hive+URL (Tier 3), ComplexKeyGen single/multi, TimestampBased, Custom.
- **Non-string record key cast** — verifies Tier 1/2 cast `ts: long`
correctly.
- **Logical plan inspection** — Tier 1/2 plans contain no `ScalaUDF` (i.e.
they actually benefit from Catalyst codegen rather than silently falling through to Tier 3).
- **Empty partition value** — confirms `__HIVE_DEFAULT_PARTITION__`
substitution under both default and hive-style flags.
- **Driver session timezone propagation** — Tier 3 UDF respects
`spark.sql.session.timeZone` set on the driver (guards against the executor
JVM's default timezone leaking into TimestampBasedKeyGenerator output).
### Out of scope (followups)
- The `dropPartitionColumns = true` path inside
`BulkInsertDataInternalWriterHelper.write` does the partition-column strip
per-row in a Java loop with a `HashSet` lookup. Lifting that into
`prepareForBulkInsert` would let us also avoid materialising columns that are
about to be dropped — see #18969 for the adjacent discussion.
- The auto-keygen path could shed its `toRdd` round-trip with a custom
Catalyst `Expression` for `partitionId + monotonic rowId`. Out of scope here.
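To illustrate why the auto-keygen path needs per-task state, here is a minimal sketch of a partition-scoped row-id generator. The class name `PerTaskRowIdSketch` and the `partitionId_rowId` output format are assumptions made for illustration only; the issue does not specify the actual key layout.

```java
/** Hypothetical per-task row-id generator; the key format is an assumption. */
final class PerTaskRowIdSketch {

    private final int partitionId; // would come from the task context on the executor
    private long nextRowId = 0L;   // mutable state, local to one task

    PerTaskRowIdSketch(int partitionId) {
        this.partitionId = partitionId;
    }

    /** Returns a value unique within the write: partition id plus a monotonic counter. */
    String next() {
        return partitionId + "_" + (nextRowId++);
    }
}
```

Each task has to create its own instance once it knows its partition id, which is the part that does not map onto a closure built on the driver.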
### PR
Patch will be linked here once opened.