hudi-agent commented on code in PR #18797:
URL: https://github.com/apache/hudi/pull/18797#discussion_r3284753578


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieVectorSearchPlanBuilder.scala:
##########
@@ -232,25 +250,52 @@ object HoodieVectorSearchPlanBuilder {
  * and select top-K per query. The cross-join produces O(|corpus| * |queries|)
  * intermediate rows, so this is suitable for small-to-medium query sets
  * (tens to low hundreds of queries) against moderate corpora.
+ *
+ * <p>Both modes support an optional {@code filter} predicate (applied to the corpus before
+ * distance computation, enabling Hudi partition pruning and data skipping) and an optional
+ * {@code maxDistance} threshold (results beyond this distance are excluded before top-K
+ * selection, reducing shuffle and sort volume).
  */
 object BruteForceSearchAlgorithm extends VectorSearchAlgorithm {
 
   import HoodieVectorSearchPlanBuilder._
 
   override val name: String = "brute_force"
 
+  /**
+   * Applies a user-supplied SQL predicate to the corpus DataFrame, wrapping
+   * [[ParseException]] (predicate syntax error) and [[AnalysisException]]
+   * (unknown column, type mismatch, etc.) in a [[HoodieAnalysisException]] that
+   * echoes the offending expression. Other exception types are allowed to
+   * propagate untouched so they aren't misreported as a filter problem.
+   */
+  private def applyFilter(df: DataFrame, filterExpr: String): DataFrame = {
+    try {
+      df.filter(filterExpr)
+    } catch {
+      case e @ (_: ParseException | _: AnalysisException) =>
+        throw new HoodieAnalysisException(
+          s"Invalid pre-filter expression '$filterExpr': ${e.getMessage}")
+    }
+  }
+
   override def buildSingleQueryPlan(
       spark: SparkSession,
       corpusDf: DataFrame,
       embeddingCol: String,
       queryVector: Array[Double],
       k: Int,
-      metric: DistanceMetric.Value): LogicalPlan = {
+      metric: DistanceMetric.Value,
+      filter: Option[String] = None,
+      maxDistance: Option[Double] = None): LogicalPlan = {
     validateEmbeddingColumn(corpusDf, embeddingCol)
     validateQueryVectorDimension(corpusDf, embeddingCol, queryVector.length)
 
     val elemType = getElementType(corpusDf, embeddingCol)
-    val filteredDf = corpusDf.filter(col(embeddingCol).isNotNull)
+    // Apply pre-filter before distance computation to enable Hudi partition pruning
+    // and data skipping, reducing the number of rows that need distance computation.
+    var filteredDf = corpusDf.filter(col(embeddingCol).isNotNull)

Review Comment:
   🤖 nit: the
   `var filteredDf = ...; filter.foreach(f => filteredDf = applyFilter(filteredDf, f))`
   pattern (repeated for `scored` and `filteredCorpus` in both plan builders)
   could be expressed without a mutable local, e.g.
   `val filteredDf = filter.foldLeft(corpusDf.filter(col(embeddingCol).isNotNull))(applyFilter)`.
   Because `filter` is an `Option`, the fold applies `applyFilter` at most once,
   so behavior is unchanged. This reads more idiomatically and avoids the
   reassignment.
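
To illustrate the equivalence without pulling in Spark, here is a minimal sketch. `Table`, `FoldLeftSketch`, and the integer-threshold `applyFilter` are hypothetical stand-ins for `DataFrame` and the PR's `applyFilter(df, filterExpr)`; only the `var` + `foreach` versus `foldLeft` shape mirrors the diff.

```scala
// Hypothetical stand-in for a DataFrame so the sketch runs without Spark.
final case class Table(rows: Vector[Int]) {
  def filter(p: Int => Boolean): Table = Table(rows.filter(p))
}

object FoldLeftSketch {
  // Hypothetical stand-in for applyFilter(df, filterExpr):
  // keeps only rows greater than or equal to the threshold.
  def applyFilter(t: Table, minValue: Int): Table = t.filter(_ >= minValue)

  // Mutable version, mirroring the var + foreach pattern in the diff.
  def withVar(corpus: Table, filter: Option[Int]): Table = {
    var filtered = corpus.filter(_ != 0) // stands in for the isNotNull filter
    filter.foreach(f => filtered = applyFilter(filtered, f))
    filtered
  }

  // Immutable version: the Option holds zero or one element, so foldLeft
  // either returns the seed unchanged or applies applyFilter exactly once.
  def withFold(corpus: Table, filter: Option[Int]): Table =
    filter.foldLeft(corpus.filter(_ != 0))(applyFilter)
}
```

Both functions return the same result for `Some(...)` and for `None`; the fold version simply removes the reassignment.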
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
