nsivabalan opened a new issue, #18989:
URL: https://github.com/apache/hudi/issues/18989

   ### Problem
   
   `HoodieDatasetBulkInsertHelper.prepareForBulkInsert` invokes the configured 
`KeyGenerator` for every row via `df.queryExecution.toRdd.mapPartitions { iter 
=> iter.map { row => keyGenerator.getRecordKey(row, schema); 
keyGenerator.getPartitionPath(row, schema); ... } }`. This forces an RDD 
round-trip and a per-row, reflection-based keygen invocation on every record, 
even for the key generators (`NonpartitionedKeyGenerator`, 
`SimpleKeyGenerator`) whose record-key and partition-path values can be read 
directly from input columns.
   
   Before the original row-writer PR 
([#5470](https://github.com/apache/hudi/pull/5470)) consolidated the path, 
these two keygens were handled with `withColumn(col(field).cast(String))` 
projections: pure Catalyst expressions that get codegen for free and need no 
row materialization. Collapsing the dispatch onto an RDD `mapPartitions` 
regressed bulk-insert throughput for the common `SimpleKeyGenerator` and 
`NonpartitionedKeyGenerator` cases. Related: #18969 (same module, 
drop-partition-columns hot path).
   
   ### Proposed fix
   
   Restore tiered dispatch in `prepareForBulkInsert`:
   
   - **Tier 1 — `NonpartitionedKeyGenerator`** (single record-key field): emit 
`col(rk).cast(String)` + `lit("")` as Catalyst columns.
   - **Tier 2 — `SimpleKeyGenerator`** (single record-key + single 
partition-path, URL-encoding off, slash-separated dates off): emit 
`col(rk).cast(String)` and a partition-path expression mirroring 
`PartitionPathFormatterBase#combine`, including the `handleEmpty -> 
__HIVE_DEFAULT_PARTITION__` substitution and hive-style `field=` prefixing.
   - **Tier 3 — everything else** (multi-field keys, ComplexKeyGenerator, 
TimestampBased, Custom, Simple with URL encoding or slash-separated dates 
enabled): an anonymous `functions.udf` over a struct of input columns that 
calls the canonical `BuiltinKeyGenerator.getRecordKey(Row)` / 
`getPartitionPath(Row)`, i.e. the same Avro-aligned formatter the read side 
and Tier 3 callers already share. The UDFs are not registered against the 
SparkSession, so nothing leaks across writes.
   - **Auto record key generation** keeps the existing RDD path; it needs 
`TaskContext.partitionId` and a stateful per-task counter, which can't be 
expressed cleanly as a driver-side closure.
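   
   The dispatch rules above can be condensed into one decision function. The 
following is a minimal sketch, not the patch itself: `KeyGenSettings`, 
`chooseTier`, and the tier names are hypothetical, and matching on the simple 
class name is an assumption made to keep the example free of Hudi and Spark 
dependencies.
   
   ```scala
   object TierDispatchSketch {
     // The four write paths described in the list above.
     sealed trait Tier
     case object Tier1Catalyst extends Tier // NonpartitionedKeyGenerator, one key field
     case object Tier2Catalyst extends Tier // SimpleKeyGenerator, default or hive-style
     case object Tier3Udf extends Tier      // everything else, via an anonymous UDF
     case object RddAutoKey extends Tier    // auto record key generation stays on the RDD

     // Hypothetical summary of the relevant write config; field names are illustrative.
     final case class KeyGenSettings(
       keyGenClass: String, // simple class name, e.g. "SimpleKeyGenerator"
       recordKeyFields: Seq[String],
       partitionPathFields: Seq[String],
       urlEncode: Boolean = false,
       slashSeparatedDates: Boolean = false,
       autoGenerateRecordKeys: Boolean = false)

     def chooseTier(s: KeyGenSettings): Tier = {
       val singleKey = s.recordKeyFields.size == 1
       val singlePartition = s.partitionPathFields.size == 1
       val plainFormatting = !s.urlEncode && !s.slashSeparatedDates

       if (s.autoGenerateRecordKeys) RddAutoKey
       else s.keyGenClass match {
         case "NonpartitionedKeyGenerator" if singleKey => Tier1Catalyst
         case "SimpleKeyGenerator" if singleKey && singlePartition && plainFormatting =>
           Tier2Catalyst
         case _ => Tier3Udf
       }
     }
   }
   ```
   
   Checking auto key generation first keeps that case on the RDD path 
regardless of the configured key generator class. The hive-style flag does not 
appear in the decision because both of its values stay in Tier 2.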
   
   The Tier 3 UDF goes through `getRecordKey(Row)` / `getPartitionPath(Row)`, 
which use the `String` formatter (`combinePartitionPath`), so all three 
formatter flags (hive-style, URL encode, slash-separated dates) are honored for 
the keygens that fall through. The Tier 2 fast path covers only the default 
and hive-style-only flag combinations: URL encoding has no efficient 
pure-Catalyst equivalent, and the 1.2.0+ slash-separated branch in the 
formatter exercises a separate code path that we'd rather not encode twice.
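   
   As a rough illustration of the three column-producing paths, the sketch 
below builds the two meta columns with plain Spark APIs. It rests on several 
assumptions: `RowKeyGen` is a hypothetical stand-in for the Row-based methods 
on `BuiltinKeyGenerator`; treating null and the empty string as the only empty 
partition values is a guess at the formatter's rule; null record-key 
validation and the ordering of meta columns are ignored.
   
   ```scala
   import org.apache.spark.sql.{Column, DataFrame, Row}
   import org.apache.spark.sql.functions.{col, concat, length, lit, struct, udf, when}
   import org.apache.spark.sql.types.StringType

   object KeyColumnSketch {
     val RecordKeyCol = "_hoodie_record_key"
     val PartitionPathCol = "_hoodie_partition_path"
     val DefaultPartition = "__HIVE_DEFAULT_PARTITION__"

     // Hypothetical stand-in for the Row-based key generator API.
     trait RowKeyGen extends Serializable {
       def getRecordKey(row: Row): String
       def getPartitionPath(row: Row): String
     }

     // Tier 1: record key cast to string, constant empty partition path.
     def tier1(df: DataFrame, rk: String): DataFrame =
       df.withColumn(RecordKeyCol, col(rk).cast(StringType))
         .withColumn(PartitionPathCol, lit(""))

     // Tier 2 partition path: substitute the default partition for empty
     // values, then optionally add the hive-style "field=" prefix.
     def tier2PartitionPath(pp: String, hiveStyle: Boolean): Column = {
       val raw = col(pp).cast(StringType)
       val value =
         when(raw.isNull || length(raw) === 0, lit(DefaultPartition)).otherwise(raw)
       if (hiveStyle) concat(lit(s"$pp="), value) else value
     }

     def tier2(df: DataFrame, rk: String, pp: String, hiveStyle: Boolean): DataFrame =
       df.withColumn(RecordKeyCol, col(rk).cast(StringType))
         .withColumn(PartitionPathCol, tier2PartitionPath(pp, hiveStyle))

     // Tier 3: anonymous UDFs over a struct of all input columns. Nothing is
     // registered on the SparkSession; the UDFs live only in this plan.
     def tier3(df: DataFrame, keyGen: RowKeyGen): DataFrame = {
       val input = struct(df.columns.map(col): _*)
       val recordKey = udf((r: Row) => keyGen.getRecordKey(r))
       val partitionPath = udf((r: Row) => keyGen.getPartitionPath(r))
       df.withColumn(RecordKeyCol, recordKey(input))
         .withColumn(PartitionPathCol, partitionPath(input))
     }
   }
   ```
   
   Tiers 1 and 2 stay entirely inside Catalyst expressions, while Tier 3 hands 
each row to the key generator, which is why only Tier 3 can honor every 
formatter flag without duplicating formatter logic.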
   
   ### Test coverage
   
   New tests added to `TestHoodieDatasetBulkInsertHelper`:
   
   - **Parity against canonical Avro keygen**, parameterized across 11 cases — 
every supported keygen class plus the SimpleKeyGen flag combinations: default, 
hive-style only, slash-sep (Tier 3), hive+slash (Tier 3), URL-encode (Tier 3), 
hive+URL (Tier 3), ComplexKeyGen single/multi, TimestampBased, Custom.
   - **Non-string record key cast** — verifies Tier 1/2 cast `ts: long` 
correctly.
   - **Logical plan inspection** — Tier 1/2 plans contain no `ScalaUDF` (i.e. 
they actually benefit from Catalyst codegen rather than silently falling 
through to Tier 3).
   - **Empty partition value** — confirms `__HIVE_DEFAULT_PARTITION__` 
substitution under both default and hive-style flags.
   - **Driver session timezone propagation** — Tier 3 UDF respects 
`spark.sql.session.timeZone` set on the driver (guards against executor JVM 
default leakage on TimestampBasedKeyGenerator).
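   
   The logical plan inspection could look roughly like the helper below. This 
is a sketch under assumptions: `containsScalaUdf` is a hypothetical name, it 
relies on the internal Catalyst class `ScalaUDF` (not a stable public API), 
and it does not descend into subquery plans.
   
   ```scala
   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.catalyst.expressions.ScalaUDF

   object PlanInspectionSketch {
     // True if any operator in the analyzed logical plan carries a ScalaUDF
     // anywhere inside one of its expression trees.
     def containsScalaUdf(df: DataFrame): Boolean =
       df.queryExecution.analyzed.find { node =>
         node.expressions.exists(_.find(_.isInstanceOf[ScalaUDF]).isDefined)
       }.isDefined
   }
   ```
   
   A Tier 1/2 test would assert that `containsScalaUdf` is false for the 
prepared DataFrame, and a Tier 3 test could assert the opposite as a sanity 
check on the helper itself.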
   
   ### Out of scope (followups)
   
   - The `dropPartitionColumns = true` path inside 
`BulkInsertDataInternalWriterHelper.write` does the partition-column strip 
per-row in a Java loop with a `HashSet` lookup. Lifting that into 
`prepareForBulkInsert` would let us also avoid materialising columns that are 
about to be dropped — see #18969 for the adjacent discussion.
   - The auto-keygen path could shed its `toRdd` round-trip with a custom 
Catalyst `Expression` for `partitionId + monotonic rowId`. Out of scope here.
   
   ### PR
   
   Patch will be linked here once opened.

