YannByron commented on a change in pull request #4877:
URL: https://github.com/apache/hudi/pull/4877#discussion_r826534174



##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -130,22 +158,110 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
    * NOTE: DO NOT OVERRIDE THIS METHOD
    */
   override final def buildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[Row] = {
+    // NOTE: In case list of requested columns doesn't contain the Primary Key 
one, we
+    //       have to add it explicitly so that
+    //          - Merging could be performed correctly
+    //          - In case 0 columns are to be fetched (for ex, when doing 
{@code count()} on Spark's [[Dataset]],
+    //          Spark still fetches all the rows to execute the query correctly
+    //
+    //       It's okay to return columns that have not been requested by the 
caller, as those nevertheless will be
+    //       filtered out upstream
+    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
+    val (requiredAvroSchema, requiredStructSchema) =
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
+
+    val filterExpressions = convertToExpressions(filters)
+    val (partitionFilters, dataFilters) = 
filterExpressions.partition(isPartitionPredicate)
+
+    val fileSplits = collectFileSplits(partitionFilters, dataFilters)
+
+    val partitionSchema = StructType(Nil)
+    val tableSchema = HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, 
requiredAvroSchema.toString)
+
     // Here we rely on a type erasure, to workaround inherited API restriction 
and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
     // Please check [[needConversion]] scala-doc for more details
-    doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]]
+    composeRDD(fileSplits, partitionSchema, tableSchema, requiredSchema, 
filters).asInstanceOf[RDD[Row]]
   }
 
-  protected def doBuildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[InternalRow]
+  // TODO scala-doc
+  protected def composeRDD(fileSplits: Seq[FileSplit],
+                           partitionSchema: StructType,
+                           tableSchema: HoodieTableSchema,
+                           requiredSchema: HoodieTableSchema,
+                           filters: Array[Filter]): HoodieUnsafeRDD
+
+  // TODO scala-doc
+  protected def collectFileSplits(partitionFilters: Seq[Expression], 
dataFilters: Seq[Expression]): Seq[FileSplit]
+
+  protected def listLatestBaseFiles(globPaths: Seq[Path], partitionFilters: 
Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
+    if (globPaths.isEmpty) {
+      val partitionDirs = fileIndex.listFiles(partitionFilters, dataFilters)
+      partitionDirs.map(pd => (getPartitionPath(pd.files.head), 
pd.files)).toMap
+    } else {
+      val inMemoryFileIndex = 
HoodieSparkUtils.createInMemoryFileIndex(sparkSession, globPaths)
+      val partitionDirs = inMemoryFileIndex.listFiles(partitionFilters, 
dataFilters)
+
+      val fsView = new HoodieTableFileSystemView(metaClient, timeline, 
partitionDirs.flatMap(_.files).toArray)
+      val latestBaseFiles = 
fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus)
+
+      latestBaseFiles.groupBy(getPartitionPath)
+    }
+  }
+
+  protected def convertToExpressions(filters: Array[Filter]): 
Array[Expression] = {
+    val catalystExpressions = 
HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
+
+    val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) 
=> opt.isEmpty }
+    if (failedExprs.nonEmpty) {
+      val failedFilters = failedExprs.map(p => filters(p._2))
+      logWarning(s"Failed to convert Filters into Catalyst expressions 
(${failedFilters.map(_.toString)})")
+    }
+
+    catalystExpressions.filter(_.isDefined).map(_.get).toArray
+  }
+
+  /**
+   * Checks whether given expression only references partition columns
+   * (and involves no sub-query)
+   */
+  protected def isPartitionPredicate(condition: Expression): Boolean = {
+    // Validates that the provided names both resolve to the same entity
+    val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver
+
+    condition.references.forall { r => 
partitionColumns.exists(resolvedNameEquals(r.name, _)) } &&
+      !SubqueryExpression.hasSubquery(condition)
+  }
 
   protected final def appendMandatoryColumns(requestedColumns: Array[String]): 
Array[String] = {
     val missing = mandatoryColumns.filter(col => 
!requestedColumns.contains(col))
     requestedColumns ++ missing
   }
+
+  private def getPrecombineFieldProperty: Option[String] =
+    Option(tableConfig.getPreCombineField)
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) 
match {
+      // NOTE: This is required to compensate for cases when empty string is 
used to stub
+      //       property value to avoid it being set with the default value
+      // TODO(HUDI-3456) cleanup
+      case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
+      case _ => None
+    }
+
+  private def imbueConfigs(sqlContext: SQLContext): Unit = {
+    
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown",
 "true")
+    
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled",
 "true")
+    
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader",
 "true")
+  }

Review comment:
       Yep, `enableVectorizedReader` was false, but as discussed with 
@alexeykudinkin before, we need to enable it to speed things up.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to