yihua commented on code in PR #18276:
URL: https://github.com/apache/hudi/pull/18276#discussion_r2912856978


##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +54,260 @@ The current implementation of Spark Datasource V2 integration is presented in th
 
 ## Implementation
 
-<!--  -->
+The main problem is that Hudi's write path involves indexing, precombining, upsert/insert routing, file sizing, and table services (compaction/clustering/cleaning).
+Also `HoodieSparkSqlWriter::write` handles schema evolution, partition encoding, metadata updates, and multi-writer concurrency.
+DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic for this, and moving to this entirely would be high risk.
+
+The proposed approach is hybrid: DSv2 for reads, with a DSv1 fallback for writes (`V2TableWithV1Fallback`) in the current state.
+Later, if a DSv2 write path can be implemented without loss of performance or functionality, it may become possible to move to full DSv2 support.
+However, this migration should still be incremental, please check the "Future Work" chapter for details.
+
+Overall proposed architecture for the hybrid approach is shown in the following schema:
+
+![Proposed approach with hybrid V1 write and V2 read](integration_with_DSv2_read.jpg)
+
+### DataFrame API
+
+A new SPI short name, `"hudi_v2"`, activates the DSv2 read path when using the Spark DataFrame API.
+The existing `"hudi"` path remains unchanged.
+This is done to unblock incremental development of the DSv2 path and will be removed in the long term, please check the "Future Work" chapter for details.
+It also allows switching later from the current DSv1 fallback to a DSv2 write path, if an implementation without performance degradation is found.
+The DSv2 write path is currently under research.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+df.write.format("hudi").mode(...).save(path)
+        v
+BaseDefaultSource (V1) -> DefaultSource
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+<td>
+<pre>
+df.write.format("hudi_v2").mode(...).save(path)
+        v
+HoodieDataSourceV2 (TableProvider + DataSourceRegister + CreatableRelationProvider)
+        v
+Spark treats as V1 source for writes
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+spark.read.format("hudi").load(path)
+        v
+V1 DataSource resolution (via ServiceLoader + DataSourceRegister)
+        v
+BaseDefaultSource found
+(extends DefaultSource with DataSourceRegister)
+(not a TableProvider)
+        v
+Spark treats as V1 DataSource
+        v
+DefaultSource.createRelation(...)
+        v
+MergeOnReadSnapshotRelation / BaseRelation
+        v
+LogicalRelation -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+spark.read.format("hudi_v2").load(path)
+        v
+DataSourceV2Utils.lookupProvider("hudi_v2")
+        v
+HoodieDataSourceV2 found
+(extends TableProvider with DataSourceRegister)
+(does not extend SupportsCatalogOptions)
+        v
+Spark uses TableProvider.getTable() directly
+(no catalog routing since no SupportsCatalogOptions)
+        v
+HoodieDataSourceV2.getTable(...)
+        v
+HoodieSparkV2Table(...)
+(no catalogTable, no tableIdentifier)
+        v
+HoodieScanBuilder -> HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
+
+### SQL Queries
+
+Spark SQL API is managed by new configuration parameter `hoodie.datasource.read.use.v2`, which controls the returned table type.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...);   -- table created with USING hudi
+        v
+Spark Analyzer resolves table via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+isHoodieTable => true, v2ReadEnabled = false, schemaEvol = false
+        v
+RETURNS: V1Table(catalogTable) via v1TableWrapper
+        v
+Spark V1 write path -> InsertIntoHoodieTableCommand (analysis rule)
+        v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...);   -- table created with USING hudi
+        v
+Spark Analyzer resolves table via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+isHoodieTable => true, v2ReadEnabled = true
+        v
+RETURNS: HoodieSparkV2Table(...)
+        v
+SupportsWrite.newWriteBuilder() -> HoodieV1WriteBuilder
+        v
+V1Write -> InsertableRelation.insert(data, overwrite)
+        v
+Align columns (rename + cast to table's user schema)
+        v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+SELECT * FROM hudi_table;   -- table created with USING hudi
+        v

Review Comment:
   🤖 The class table says that `HoodieScanBuilder` implements 
`SupportsPushDownFilters` and `SupportsPushDownRequiredColumns`, but the 
implementation phases show column pruning landing in phase 2 and filter 
pushdown in phase 3. It might be worth clarifying whether `HoodieScanBuilder` 
will initially accept but ignore pushdowns, or whether the interfaces are only 
added in later phases. This affects how Spark plans queries in the early phases.
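
To make the "accept but ignore" option concrete, here is a minimal Python model of the `SupportsPushDownFilters` contract, where `pushFilters` returns the filters Spark must still evaluate after the scan and `pushedFilters` reports what the source applies itself. `EqualTo` and `PassThroughScanBuilder` are hypothetical stand-ins for illustration only, not classes from the RFC or from Spark.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class EqualTo:
    """Stand-in for a Spark source filter such as EqualTo(attribute, value)."""
    attribute: str
    value: object


@dataclass
class PassThroughScanBuilder:
    """Hypothetical builder that implements the pushdown interface but pushes nothing."""
    pushed: List[EqualTo] = field(default_factory=list)

    def push_filters(self, filters: List[EqualTo]) -> List[EqualTo]:
        # Mirrors pushFilters: the return value is the set of filters that
        # Spark must still evaluate after the scan. Returning all of them
        # means "accepted, but nothing pushed".
        self.pushed = []
        return list(filters)

    def pushed_filters(self) -> List[EqualTo]:
        # Mirrors pushedFilters: what the source promises to apply itself.
        return list(self.pushed)


builder = PassThroughScanBuilder()
residual = builder.push_filters([EqualTo("region", "EU"), EqualTo("year", 2024)])
print(len(residual), len(builder.pushed_filters()))
```

Under this option Spark would keep its own filter step on top of the scan in the early phases, so results stay correct while no data skipping happens at the source.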



##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +54,260 @@ The current implementation of Spark Datasource V2 integration is presented in th
 
 ## Implementation
 
-<!--  -->
+The main problem is that Hudi's write path involves indexing, precombining, upsert/insert routing, file sizing, and table services (compaction/clustering/cleaning).
+Also `HoodieSparkSqlWriter::write` handles schema evolution, partition encoding, metadata updates, and multi-writer concurrency.
+DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic for this, and moving to this entirely would be high risk.
+
+The proposed approach is hybrid: DSv2 for reads, with a DSv1 fallback for writes (`V2TableWithV1Fallback`) in the current state.
+Later, if a DSv2 write path can be implemented without loss of performance or functionality, it may become possible to move to full DSv2 support.
+However, this migration should still be incremental, please check the "Future Work" chapter for details.
+
+Overall proposed architecture for the hybrid approach is shown in the following schema:
+
+![Proposed approach with hybrid V1 write and V2 read](integration_with_DSv2_read.jpg)
+
+### DataFrame API
+
+A new SPI short name, `"hudi_v2"`, activates the DSv2 read path when using the Spark DataFrame API.
+The existing `"hudi"` path remains unchanged.
+This is done to unblock incremental development of the DSv2 path and will be removed in the long term, please check the "Future Work" chapter for details.
+It also allows switching later from the current DSv1 fallback to a DSv2 write path, if an implementation without performance degradation is found.
+The DSv2 write path is currently under research.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+df.write.format("hudi").mode(...).save(path)
+        v
+BaseDefaultSource (V1) -> DefaultSource
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+<td>
+<pre>
+df.write.format("hudi_v2").mode(...).save(path)
+        v
+HoodieDataSourceV2 (TableProvider + DataSourceRegister + CreatableRelationProvider)
+        v
+Spark treats as V1 source for writes
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+spark.read.format("hudi").load(path)
+        v
+V1 DataSource resolution (via ServiceLoader + DataSourceRegister)
+        v
+BaseDefaultSource found
+(extends DefaultSource with DataSourceRegister)
+(not a TableProvider)
+        v
+Spark treats as V1 DataSource
+        v
+DefaultSource.createRelation(...)
+        v
+MergeOnReadSnapshotRelation / BaseRelation
+        v
+LogicalRelation -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+spark.read.format("hudi_v2").load(path)
+        v
+DataSourceV2Utils.lookupProvider("hudi_v2")
+        v
+HoodieDataSourceV2 found
+(extends TableProvider with DataSourceRegister)
+(does not extend SupportsCatalogOptions)
+        v
+Spark uses TableProvider.getTable() directly
+(no catalog routing since no SupportsCatalogOptions)
+        v
+HoodieDataSourceV2.getTable(...)
+        v
+HoodieSparkV2Table(...)
+(no catalogTable, no tableIdentifier)
+        v
+HoodieScanBuilder -> HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
+
+### SQL Queries
+
+Spark SQL API is managed by new configuration parameter `hoodie.datasource.read.use.v2`, which controls the returned table type.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...);   -- table created with USING hudi
+        v
+Spark Analyzer resolves table via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+isHoodieTable => true, v2ReadEnabled = false, schemaEvol = false
+        v
+RETURNS: V1Table(catalogTable) via v1TableWrapper
+        v
+Spark V1 write path -> InsertIntoHoodieTableCommand (analysis rule)
+        v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...);   -- table created with USING hudi
+        v
+Spark Analyzer resolves table via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+isHoodieTable => true, v2ReadEnabled = true
+        v
+RETURNS: HoodieSparkV2Table(...)
+        v
+SupportsWrite.newWriteBuilder() -> HoodieV1WriteBuilder
+        v
+V1Write -> InsertableRelation.insert(data, overwrite)
+        v
+Align columns (rename + cast to table's user schema)
+        v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+SELECT * FROM hudi_table;   -- table created with USING hudi
+        v
+Spark Analyzer resolves table name via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+super.loadTable(ident)
+        v
+V1Table(catalogTable) where catalogTable.provider = "hudi"
+        v
+isHoodieTable(catalogTable) => true
+        v
+v2ReadEnabled = false, schemaEvolutionEnabled = false (defaults)
+        v

Review Comment:
   🤖 The RFC reuses `HoodieV1WriteBuilder` from `HoodieInternalV2Table`, but 
that class is currently a private inner class. Could you clarify how 
`HoodieSparkV2Table` will access it — will it be extracted into a standalone 
class, or will `HoodieSparkV2Table` extend `HoodieInternalV2Table`? It would 
help to briefly describe the relationship between these two table classes and 
whether `HoodieInternalV2Table` is eventually retired.



##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +54,260 @@ The current implementation of Spark Datasource V2 integration is presented in th
 
 ## Implementation
 
-<!--  -->
+The main problem is that Hudi's write path involves indexing, precombining, upsert/insert routing, file sizing, and table services (compaction/clustering/cleaning).
+Also `HoodieSparkSqlWriter::write` handles schema evolution, partition encoding, metadata updates, and multi-writer concurrency.
+DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic for this, and moving to this entirely would be high risk.
+
+The proposed approach is hybrid: DSv2 for reads, with a DSv1 fallback for writes (`V2TableWithV1Fallback`) in the current state.
+Later, if a DSv2 write path can be implemented without loss of performance or functionality, it may become possible to move to full DSv2 support.
+However, this migration should still be incremental, please check the "Future Work" chapter for details.
+
+Overall proposed architecture for the hybrid approach is shown in the following schema:
+
+![Proposed approach with hybrid V1 write and V2 read](integration_with_DSv2_read.jpg)
+
+### DataFrame API
+
+A new SPI short name, `"hudi_v2"`, activates the DSv2 read path when using the Spark DataFrame API.
+The existing `"hudi"` path remains unchanged.
+This is done to unblock incremental development of the DSv2 path and will be removed in the long term, please check the "Future Work" chapter for details.
+It also allows switching later from the current DSv1 fallback to a DSv2 write path, if an implementation without performance degradation is found.
+The DSv2 write path is currently under research.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+df.write.format("hudi").mode(...).save(path)
+        v
+BaseDefaultSource (V1) -> DefaultSource
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+<td>
+<pre>
+df.write.format("hudi_v2").mode(...).save(path)
+        v
+HoodieDataSourceV2 (TableProvider + DataSourceRegister + CreatableRelationProvider)
+        v
+Spark treats as V1 source for writes
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+spark.read.format("hudi").load(path)
+        v
+V1 DataSource resolution (via ServiceLoader + DataSourceRegister)
+        v
+BaseDefaultSource found
+(extends DefaultSource with DataSourceRegister)
+(not a TableProvider)
+        v
+Spark treats as V1 DataSource
+        v
+DefaultSource.createRelation(...)
+        v
+MergeOnReadSnapshotRelation / BaseRelation
+        v
+LogicalRelation -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+spark.read.format("hudi_v2").load(path)
+        v
+DataSourceV2Utils.lookupProvider("hudi_v2")
+        v
+HoodieDataSourceV2 found
+(extends TableProvider with DataSourceRegister)
+(does not extend SupportsCatalogOptions)
+        v
+Spark uses TableProvider.getTable() directly
+(no catalog routing since no SupportsCatalogOptions)
+        v
+HoodieDataSourceV2.getTable(...)
+        v
+HoodieSparkV2Table(...)
+(no catalogTable, no tableIdentifier)
+        v
+HoodieScanBuilder -> HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
+
+### SQL Queries
+
+Spark SQL API is managed by new configuration parameter `hoodie.datasource.read.use.v2`, which controls the returned table type.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...);   -- table created with USING hudi
+        v
+Spark Analyzer resolves table via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+isHoodieTable => true, v2ReadEnabled = false, schemaEvol = false
+        v
+RETURNS: V1Table(catalogTable) via v1TableWrapper
+        v
+Spark V1 write path -> InsertIntoHoodieTableCommand (analysis rule)
+        v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...);   -- table created with USING hudi
+        v
+Spark Analyzer resolves table via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+isHoodieTable => true, v2ReadEnabled = true
+        v
+RETURNS: HoodieSparkV2Table(...)
+        v
+SupportsWrite.newWriteBuilder() -> HoodieV1WriteBuilder
+        v
+V1Write -> InsertableRelation.insert(data, overwrite)
+        v
+Align columns (rename + cast to table's user schema)
+        v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+SELECT * FROM hudi_table;   -- table created with USING hudi
+        v
+Spark Analyzer resolves table name via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+super.loadTable(ident)
+        v
+V1Table(catalogTable) where catalogTable.provider = "hudi"
+        v
+isHoodieTable(catalogTable) => true
+        v
+v2ReadEnabled = false, schemaEvolutionEnabled = false (defaults)
+        v
+RETURNS: HoodieInternalV2Table(...).v1TableWrapper = V1Table(catalogTable)
+        v
+Spark uses V1 fallback -> DefaultSource.createRelation()
+        v
+HoodieFileIndex -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+SELECT * FROM hudi_table;   -- table created with USING hudi
+        v
+Spark Analyzer resolves table name via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+super.loadTable(ident)
+        v
+V1Table(catalogTable) where catalogTable.provider = "hudi"
+        v
+isHoodieTable(catalogTable) => true
+        v
+v2ReadEnabled = conf("hoodie.datasource.read.use.v2") = true
+        v
+RETURNS: HoodieSparkV2Table(...)
+        v
+SupportsRead.newScanBuilder() -> HoodieScanBuilder

Review Comment:
   🤖 The test plan mentions TPC-H benchmarks but doesn't set success criteria. 
It might be worth defining what performance delta is acceptable — e.g., "DSv2 
COW snapshot reads should be within 5% of DSv1 on TPC-H SF10." Without a 
concrete bar, it's hard to evaluate whether the implementation is ready to 
graduate from opt-in to default.
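
As an illustration of what a concrete bar could look like, the sketch below encodes a one-sided check ("DSv2 is at most 5% slower than DSv1") per query. The function name, the query labels, and the runtimes are made up for the example and are not measurements from this PR.

```python
def within_budget(baseline_s: float, candidate_s: float, tolerance: float = 0.05) -> bool:
    """Return True if the candidate runtime is at most `tolerance` slower than baseline."""
    if baseline_s <= 0:
        raise ValueError("baseline runtime must be positive")
    return candidate_s <= baseline_s * (1.0 + tolerance)


# Hypothetical per-query runtimes in seconds (illustrative numbers only).
dsv1 = {"q1": 10.0, "q6": 4.0}
dsv2 = {"q1": 10.4, "q6": 4.5}

# Queries where the DSv2 path exceeds the allowed slowdown.
regressions = sorted(q for q in dsv1 if not within_budget(dsv1[q], dsv2[q]))
print(regressions)
```

A per-query gate like this catches a single badly regressed query that an aggregate total would hide; whether the RFC wants a per-query or an aggregate criterion is for the author to decide.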



##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +54,260 @@ The current implementation of Spark Datasource V2 integration is presented in th
 
 ## Implementation
 
-<!--  -->
+The main problem is that Hudi's write path involves indexing, precombining, upsert/insert routing, file sizing, and table services (compaction/clustering/cleaning).
+Also `HoodieSparkSqlWriter::write` handles schema evolution, partition encoding, metadata updates, and multi-writer concurrency.
+DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic for this, and moving to this entirely would be high risk.
+
+The proposed approach is hybrid: DSv2 for reads, with a DSv1 fallback for writes (`V2TableWithV1Fallback`) in the current state.
+Later, if a DSv2 write path can be implemented without loss of performance or functionality, it may become possible to move to full DSv2 support.
+However, this migration should still be incremental, please check the "Future Work" chapter for details.
+
+Overall proposed architecture for the hybrid approach is shown in the following schema:
+
+![Proposed approach with hybrid V1 write and V2 read](integration_with_DSv2_read.jpg)
+
+### DataFrame API
+
+A new SPI short name, `"hudi_v2"`, activates the DSv2 read path when using the Spark DataFrame API.
+The existing `"hudi"` path remains unchanged.
+This is done to unblock incremental development of the DSv2 path and will be removed in the long term, please check the "Future Work" chapter for details.
+It also allows switching later from the current DSv1 fallback to a DSv2 write path, if an implementation without performance degradation is found.
+The DSv2 write path is currently under research.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+df.write.format("hudi").mode(...).save(path)
+        v
+BaseDefaultSource (V1) -> DefaultSource
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+<td>
+<pre>
+df.write.format("hudi_v2").mode(...).save(path)
+        v
+HoodieDataSourceV2 (TableProvider + DataSourceRegister + CreatableRelationProvider)
+        v
+Spark treats as V1 source for writes
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+spark.read.format("hudi").load(path)
+        v
+V1 DataSource resolution (via ServiceLoader + DataSourceRegister)
+        v
+BaseDefaultSource found
+(extends DefaultSource with DataSourceRegister)
+(not a TableProvider)
+        v
+Spark treats as V1 DataSource
+        v
+DefaultSource.createRelation(...)
+        v
+MergeOnReadSnapshotRelation / BaseRelation
+        v
+LogicalRelation -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+spark.read.format("hudi_v2").load(path)
+        v
+DataSourceV2Utils.lookupProvider("hudi_v2")
+        v
+HoodieDataSourceV2 found
+(extends TableProvider with DataSourceRegister)
+(does not extend SupportsCatalogOptions)
+        v
+Spark uses TableProvider.getTable() directly
+(no catalog routing since no SupportsCatalogOptions)
+        v
+HoodieDataSourceV2.getTable(...)
+        v
+HoodieSparkV2Table(...)
+(no catalogTable, no tableIdentifier)
+        v
+HoodieScanBuilder -> HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
+
+### SQL Queries
+
+Spark SQL API is managed by new configuration parameter `hoodie.datasource.read.use.v2`, which controls the returned table type.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...);   -- table created with USING hudi
+        v
+Spark Analyzer resolves table via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+isHoodieTable => true, v2ReadEnabled = false, schemaEvol = false
+        v
+RETURNS: V1Table(catalogTable) via v1TableWrapper
+        v
+Spark V1 write path -> InsertIntoHoodieTableCommand (analysis rule)
+        v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...);   -- table created with USING hudi
+        v
+Spark Analyzer resolves table via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+isHoodieTable => true, v2ReadEnabled = true
+        v
+RETURNS: HoodieSparkV2Table(...)
+        v
+SupportsWrite.newWriteBuilder() -> HoodieV1WriteBuilder
+        v
+V1Write -> InsertableRelation.insert(data, overwrite)
+        v
+Align columns (rename + cast to table's user schema)
+        v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+SELECT * FROM hudi_table;   -- table created with USING hudi
+        v
+Spark Analyzer resolves table name via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+super.loadTable(ident)
+        v
+V1Table(catalogTable) where catalogTable.provider = "hudi"
+        v
+isHoodieTable(catalogTable) => true
+        v
+v2ReadEnabled = false, schemaEvolutionEnabled = false (defaults)
+        v
+RETURNS: HoodieInternalV2Table(...).v1TableWrapper = V1Table(catalogTable)
+        v
+Spark uses V1 fallback -> DefaultSource.createRelation()
+        v
+HoodieFileIndex -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+SELECT * FROM hudi_table;   -- table created with USING hudi
+        v
+Spark Analyzer resolves table name via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+super.loadTable(ident)
+        v

Review Comment:
   🤖 For phase 4 (MOR snapshot read), the plan is to reuse 
`HoodieFileGroupReader` for base + log merge. Have you considered the memory 
and performance implications of reading through the DSv2 
`PartitionReader<InternalRow>` interface? The current V1 path uses vectorized 
Parquet reading in some cases — will the DSv2 path support columnar reads 
(`PartitionReaderFactory.supportColumnarReads`) or only row-based reads 
initially?



##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +54,260 @@ The current implementation of Spark Datasource V2 integration is presented in th
 
 ## Implementation
 
-<!--  -->
+The main problem is that Hudi's write path involves indexing, precombining, upsert/insert routing, file sizing, and table services (compaction/clustering/cleaning).
+Also `HoodieSparkSqlWriter::write` handles schema evolution, partition encoding, metadata updates, and multi-writer concurrency.
+DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic for this, and moving to this entirely would be high risk.
+
+The proposed approach is hybrid: DSv2 for reads, with a DSv1 fallback for writes (`V2TableWithV1Fallback`) in the current state.
+Later, if a DSv2 write path can be implemented without loss of performance or functionality, it may become possible to move to full DSv2 support.
+However, this migration should still be incremental, please check the "Future Work" chapter for details.
+
+Overall proposed architecture for the hybrid approach is shown in the following schema:
+
+![Proposed approach with hybrid V1 write and V2 read](integration_with_DSv2_read.jpg)
+
+### DataFrame API
+
+A new SPI short name, `"hudi_v2"`, activates the DSv2 read path when using the Spark DataFrame API.
+The existing `"hudi"` path remains unchanged.
+This is done to unblock incremental development of the DSv2 path and will be removed in the long term, please check the "Future Work" chapter for details.
+It also allows switching later from the current DSv1 fallback to a DSv2 write path, if an implementation without performance degradation is found.
+The DSv2 write path is currently under research.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+df.write.format("hudi").mode(...).save(path)
+        v
+BaseDefaultSource (V1) -> DefaultSource
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+<td>
+<pre>
+df.write.format("hudi_v2").mode(...).save(path)
+        v
+HoodieDataSourceV2 (TableProvider + DataSourceRegister + CreatableRelationProvider)
+        v
+Spark treats as V1 source for writes
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+spark.read.format("hudi").load(path)
+        v
+V1 DataSource resolution (via ServiceLoader + DataSourceRegister)
+        v
+BaseDefaultSource found
+(extends DefaultSource with DataSourceRegister)
+(not a TableProvider)
+        v
+Spark treats as V1 DataSource
+        v
+DefaultSource.createRelation(...)
+        v
+MergeOnReadSnapshotRelation / BaseRelation
+        v
+LogicalRelation -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+spark.read.format("hudi_v2").load(path)
+        v
+DataSourceV2Utils.lookupProvider("hudi_v2")
+        v
+HoodieDataSourceV2 found
+(extends TableProvider with DataSourceRegister)
+(does not extend SupportsCatalogOptions)
+        v
+Spark uses TableProvider.getTable() directly
+(no catalog routing since no SupportsCatalogOptions)
+        v
+HoodieDataSourceV2.getTable(...)
+        v
+HoodieSparkV2Table(...)
+(no catalogTable, no tableIdentifier)
+        v
+HoodieScanBuilder -> HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
+
+### SQL Queries
+
+Spark SQL API is managed by new configuration parameter `hoodie.datasource.read.use.v2`, which controls the returned table type.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...);   -- table created with USING hudi
+        v
+Spark Analyzer resolves table via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+isHoodieTable => true, v2ReadEnabled = false, schemaEvol = false
+        v
+RETURNS: V1Table(catalogTable) via v1TableWrapper
+        v
+Spark V1 write path -> InsertIntoHoodieTableCommand (analysis rule)
+        v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...);   -- table created with USING hudi
+        v
+Spark Analyzer resolves table via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+isHoodieTable => true, v2ReadEnabled = true
+        v
+RETURNS: HoodieSparkV2Table(...)
+        v
+SupportsWrite.newWriteBuilder() -> HoodieV1WriteBuilder
+        v
+V1Write -> InsertableRelation.insert(data, overwrite)
+        v
+Align columns (rename + cast to table's user schema)
+        v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+SELECT * FROM hudi_table;   -- table created with USING hudi
+        v
+Spark Analyzer resolves table name via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+super.loadTable(ident)
+        v
+V1Table(catalogTable) where catalogTable.provider = "hudi"
+        v
+isHoodieTable(catalogTable) => true
+        v
+v2ReadEnabled = false, schemaEvolutionEnabled = false (defaults)
+        v
+RETURNS: HoodieInternalV2Table(...).v1TableWrapper = V1Table(catalogTable)
+        v
+Spark uses V1 fallback -> DefaultSource.createRelation()
+        v
+HoodieFileIndex -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+SELECT * FROM hudi_table;   -- table created with USING hudi
+        v
+Spark Analyzer resolves table name via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+super.loadTable(ident)
+        v
+V1Table(catalogTable) where catalogTable.provider = "hudi"
+        v
+isHoodieTable(catalogTable) => true
+        v
+v2ReadEnabled = conf("hoodie.datasource.read.use.v2") = true
+        v
+RETURNS: HoodieSparkV2Table(...)
+        v
+SupportsRead.newScanBuilder() -> HoodieScanBuilder
+        v
+HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
 
 ### Read
 

Review Comment:
   🤖 The rollout plan says "no regression risk" since the existing path is 
untouched, but for SQL queries the routing goes through the same 
`HoodieCatalog.loadTable()`. Currently that method returns V1 when schema 
evolution is disabled, and V2 when enabled. How does the new 
`hoodie.datasource.read.use.v2` config interact with 
`hoodie.schema.on.read.enable`? What happens if both are true, or if schema 
evolution is on but v2 read is off?
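
One way to answer this in the RFC would be an explicit routing table over the two flags. The sketch below enumerates the four combinations. The three filled-in rows are an assumption pieced together from the current behaviour described above and from the SQL read flow in the RFC, and the both-true row is deliberately left open because the RFC does not define it. The function and constant names are hypothetical.

```python
from itertools import product

V1 = "V1Table (v1TableWrapper)"
INTERNAL_V2 = "HoodieInternalV2Table"
SPARK_V2 = "HoodieSparkV2Table"
UNSPECIFIED = "unspecified in the RFC"


def load_table_result(schema_on_read: bool, use_v2_read: bool) -> str:
    """Hypothetical routing table for HoodieCatalog.loadTable()."""
    if schema_on_read and use_v2_read:
        # Each flag asks for a different V2 table; the RFC does not say which wins.
        return UNSPECIFIED
    if use_v2_read:
        return SPARK_V2      # as drawn in the RFC's SQL read flow
    if schema_on_read:
        return INTERNAL_V2   # assumed to keep today's schema-evolution behaviour
    return V1                # today's default


# Print all four flag combinations and the table type each one resolves to.
for schema_on_read, use_v2_read in product([False, True], repeat=2):
    print(schema_on_read, use_v2_read, "->", load_table_result(schema_on_read, use_v2_read))
```

Spelling out all four rows in the RFC, including a precedence rule or an explicit error for the both-true case, would also back up the "no regression risk" claim for users who already enable schema evolution.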



##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +54,260 @@ The current implementation of Spark Datasource V2 integration is presented in th
 
 ## Implementation
 
-<!--  -->
+The main problem is that Hudi's write path involves indexing, precombining, upsert/insert routing, file sizing, and table services (compaction/clustering/cleaning).
+Also `HoodieSparkSqlWriter::write` handles schema evolution, partition encoding, metadata updates, and multi-writer concurrency.
+DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic for this, and moving to this entirely would be high risk.
+
+The proposed approach is hybrid: DSv2 for reads, with a DSv1 fallback for writes (`V2TableWithV1Fallback`) in the current state.
+Later, if a DSv2 write path can be implemented without loss of performance or functionality, it may become possible to move to full DSv2 support.
+However, this migration should still be incremental, please check the "Future Work" chapter for details.
+
+Overall proposed architecture for the hybrid approach is shown in the following schema:
+
+![Proposed approach with hybrid V1 write and V2 read](integration_with_DSv2_read.jpg)
+
+### DataFrame API
+
+A new SPI short name, `"hudi_v2"`, activates the DSv2 read path when using the Spark DataFrame API.
+The existing `"hudi"` path remains unchanged.
+This is done to unblock incremental development of the DSv2 path and will be removed in the long term, please check the "Future Work" chapter for details.
+It also allows switching later from the current DSv1 fallback to a DSv2 write path, if an implementation without performance degradation is found.
+The DSv2 write path is currently under research.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+df.write.format("hudi").mode(...).save(path)
+        v
+BaseDefaultSource (V1) -> DefaultSource
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+<td>
+<pre>
+df.write.format("hudi_v2").mode(...).save(path)
+        v
+HoodieDataSourceV2 (TableProvider + DataSourceRegister + 
CreatableRelationProvider)
+        v
+Spark treats as V1 source for writes
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+spark.read.format("hudi").load(path)
+        v
+V1 DataSource resolution (via ServiceLoader + DataSourceRegister)
+        v
+BaseDefaultSource found
+(extends DefaultSource with DataSourceRegister)
+(not a TableProvider)
+        v
+Spark treats as V1 DataSource
+        v
+DefaultSource.createRelation(...)
+        v
+MergeOnReadSnapshotRelation / BaseRelation
+        v
+LogicalRelation -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+spark.read.format("hudi_v2").load(path)
+        v
+DataSourceV2Utils.lookupProvider("hudi_v2")
+        v
+HoodieDataSourceV2 found
+(extends TableProvider with DataSourceRegister)
+(does not extend SupportsCatalogOptions)
+        v
+Spark uses TableProvider.getTable() directly
+(no catalog routing since no SupportsCatalogOptions)
+        v
+HoodieDataSourceV2.getTable(...)
+        v
+HoodieSparkV2Table(...)
+(no catalogTable, no tableIdentifier)
+        v
+HoodieScanBuilder -> HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
+
+### SQL Queries
+
+Spark SQL API is managed by new configuration parameter 
`hoodie.datasource.read.use.v2`, which controls the returned table type.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...);   -- table created with USING hudi
+        v
+Spark Analyzer resolves table via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+isHoodieTable => true, v2ReadEnabled = false, schemaEvol = false
+        v
+RETURNS: V1Table(catalogTable) via v1TableWrapper
+        v
+Spark V1 write path -> InsertIntoHoodieTableCommand (analysis rule)
+        v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...);   -- table created with USING hudi
+        v
+Spark Analyzer resolves table via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+isHoodieTable => true, v2ReadEnabled = true
+        v
+RETURNS: HoodieSparkV2Table(...)
+        v
+SupportsWrite.newWriteBuilder() -> HoodieV1WriteBuilder
+        v
+V1Write -> InsertableRelation.insert(data, overwrite)
+        v
+Align columns (rename + cast to table's user schema)
+        v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+SELECT * FROM hudi_table;   -- table created with USING hudi
+        v
+Spark Analyzer resolves table name via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+super.loadTable(ident)
+        v
+V1Table(catalogTable) where catalogTable.provider = "hudi"
+        v
+isHoodieTable(catalogTable) => true
+        v
+v2ReadEnabled = false, schemaEvolutionEnabled = false (defaults)
+        v
+RETURNS: HoodieInternalV2Table(...).v1TableWrapper = V1Table(catalogTable)
+        v
+Spark uses V1 fallback -> DefaultSource.createRelation()
+        v
+HoodieFileIndex -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+SELECT * FROM hudi_table;   -- table created with USING hudi
+        v
+Spark Analyzer resolves table name via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+super.loadTable(ident)
+        v
+V1Table(catalogTable) where catalogTable.provider = "hudi"
+        v
+isHoodieTable(catalogTable) => true
+        v
+v2ReadEnabled = conf("hoodie.datasource.read.use.v2") = true
+        v
+RETURNS: HoodieSparkV2Table(...)
+        v
+SupportsRead.newScanBuilder() -> HoodieScanBuilder
+        v
+HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
 
 ### Read
 
-<!-- main part -->
+All new classes go into package `org.apache.spark.sql.hudi.v2` inside 
`hudi-spark-common`.
+
+| Class | Spark Interface | Responsibility |
+|-------|-----------------|----------------|
+| `HoodieDataSourceV2` | `TableProvider`, `DataSourceRegister`, 
`CreatableRelationProvider` | SPI entry point for `format("hudi_v2")`. 
`CreatableRelationProvider` enables DataFrame API writes via 
`df.write.format("hudi_v2")`. |
+| `HoodieSparkV2Table` | `Table`, `SupportsRead`, `SupportsWrite`, 
`V2TableWithV1Fallback` | Routes reads to DSv2, writes to DSv1 fallback via 
`HoodieV1WriteBuilder`. |
+| `HoodieScanBuilder` | `ScanBuilder`, `SupportsPushDownFilters`, 
`SupportsPushDownRequiredColumns` | Collects filter and column pruning 
pushdowns. |
+| `HoodieBatchScan` | `Scan`, `Batch` | Plans input partitions using existing 
`HoodieFileIndex`. |
+| `HoodieInputPartition` | `InputPartition` | Serializable descriptor for file 
slices. |
+| `HoodiePartitionReaderFactory` | `PartitionReaderFactory` | Creates readers 
on executors. |
+| `HoodiePartitionReader` | `PartitionReader[InternalRow]` | Delegates to 
existing file-reading code from `HoodieBaseRelation`. |
+| `HoodieV1WriteBuilder` (reused) | `SupportsTruncate`, `SupportsOverwrite`, 
`ProvidesHoodieConfig` | Existing V1 write fallback builder from 
`HoodieInternalV2Table`, reused by `HoodieSparkV2Table`. |
 
 ### Table services
 
-<!-- with read substages -->
+Table services (compaction, clustering, cleaning) are not affected by this 
change.
+They operate via the write client and are triggered independently of the read 
path.
+
+### Implementation phases
+
+1. **Coexistence POC.** All new classes return empty read results, SPI 
registration, reuse of `HoodieV1WriteBuilder` for V1 write fallback, 
`hoodie.datasource.read.use.v2` config, 

Review Comment:
   🤖 The future work roadmap (steps 3-6) describes swapping format names: 
`hudi_v2` becomes `hudi` and `hudi` becomes `hudi_v1`. Note that 
`DefaultSource.shortName()` already returns `"hudi_v1"` while 
`BaseDefaultSource` returns `"hudi"`. Have you checked that this existing 
naming won't conflict with the planned name swap? It would be helpful to 
document the full SPI name mapping at each transition step.
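
As an illustration of the kind of mapping that would help, the sketch below lists the short name of each class per step and reports any name claimed by more than one class. It is a hypothetical model, not Hudi code: `SpiNameMappingSketch` and its methods are made-up names, the "naive swap" step is my assumption about one way the rename could be done, and it assumes all three classes remain visible to Spark's source lookup. Spark's lookup generally fails when a short name matches more than one registered external source, so a non-empty collision set marks a step that needs attention.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class SpiNameMappingSketch {

    /** Returns the short names that more than one class would register. */
    static Set<String> collisions(Map<String, String> shortNameByClass) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String name : shortNameByClass.values()) {
            counts.merge(name, 1, Integer::sum);
        }
        Set<String> duplicated = new TreeSet<>();
        counts.forEach((name, count) -> {
            if (count > 1) {
                duplicated.add(name);
            }
        });
        return duplicated;
    }

    /** Short names as described in the review comment above. */
    static Map<String, String> today() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("DefaultSource", "hudi_v1");
        m.put("BaseDefaultSource", "hudi");
        return m;
    }

    /** State after this RFC: the new provider is added as "hudi_v2". */
    static Map<String, String> withRfc() {
        Map<String, String> m = today();
        m.put("HoodieDataSourceV2", "hudi_v2");
        return m;
    }

    /** ASSUMPTION: a naive swap that leaves DefaultSource untouched. */
    static Map<String, String> naiveSwap() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("DefaultSource", "hudi_v1");
        m.put("BaseDefaultSource", "hudi_v1"); // "hudi" becomes "hudi_v1"
        m.put("HoodieDataSourceV2", "hudi");   // "hudi_v2" becomes "hudi"
        return m;
    }

    public static void main(String[] args) {
        System.out.println("today:      " + collisions(today()));
        System.out.println("with RFC:   " + collisions(withRfc()));
        System.out.println("naive swap: " + collisions(naiveSwap()));
    }
}
```

Under these assumptions only the swap step reports a collision, on `hudi_v1`, which is the conflict the question above is about.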



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

