lokeshj1703 commented on code in PR #11710:
URL: https://github.com/apache/hudi/pull/11710#discussion_r1742876655
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -181,6 +183,67 @@ object HoodieAnalysis extends SparkAdapterSupport {
*/
case class AdaptIngestionTargetLogicalRelations(spark: SparkSession) extends
Rule[LogicalPlan] {
+ /**
+ * The function updates the HadoopFSRelation so that it uses
HoodieFileIndex instead of
+ * HoodieFileIndexTimestampKeyGen. Also the data types of the output
attributes of the plan are changed
+ * accordingly. HoodieFileIndexTimestampKeyGen is used by
HoodieBaseRelation for reading tables with
+ * Timestamp or custom key generator.
+ */
+ private def transformReaderFSRelation(logicalPlan: Option[LogicalPlan]):
Option[LogicalPlan] = {
+ def getAttributesFromTableSchema(catalogTableOpt: Option[CatalogTable],
lr: LogicalRelation, attributesSet: Set[AttributeReference]) = {
+ var finalAttrs: List[AttributeReference] = List.empty
+ if (catalogTableOpt.isDefined) {
+ for (attr <- lr.output) {
+ val origAttr: AttributeReference = attributesSet.collectFirst({
case a if a.name.equals(attr.name) => a }).get
+ val catalogAttr =
catalogTableOpt.get.partitionSchema.fields.collectFirst({ case a if
a.name.equals(attr.name) => a })
+ val newAttr: AttributeReference = if (catalogAttr.isDefined) {
+ origAttr.copy(dataType =
catalogAttr.get.dataType)(origAttr.exprId, origAttr.qualifier)
+ } else {
+ origAttr
+ }
+ finalAttrs = finalAttrs :+ newAttr
+ }
+ }
+ finalAttrs
+ }
+
+ def getHadoopFsRelation(plan: LogicalPlan): Option[HadoopFsRelation] = {
+ EliminateSubqueryAliases(plan) match {
+ // First, we need to weed out unresolved plans
+ case plan if !plan.resolved => None
+ // NOTE: When resolving a Hudi table we allow [[Filter]]s and
[[Project]]s to be applied
+ // on top of it
+ case PhysicalOperation(_, _, LogicalRelation(relation, _, _, _)) if
relation.isInstanceOf[HadoopFsRelation] =>
Some(relation.asInstanceOf[HadoopFsRelation])
+ case _ => None
+ }
+ }
+
+ logicalPlan.map(relation => {
+ val catalogTableOpt = sparkAdapter.resolveHoodieTable(relation)
+ val fsRelation = getHadoopFsRelation(relation)
+ if (fsRelation.isDefined &&
fsRelation.get.location.isInstanceOf[HoodieReaderFileIndex]) {
+ relation transformUp {
+ case lr: LogicalRelation =>
+ val finalAttrs: List[AttributeReference] =
getAttributesFromTableSchema(catalogTableOpt, lr, lr.output.toSet)
+ val newFsRelation: BaseRelation = lr.relation match {
+ case fsRelation: HadoopFsRelation =>
+ if (catalogTableOpt.isDefined) {
+ // replace HoodieFileIndexTimestampKeyGen with
HoodieFileIndex instance
Review Comment:
Addressed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]