nsivabalan commented on code in PR #13503:
URL: https://github.com/apache/hudi/pull/13503#discussion_r2178422697
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala:
##########
@@ -52,6 +55,10 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
private lazy val tableType = metaClient.getTableType
+ private lazy val useNewParquetFileFormat = parameters.getOrElse(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
+   HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue().toString).toBoolean &&
+   !metaClient.isMetadataTable
Review Comment:
Why are we disabling this for the metadata table (MDT)?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala:
##########
@@ -123,10 +130,21 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
DataSourceReadOptions.START_COMMIT.key() -> startCompletionTime,
DataSourceReadOptions.END_COMMIT.key() -> endOffset.offsetCommitTime
)
- val rdd = CDCRelation.getCDCRelation(sqlContext, metaClient, cdcOptions, rangeType)
-   .buildScan0(HoodieCDCUtils.CDC_COLUMNS, Array.empty)
-
- sqlContext.sparkSession.internalCreateDataFrame(rdd, CDCRelation.FULL_CDC_SPARK_SCHEMA, isStreaming = true)
+ if (useNewParquetFileFormat) {
+ val relation = if (tableType == HoodieTableType.COPY_ON_WRITE) {
Review Comment:
Don't we need to account for older table versions here?
I see the following check further down in DefaultSource:
```
if (SparkConfigUtils.containsConfigProperty(parameters, INCREMENTAL_READ_TABLE_VERSION)) {
  val writeTableVersion = Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
  if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
```
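For illustration only, a version gate along those lines could be folded into the flag roughly as follows. This is a self-contained sketch, not code from the PR: `VersionGateSketch`, `useNewReadPath`, `TableVersionKey`, and `VersionEight` are hypothetical stand-ins for the Hudi identifiers quoted above, and treating a missing version option as "current" is an assumption, since the quoted DefaultSource snippet does not show that branch.

```scala
// Hypothetical, self-contained sketch: none of these names are Hudi APIs.
object VersionGateSketch {
  // Assumed stand-in for HoodieTableVersion.EIGHT.versionCode().
  val VersionEight: Int = 8

  // Assumed stand-in for INCREMENTAL_READ_TABLE_VERSION.key.
  val TableVersionKey: String = "example.incremental.read.table.version"

  // Returns true only when the reader flag is enabled and the table version,
  // if one was supplied in the options, is at least version 8.
  // Assumption: an absent version option is treated as a current table.
  def useNewReadPath(parameters: Map[String, String],
                     fileGroupReaderEnabled: Boolean): Boolean = {
    val versionOk = parameters
      .get(TableVersionKey)
      .forall(v => v.trim.toInt >= VersionEight)
    fileGroupReaderEnabled && versionOk
  }
}
```

With a gate like this, tables written with an older version would presumably keep going through the existing CDCRelation path, while newer tables take the new one.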