bhavya-ganatra opened a new issue, #18980:
URL: https://github.com/apache/hudi/issues/18980

   ### Bug Description
   
   **What happened:**
   When using `hoodie.write.record.merge.mode=CUSTOM` with a custom merger 
class specified via `hoodie.write.record.merge.custom.implementation.classes`, 
inline clustering (and standalone clustering) fails with:
   
   ```
   java.lang.IllegalArgumentException: No valid spark merger implementation set for `hoodie.write.record.merge.custom.implementation.classes`
   ```
   
   The error is thrown during the clustering execution phase, when Hudi reads the source file groups to produce the clustered output. Every clustering task fails on every executor with the same error. The same custom merger class works for inline compaction on the same table, is used successfully on other COW tables, and works correctly on all MOR read paths. The issue therefore appears to be specific to the clustering code path rather than to the custom merger implementation itself.
   
   **What you expected:**
   Clustering should successfully read file groups using the configured custom 
merger class, the same way inline compaction does. Since 
`hoodie.write.record.merge.custom.implementation.classes` is explicitly set in 
the write config, clustering should be able to resolve and instantiate the 
custom merger implementation.
   
   **Steps to reproduce:**
   1. Create a MOR table with `hoodie.write.record.merge.mode=CUSTOM` and a 
custom merger class that extends `HoodieSparkRecordMerger`, e.g.:
      ```
      hoodie.write.record.merge.mode=CUSTOM
      hoodie.write.record.merge.custom.implementation.classes=com.example.CustomHoodieSparkRecordMerger
      hoodie.write.record.merge.strategy.id=<custom-uuid>
      ```
   2. Write data to the table (upserts); these succeed.
   3. Enable inline clustering:
      ```
      hoodie.clustering.inline=true
      hoodie.clustering.inline.max.commits=1
      # low thresholds for easy reproduction
      hoodie.clustering.plan.strategy.target.file.max.bytes=10485760
      hoodie.clustering.plan.strategy.small.file.limit=5242880
      hoodie.clustering.plan.strategy.partition.selected="<list of partitions - comma-separated>"
      hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
      ```
   4. After the first successful delta commit, clustering is triggered 
automatically. Observe that all clustering tasks fail with 
`IllegalArgumentException: No valid spark merger implementation set`.
   5. Note that inline compaction (`hoodie.compact.inline=true`) on the same 
table with the same configs succeeds without error.
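If the repro settings are kept in a file and loaded with `java.util.Properties.load`, note that the Java `.properties` format only treats lines whose first non-blank character is `#` or `!` as comments. A trailing annotation such as `// low threshold` after a value is kept as part of the value. The small check below shows this; the class name `ReproConfigParsing` is made up for the example, and whether a given Hudi entry point reads its config through `java.util.Properties` is not assumed here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Shows how java.util.Properties handles comments: only whole lines starting
// with '#' or '!' are comments, so a trailing "// ..." stays in the value.
public class ReproConfigParsing {

    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // StringReader performs no real I/O, so this is not expected to happen.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String key = "hoodie.clustering.plan.strategy.small.file.limit";

        Properties inline = parse(key + "=5242880 // low threshold\n");
        Properties commentLine = parse("# low threshold for easy reproduction\n" + key + "=5242880\n");

        System.out.println(inline.getProperty(key));      // 5242880 // low threshold
        System.out.println(commentLine.getProperty(key)); // 5242880
    }
}
```

Putting the annotation on its own `#` line keeps the numeric value clean.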
   
   
   
   ### Additional Notes / Investigation Hunch
   
   > **Note:** The following is our working hypothesis based on code inspection, not a confirmed root cause. We may be wrong; please treat this as a starting point for investigation.
   
   Looking at the stack trace, the failure path is:
   `MultipleSparkJobExecutionStrategy` → 
`ClusteringExecutionStrategy.getFileGroupReader()` → 
`HoodieFileGroupReader.<init>` → `HoodieReaderContext.initRecordMerger()` → 
`BaseSparkInternalRowReaderContext.getRecordMerger()`
   
   The key question is: **why does `initRecordMerger` not find the custom class 
when it is present in the write config?**
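To make the question concrete, here is a simplified, hypothetical model of what a merger lookup has to do: read the comma-separated class list from the properties it is handed, load each class, and fail with the same message when nothing usable is found. `MergerResolutionModel` and `resolve` are invented for this sketch and are not Hudi code; the real resolver may apply further checks (for example, matching `hoodie.write.record.merge.strategy.id`). JDK types stand in for the merger classes so the example runs without Hudi on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical model of resolving a merger from a comma-separated class list.
// NOT Hudi's implementation; it only shows why a missing key yields the error.
public class MergerResolutionModel {

    static final String IMPL_CLASSES_KEY =
        "hoodie.write.record.merge.custom.implementation.classes";

    // Loads every listed class that is assignable to expectedType.
    static List<Class<?>> resolve(Properties props, Class<?> expectedType) {
        String value = props.getProperty(IMPL_CLASSES_KEY, "");
        List<Class<?>> valid = new ArrayList<>();
        for (String name : value.split(",")) {
            String trimmed = name.trim();
            if (trimmed.isEmpty()) {
                continue; // an absent key yields a single empty entry
            }
            try {
                Class<?> clazz = Class.forName(trimmed);
                if (expectedType.isAssignableFrom(clazz)) {
                    valid.add(clazz);
                }
            } catch (ClassNotFoundException e) {
                // Skip names that are not on the classpath.
            }
        }
        if (valid.isEmpty()) {
            throw new IllegalArgumentException(
                "No valid spark merger implementation set for `" + IMPL_CLASSES_KEY + "`");
        }
        return valid;
    }

    public static void main(String[] args) {
        // java.util.ArrayList stands in for the custom merger class and
        // java.util.List for the expected merger base type.
        Properties withKey = new Properties();
        withKey.setProperty(IMPL_CLASSES_KEY, "java.util.ArrayList");
        System.out.println(resolve(withKey, List.class)); // [class java.util.ArrayList]

        Properties withoutKey = new Properties();
        try {
            resolve(withoutKey, List.class);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the lookup only sees the properties object it is passed, so a class that is on the classpath and present in the write config still fails to resolve if the key never reaches that object.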
   
   One possible explanation we noticed from looking at 
`ClusteringExecutionStrategy`:
   
   The clustering execution strategy builds its own `TypedProperties` to pass 
to `HoodieFileGroupReader` (via something like `getReaderProperties()`). If 
this method only copies a limited set of properties (e.g., spill-map and 
memory-related keys) and does **not** propagate 
`hoodie.write.record.merge.custom.implementation.classes`, then 
`initRecordMerger` would receive an empty/missing value for that key.
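A minimal sketch of that hypothesis, using plain `java.util.Properties` in place of `TypedProperties`. The class `ReaderPropsCopyModel`, both copy methods, and the allow-listed keys are illustrative assumptions, not the actual contents of `getReaderProperties()`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical illustration: copying only allow-listed keys into the reader
// properties silently drops every other key, including the merger class list.
public class ReaderPropsCopyModel {

    static final String IMPL_CLASSES_KEY =
        "hoodie.write.record.merge.custom.implementation.classes";

    // Illustrative memory/spill keys; assumed, not Hudi's actual list.
    static final List<String> ALLOWED_KEYS = Arrays.asList(
        "hoodie.memory.spillable.map.path",
        "hoodie.memory.merge.max.size");

    // Suspected behavior: copy only the allow-listed keys.
    static Properties copyAllowListed(Properties writeProps) {
        Properties readerProps = new Properties();
        for (String key : ALLOWED_KEYS) {
            String value = writeProps.getProperty(key);
            if (value != null) {
                readerProps.setProperty(key, value);
            }
        }
        return readerProps;
    }

    // Alternative: start from every write property.
    static Properties copyAll(Properties writeProps) {
        Properties readerProps = new Properties();
        for (String key : writeProps.stringPropertyNames()) {
            readerProps.setProperty(key, writeProps.getProperty(key));
        }
        return readerProps;
    }

    public static void main(String[] args) {
        Properties writeProps = new Properties();
        writeProps.setProperty(IMPL_CLASSES_KEY, "com.example.CustomHoodieSparkRecordMerger");
        writeProps.setProperty("hoodie.memory.merge.max.size", "1073741824");

        System.out.println(copyAllowListed(writeProps).getProperty(IMPL_CLASSES_KEY)); // null
        System.out.println(copyAll(writeProps).getProperty(IMPL_CLASSES_KEY));
    }
}
```

If the real method behaves like `copyAllowListed`, one possible fix would be to add the merge-related keys to the list, or to start from the full write config as `copyAll` does.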
   
   Meanwhile, compaction works because it goes through a different code path 
(`SparkMergeHelper` / `HoodieSparkMergeOnReadTableCompactor`) which has access 
to the full write config props.
   
   A secondary possible issue: even if `ConfigUtils.getMergeProps()` enriches 
the props with table config (`hoodie.properties`), `initRecordMerger` may be 
called with the **original** un-enriched `TypedProperties` rather than the 
enriched copy.
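The secondary concern is a common aliasing pitfall: an enrichment helper that returns a new object leaves the caller's original object untouched, so code that keeps using the original never sees the added values. The sketch below models this with a hypothetical `enrich()` standing in for `ConfigUtils.getMergeProps()`. Whether the real helper copies or mutates, and which keys it takes from `hoodie.properties`, are assumptions; the merger key is used here only for illustration.

```java
import java.util.Properties;

// Hypothetical model: enrich() returns a NEW object, so the original
// properties object passed in is never modified.
public class EnrichedPropsModel {

    static final String IMPL_CLASSES_KEY =
        "hoodie.write.record.merge.custom.implementation.classes";

    // Returns a copy of writeProps with table-config values filled in.
    static Properties enrich(Properties writeProps, Properties tableConfig) {
        Properties enriched = new Properties();
        for (String key : tableConfig.stringPropertyNames()) {
            enriched.setProperty(key, tableConfig.getProperty(key));
        }
        // Write-config values take precedence over table-config values.
        for (String key : writeProps.stringPropertyNames()) {
            enriched.setProperty(key, writeProps.getProperty(key));
        }
        return enriched;
    }

    // Stand-in for the merger lookup: reads the key from whatever it is given.
    static String lookupImplClasses(Properties props) {
        return props.getProperty(IMPL_CLASSES_KEY);
    }

    public static void main(String[] args) {
        Properties readerProps = new Properties(); // key absent here
        Properties tableConfig = new Properties(); // assumed to carry the key
        tableConfig.setProperty(IMPL_CLASSES_KEY, "com.example.CustomHoodieSparkRecordMerger");

        Properties enriched = enrich(readerProps, tableConfig);

        System.out.println(lookupImplClasses(readerProps)); // null: original not modified
        System.out.println(lookupImplClasses(enriched));    // com.example.CustomHoodieSparkRecordMerger
    }
}
```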
   
   We have not confirmed this by stepping through the code. If the Hudi team can check whether `getReaderProperties()` in `ClusteringExecutionStrategy` propagates the custom merge implementation class key, that would confirm or rule out this hypothesis.
   
   ### Environment
   
   **Hudi version:** 1.1.0 (`hudi-spark3.5-bundle_2.12-1.1.0.jar`)
   **Query engine:** Spark 3.5 (AWS EMR build, `spark-sql_2.12-3.5.6-amzn-1`)
   **Relevant configs:**
   ```
   hoodie.write.record.merge.mode=CUSTOM
   hoodie.write.record.merge.custom.implementation.classes=com.example.CustomHoodieSparkRecordMerger
   hoodie.write.record.merge.strategy.id=<custom-uuid>
   hoodie.datasource.write.table.type=MERGE_ON_READ
   hoodie.clustering.inline=true
   hoodie.clustering.inline.max.commits=1
   hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
   hoodie.clustering.plan.strategy.class=org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy
   hoodie.write.concurrency.mode=optimistic_concurrency_control
   hoodie.clean.policy.failed.writes=LAZY
   ```
   
   ### Logs and Stack Trace
   
   Error thrown on each executor task (all tasks fail, Spark retries 4 times 
per task before aborting):
   
   ```
   WARN [task-result-getter] - Lost task 0.0 in stage 39.0:
   java.lang.IllegalArgumentException: No valid spark merger implementation set for `hoodie.write.record.merge.custom.implementation.classes`
       at org.apache.hudi.BaseSparkInternalRowReaderContext.getRecordMerger(BaseSparkInternalRowReaderContext.java:74)
       at org.apache.hudi.common.engine.HoodieReaderContext.initRecordMerger(HoodieReaderContext.java:331)
       at org.apache.hudi.common.engine.HoodieReaderContext.initRecordMerger(HoodieReaderContext.java:289)
       at org.apache.hudi.common.table.read.HoodieFileGroupReader.<init>(HoodieFileGroupReader.java:111)
       at org.apache.hudi.common.table.read.HoodieFileGroupReader.<init>(HoodieFileGroupReader.java:70)
       at org.apache.hudi.common.table.read.HoodieFileGroupReader$Builder.build(HoodieFileGroupReader.java:534)
       at org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy.getFileGroupReader(ClusteringExecutionStrategy.java:156)
       at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.access$100(MultipleSparkJobExecutionStrategy.java:95)
       at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy$1.call(MultipleSparkJobExecutionStrategy.java:323)
       at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy$1.call(MultipleSparkJobExecutionStrategy.java:314)
       at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
       ...
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635)
   ```
   
   Driver-level error after all tasks fail:
   
   ```
   ERROR [stream execution thread] (BaseHoodieWriteClient.java:641) - Inline compaction or clustering failed for table s3://...
   java.util.concurrent.CompletionException: java.util.concurrent.CancellationException
       at org.apache.hudi.common.util.FutureUtils.lambda$null$0(FutureUtils.java:52)
       ...
   Caused by: java.util.concurrent.CancellationException
   ```

