yihua commented on code in PR #13503:
URL: https://github.com/apache/hudi/pull/13503#discussion_r2233077747
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala:
##########
@@ -135,20 +150,31 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
DataSourceReadOptions.END_COMMIT.key -> endOffset.offsetCommitTime
)
-      val rdd = tableType match {
-        case HoodieTableType.COPY_ON_WRITE =>
-          val serDe = sparkAdapter.createSparkRowSerDe(schema)
-          new IncrementalRelationV2(sqlContext, incParams, Some(schema), metaClient, rangeType)
-            .buildScan()
-            .map(serDe.serializeRow)
-        case HoodieTableType.MERGE_ON_READ =>
-          val requiredColumns = schema.fields.map(_.name)
-          new MergeOnReadIncrementalRelationV2(sqlContext, incParams, metaClient, Some(schema), rangeType = rangeType)
-            .buildScan(requiredColumns, Array.empty[Filter])
-            .asInstanceOf[RDD[InternalRow]]
-        case _ => throw new IllegalArgumentException(s"UnSupport tableType: $tableType")
+      if (useNewParquetFileFormat) {
+        val relation = if (tableType == HoodieTableType.COPY_ON_WRITE) {
+          new HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2(sqlContext, metaClient, incParams, Option(schema), false, rangeType)
+            .build()
+        } else {
+          new HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2(sqlContext, metaClient, incParams, Option(schema), false, rangeType)
+            .build()
Review Comment:
   Similar comment here: check the schema and the `isBootstrappedTable` flag that are used, so that they are aligned with `DefaultSource`.
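   For illustration, a minimal sketch of the alignment I have in mind, assuming the fifth constructor argument is the bootstrap flag (the argument order is inferred from the diff above, not from the factory's actual signature):

   ```scala
   // Sketch only: derive the bootstrap flag the same way DefaultSource does,
   // instead of hardcoding `false`.
   val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent

   val relation = if (tableType == HoodieTableType.COPY_ON_WRITE) {
     new HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2(
       sqlContext, metaClient, incParams, Option(schema), isBootstrappedTable, rangeType).build()
   } else {
     new HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2(
       sqlContext, metaClient, incParams, Option(schema), isBootstrappedTable, rangeType).build()
   }
   ```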
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala:
##########
@@ -52,6 +53,9 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
private lazy val tableType = metaClient.getTableType
+  private lazy val useNewParquetFileFormat = parameters.getOrElse(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
Review Comment:
   Same comment here on the variable naming: the flag reads `FILE_GROUP_READER_ENABLED`, so the name should refer to the file group reader rather than to a new parquet file format.
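   For example, a minimal sketch with a name that matches the config it reads (`useFileGroupReader` is only a suggestion, and the default-value handling is assumed since the hunk above is cut off):

   ```scala
   // Sketch only: the name tracks HoodieReaderConfig.FILE_GROUP_READER_ENABLED;
   // the getOrElse fallback is an assumption, as the original line is truncated.
   private lazy val useFileGroupReader: Boolean =
     parameters.getOrElse(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
       HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue().toString).toBoolean
   ```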
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala:
##########
@@ -123,10 +127,21 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
DataSourceReadOptions.START_COMMIT.key() -> startCompletionTime,
DataSourceReadOptions.END_COMMIT.key() -> endOffset.offsetCommitTime
)
-      val rdd = CDCRelation.getCDCRelation(sqlContext, metaClient, cdcOptions, rangeType)
-        .buildScan0(HoodieCDCUtils.CDC_COLUMNS, Array.empty)
-
-      sqlContext.sparkSession.internalCreateDataFrame(rdd, CDCRelation.FULL_CDC_SPARK_SCHEMA, isStreaming = true)
+      if (useNewParquetFileFormat) {
+        val relation = if (tableType == HoodieTableType.COPY_ON_WRITE) {
+          new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
+            sqlContext, metaClient, parameters ++ cdcOptions, None, false, rangeType).build()
+        } else {
+          new HoodieMergeOnReadCDCHadoopFsRelationFactory(
+            sqlContext, metaClient, parameters ++ cdcOptions, None, false, rangeType).build()
Review Comment:
   Similar comment here on revisiting the schema that is used: `None` is passed, while the removed code built the CDC output against `CDCRelation.FULL_CDC_SPARK_SCHEMA`.
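   For illustration, a minimal sketch under the assumption that the fourth constructor argument is the schema spec (inferred from the incremental factories above, not confirmed against the factory's signature):

   ```scala
   // Sketch only: pass the CDC schema explicitly instead of None, matching the
   // schema the removed code handed to internalCreateDataFrame.
   val relation = if (tableType == HoodieTableType.COPY_ON_WRITE) {
     new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
       sqlContext, metaClient, parameters ++ cdcOptions,
       Some(CDCRelation.FULL_CDC_SPARK_SCHEMA), false, rangeType).build()
   } else {
     new HoodieMergeOnReadCDCHadoopFsRelationFactory(
       sqlContext, metaClient, parameters ++ cdcOptions,
       Some(CDCRelation.FULL_CDC_SPARK_SCHEMA), false, rangeType).build()
   }
   ```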