voonhous commented on issue #19035: URL: https://github.com/apache/hudi/issues/19035#issuecomment-4777427402
# Variant shredding benchmark: Hudi vs Spark 4.1 native (preliminary)

Preliminary numbers from the standalone harness (`VariantShreddingBenchmark` in `hudi-examples-spark`). This is a single local run, so treat these numbers as directional, not quotable.

## Setup

Code: https://github.com/voonhous/hudi/blob/57d5a3fc8439c1aef78f9714c4406156ff0688e0/hudi-examples/hudi-examples-spark/src/main/scala/org/apache/hudi/examples/spark/VariantShreddingBenchmark.scala

Branch: https://github.com/voonhous/hudi/tree/bench-variant-shredding-spark41

- Comparison: Hudi shredded inference (row-writer `bulk_insert`, COW) vs vanilla Spark 4.1 `df.write.parquet` native shredding, over an identical cached DataFrame.
- Both sides infer the `typed_value` schema via Spark's own `InferVariantShreddingSchema` engine (Hudi delegates to it), so the comparison isolates the write/read machinery, not the schema.
- Dataset: 5,000,000 rows. Columns: `id` (long, record key), `ts` (long, precombine field), `v` (VARIANT). `v` is a homogeneous object with 10 typed fields (`f0..f9`, a mix of int, string and decimal) plus one nested object. Keys are Avro-safe and the value is fully shreddable.
- Config: fields=10, nesting depth=1, field cardinality=1000, parallelism=8 (8 base files), compression=zstd, warmup=1, iters=3, metadata table (MDT) disabled, single commit.
- Environment: Spark 4.1.1, JDK 17, `local[*]`, Apple Silicon. Numbers are noisy at this scale and iteration count.

## Correctness gate (schema parity)

Both sides emit `v (VARIANT(1))` with an identical `typed_value` structure. The harness diffs the two Parquet variant schemas and only trusts the perf numbers when they match structurally.

- Both are fully shredded: typed fraction = 1.000 (the residual `value`/`metadata` is ~0.02 MB).
- Schema diff: 3 differences, all of the same kind. The decimal fields `f2`/`f5`/`f8` are `fixed_len_byte_array(8)` on Hudi vs `int64` on native (both annotated `DECIMAL(18,1)`).
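Below is a minimal Scala sketch of a parity check in the spirit of the gate above. It is not the harness code. It assumes each Parquet schema has already been flattened to a map from leaf path to physical type. The flattening step (e.g. from the file footer) is not shown, and the leaf paths in `SchemaParityExample` are hypothetical. The `minBytesForPrecision` helper shows that the decimal difference is an encoding difference rather than a width difference. For precision 18, the smallest `fixed_len_byte_array` that holds every unscaled value is 8 bytes, the same width as `int64`.

```scala
// Sketch of a schema-parity check (not the harness code). Assumes each Parquet
// schema was already flattened to leaf path -> physical type; that step is not shown.
object SchemaParity {

  /** One mismatching leaf; None means the leaf is absent on that side. */
  final case class LeafDiff(path: String, left: Option[String], right: Option[String])

  /** Leaves whose physical type differs, or that exist on only one side, sorted by path. */
  def diff(left: Map[String, String], right: Map[String, String]): Seq[LeafDiff] =
    (left.keySet ++ right.keySet).toSeq.sorted.flatMap { path =>
      val l = left.get(path)
      val r = right.get(path)
      if (l == r) None else Some(LeafDiff(path, l, r))
    }

  /** Smallest FIXED_LEN_BYTE_ARRAY width n (bytes) whose signed two's-complement
    * maximum, 2^(8n-1) - 1, covers every unscaled value of the given precision. */
  def minBytesForPrecision(precision: Int): Int = {
    val maxUnscaled = BigInt(10).pow(precision) - 1
    Iterator.from(1).find(n => BigInt(2).pow(8 * n - 1) - 1 >= maxUnscaled).get
  }

  /** Narrowest integer physical type the Parquet spec allows for a DECIMAL precision. */
  def narrowestIntType(precision: Int): Option[String] =
    if (precision <= 9) Some("int32")
    else if (precision <= 18) Some("int64")
    else None
}

// Hypothetical leaf paths covering only a few of the real leaves.
object SchemaParityExample {
  val hudiLeaves: Map[String, String] = Map(
    "v.typed_value.f0.typed_value" -> "int32",
    "v.typed_value.f2.typed_value" -> "fixed_len_byte_array(8)",
    "v.typed_value.f5.typed_value" -> "fixed_len_byte_array(8)"
  )
  // Same layout, but the native side stores the DECIMAL(18,1) leaves as int64.
  val nativeLeaves: Map[String, String] = hudiLeaves ++ Map(
    "v.typed_value.f2.typed_value" -> "int64",
    "v.typed_value.f5.typed_value" -> "int64"
  )
}
```

With these inputs, `SchemaParity.diff(SchemaParityExample.hudiLeaves, SchemaParityExample.nativeLeaves)` flags only the `f2` and `f5` leaves, each as `fixed_len_byte_array(8)` vs `int64`, and `minBytesForPrecision(18)` is 8.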
## Results

### Write throughput

| case                      | best (ms) | avg (ms) | stdev (ms) | rows/s (at best) |
|---------------------------|-----------|----------|------------|------------------|
| Hudi (shredded inference) | 2556      | 2661     | 89         | 1.96M            |
| Spark native              | 1546      | 1567     | 25         | 3.23M            |

Hudi is ~1.65x slower on write (best vs best).

### Read latency

| query                                        | Hudi best (ms) | native best (ms) | Hudi / native   |
|----------------------------------------------|----------------|------------------|-----------------|
| full-row reconstruct (`cast(v as string)`)   | 4737           | 4749             | ~1.00x (on par) |
| single-field projection (`variant_get`)      | 748            | 606              | 1.23x           |
| predicate filter (`variant_get > k`)         | 728            | 624              | 1.17x           |
| plain-column projection (`sum(ts)`, control) | 179            | 110              | 1.63x           |

The read gap is NOT variant-specific. The `sum(ts)` control reads a plain non-variant column, yet it has the largest relative gap of all (1.63x). The ordering also matters: the cheaper the query, the larger the relative gap. That pattern is the signature of a fixed per-row read overhead, not of anything variant-related.

### Why the projection/filter reads are slower (it is the read path, not shredding)

Both engines push the projection into the scan and read only the requested `typed_value` subcolumn. This was verified: the same bytes are read, and there is no full-variant reconstruction on the COW path. The difference is HOW they read it:

- native `spark.read.parquet`: columnar, vectorized Parquet read.
- Hudi: the COW projection routes through the `HoodieFileGroupReader`, which reads row by row (vectorization is a known TODO, apache/hudi#17736) and adds a per-row `InternalRow.copy()` plus an `UnsafeProjection`.

The query plans confirm this. Both scans report the identical pruned `ReadSchema: struct<v:struct<0:int>>` (projection pushed in, same bytes read). However, the Hudi `Scan HudiFileGroup` node reports `Batched: false` while the native `Scan parquet` node reports `Batched: true`, i.e. row-based vs vectorized.
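The derived columns in the two tables above (rows/s and the Hudi/native ratios) can be recomputed from the best-of-3 timings. Below is a short Scala sketch; `BenchRatios` is a hypothetical helper, not part of the harness. It also encodes the fixed-overhead reading as a simple model: Hudi time = native time + c × rows, with the same per-row cost c for every query. Under that model the ratio shrinks as the native baseline grows, which is the ordering seen in the read table.

```scala
// Hypothetical helper (not part of the harness): recomputes the tables' derived columns.
object BenchRatios {
  val Rows: Long = 5000000L

  /** Best-of-3 wall time for one case on each side. */
  final case class Timing(name: String, hudiMs: Long, nativeMs: Long) {
    def ratio: Double = hudiMs.toDouble / nativeMs // Hudi / native
    def gapMs: Long = hudiMs - nativeMs            // absolute Hudi minus native
  }

  def rowsPerSec(ms: Long): Double = Rows * 1000.0 / ms
  def round2(x: Double): Double = math.round(x * 100) / 100.0

  val write: Timing = Timing("write", 2556, 1546)
  val reads: Seq[Timing] = Seq(
    Timing("full-row reconstruct", 4737, 4749),
    Timing("variant_get projection", 748, 606),
    Timing("variant_get filter", 728, 624),
    Timing("sum(ts) control", 179, 110)
  )

  /** Model: hudi = native + perRowNs * rows, so ratio = 1 + perRowNs * rows / native. */
  def modelRatio(nativeMs: Long, perRowNs: Double): Double =
    1.0 + perRowNs * Rows / 1e6 / nativeMs
}
```

This reproduces 1.96M and 3.23M rows/s, a 1.65x write ratio, and read ratios of 1.00, 1.23, 1.17 and 1.63. The absolute read gaps (`gapMs`) come out at -12, 142, 104 and 69 ms, so at this run's noise level the constant-per-row-cost model is only an approximation.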
The `variant_get(v,'$.f0')` projection on each side (`explain("formatted")`, scan node):

```
# Hudi: spark.read.format("hudi").load(path)
+- Project [v#715.0 AS g#710]
   +- Scan HudiFileGroup
      Output [1]: [v#715]
      Batched: false                            # row-based
      Location: HoodieFileIndex [.../hudi]
      ReadSchema: struct<v:struct<0:int>>

# native: spark.read.parquet(path)
+- Project [v#726.0 AS g#721]
   +- Scan parquet
      Output [1]: [v#726]
      Batched: true                             # vectorized
      Location: InMemoryFileIndex [.../native]
      ReadSchema: struct<v:struct<0:int>>
```

(Above each scan, the rest of the plan is identical on both sides: Project, then HashAggregate, Exchange, HashAggregate, AdaptiveSparkPlan.)

The row-vs-columnar difference plus the per-row tax add up to a roughly fixed cost per row. That cost dominates cheap queries and vanishes on expensive ones:

- full reconstruct (a heavy per-row rebuild on both sides) is on par;
- `variant_get` projection/filter is ~1.2x;
- plain `sum(ts)` is ~1.6x (the cheapest query, so the fixed tax is the largest fraction).

Variant shredding itself is not the cost: pruning and reconstruction work correctly and are competitive. The gap comes from the general Hudi file-group-reader read path, which affects any narrow projection regardless of column type.

### Storage

| measure             | Hudi      | native    | ratio |
|---------------------|-----------|-----------|-------|
| total data parquet  | 172.64 MB | 126.02 MB | 1.37x |
| variant column only | 107.61 MB | 97.60 MB  | 1.10x |

`.hoodie` is 28 KB (MDT off, 1 commit), so external Hudi metadata is negligible here.

## Where the size difference comes from

The total gap of 46.6 MB splits into:

- ~36.6 MB: Hudi's 5 per-row meta columns (`_hoodie_commit_time`, `_hoodie_commit_seqno`, `_hoodie_record_key`, `_hoodie_partition_path`, `_hoodie_file_name`), about 7.7 bytes/row compressed. Native Parquet has none of these.
- ~10.0 MB: the variant column itself, dominated by the decimal `fixed_len_byte_array(8)` vs `int64` difference (`int64` supports delta/dictionary encodings that an opaque FLBA does not). This is a Hudi-wide decimal-encoding trait, not a variant-specific one.

So the honest variant-encoding comparison is the v-column-only ratio (1.10x). The jump to 1.37x in total comes from the per-row meta-column tax, not from the metadata table or the timeline.

## Takeaways (directional)

- Hudi's shredded variant is structurally identical to Spark native (same `typed_value` layout, same `VARIANT(1)` annotation). It is fully shredded, and reconstruct reads are on par.
- Hudi pays a managed-table cost: ~1.65x write time and a ~10% larger variant column, plus the fixed per-row meta-column tax on total file size.
- The projection/filter read gap is a general Hudi read-path characteristic (row-based file-group reader vs native vectorized Parquet), not a variant cost. The plain-column control confirms this. Vectorized file-group reads (apache/hudi#17736) are the lever there.
- The decimal `fixed_len_byte_array` vs `int64` encoding is the remaining lever for variant-column size.

## Caveats / next steps

- MDT off and a single commit. A production table with MDT enabled and many commits would carry additional `.hoodie` overhead that this run excludes.
- Single local run, 5M rows, 3 iters: directional only. Quotable numbers need more iterations, larger scale and dedicated hardware.
- Next, per the reframed scope: add the shredded-vs-unshredded bar within Hudi (flip `write.shredding` off). MOR, upsert/MERGE and data skipping remain deferred.
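The split in "Where the size difference comes from" follows directly from the Storage table. Below is a Scala sketch; `StorageSplit` is a hypothetical helper, not part of the harness. It assumes the table's "MB" means MiB (2^20 bytes). Under that reading, the ~36.6 MB attributed to the meta columns works out to ~7.7 bytes per row over 5,000,000 rows.

```scala
// Hypothetical helper (not part of the harness): reproduces the storage split.
object StorageSplit {
  val Rows: Long = 5000000L

  // Sizes from the Storage table, in MB.
  val hudiTotalMb = 172.64
  val nativeTotalMb = 126.02
  val hudiVariantMb = 107.61
  val nativeVariantMb = 97.60

  val totalGapMb: Double = hudiTotalMb - nativeTotalMb       // ~46.62
  val variantGapMb: Double = hudiVariantMb - nativeVariantMb // ~10.01
  val metaColumnsMb: Double = totalGapMb - variantGapMb      // ~36.61, the meta-column share

  val totalRatio: Double = hudiTotalMb / nativeTotalMb       // ~1.37
  val variantRatio: Double = hudiVariantMb / nativeVariantMb // ~1.10

  /** Compressed bytes per row, ASSUMING "MB" in the table means MiB (2^20 bytes). */
  def bytesPerRowMiB(mb: Double): Double = mb * 1024 * 1024 / Rows
}
```

If "MB" instead meant 10^6 bytes, the same gap would be ~7.3 bytes/row. The ~7.7 figure in the text therefore matches the MiB reading.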
