xushiyan commented on code in PR #6727:
URL: https://github.com/apache/hudi/pull/6727#discussion_r979490209
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala:
##########
@@ -55,15 +56,24 @@ class HoodieStreamSource(
extends Source with Logging with Serializable with SparkAdapterSupport {
@transient private val hadoopConf =
sqlContext.sparkSession.sessionState.newHadoopConf()
+
private lazy val tablePath: Path = {
val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
val fs = path.getFileSystem(hadoopConf)
TablePathUtils.getTablePath(fs, path).get()
}
- private lazy val metaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(tablePath.toString).build()
+
+ private lazy val metaClient = HoodieTableMetaClient.builder()
+ .setConf(hadoopConf).setBasePath(tablePath.toString).build()
+
private lazy val tableType = metaClient.getTableType
+ private val isCDCQuery = CDCRelation.isCDCEnabled(metaClient) &&
+
parameters.get(DataSourceReadOptions.QUERY_TYPE.key).contains(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
&&
+
parameters.get(DataSourceReadOptions.INCREMENTAL_FORMAT.key).contains(DataSourceReadOptions.INCREMENTAL_FORMAT_CDC_VAL)
Review Comment:
This check logic is repeated; it could be extracted into a utility method.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -219,7 +217,9 @@ object DefaultSource {
}
if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants()
== 0) {
- new EmptyRelation(sqlContext, metaClient)
+ new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters,
Some(schema)))
+ } else if (isCdcQuery) {
+ CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
} else {
(tableType, queryType, isBootstrappedTable) match {
Review Comment:
This if-check could be merged into the match below to keep the code aligned.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]