hudi-agent commented on code in PR #18797:
URL: https://github.com/apache/hudi/pull/18797#discussion_r3277691886
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieVectorSearchPlanBuilder.scala:
##########
@@ -232,25 +249,50 @@ object HoodieVectorSearchPlanBuilder {
* and select top-K per query. The cross-join produces O(|corpus| * |queries|)
* intermediate rows, so this is suitable for small-to-medium query sets
* (tens to low hundreds of queries) against moderate corpora.
+ *
+ * <p>Both modes support an optional {@code filter} predicate (applied to the corpus before
+ * distance computation, enabling Hudi partition pruning and data skipping) and an optional
+ * {@code maxDistance} threshold (results beyond this distance are excluded before top-K
+ * selection, reducing shuffle and sort volume).
*/
object BruteForceSearchAlgorithm extends VectorSearchAlgorithm {
import HoodieVectorSearchPlanBuilder._
override val name: String = "brute_force"
+ /**
+ * Applies a user-supplied SQL predicate to the corpus DataFrame, wrapping any
+ * analyzer / parser failure in a [[HoodieAnalysisException]] with the offending
+ * expression for easier debugging.
+ */
+ private def applyFilter(df: DataFrame, filterExpr: String): DataFrame = {
+ try {
+ df.filter(filterExpr)
+ } catch {
+ case e: Exception =>
+ throw new HoodieAnalysisException(
+ s"Invalid pre-filter expression '$filterExpr': ${e.getMessage}")
+ }
+ }
+
override def buildSingleQueryPlan(
spark: SparkSession,
corpusDf: DataFrame,
embeddingCol: String,
queryVector: Array[Double],
k: Int,
- metric: DistanceMetric.Value): LogicalPlan = {
+ metric: DistanceMetric.Value,
+ filter: Option[String] = None,
+ maxDistance: Option[Double] = None): LogicalPlan = {
validateEmbeddingColumn(corpusDf, embeddingCol)
validateQueryVectorDimension(corpusDf, embeddingCol, queryVector.length)
val elemType = getElementType(corpusDf, embeddingCol)
- val filteredDf = corpusDf.filter(col(embeddingCol).isNotNull)
+ // Apply pre-filter before distance computation to enable Hudi partition pruning
+ // and data skipping, reducing the number of rows that need distance computation.
+ var filteredDf = corpusDf.filter(col(embeddingCol).isNotNull)
+ filter.foreach(f => filteredDf = applyFilter(filteredDf, f))
Review Comment:
🤖 nit: using `var` + `foreach` to optionally reassign a local is
non-idiomatic Scala. Could you use `fold` instead? For example, `val base =
corpusDf.filter(col(embeddingCol).isNotNull); val filteredDf =
filter.fold(base)(applyFilter(base, _))` keeps everything a `val`. The same
pattern appears for `filteredCorpus` (batch path, line ~347) and both `scored`
locals.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
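
For illustration, here is a minimal Spark-free sketch of the two shapes side by side. It assumes a plain `Seq[Int]` in place of a `DataFrame` and a Scala function in place of the SQL predicate string; `FoldSketch`, `withVar`, `withFold`, and this version of `applyFilter` are hypothetical stand-ins, not code from the PR.

```scala
object FoldSketch {
  // Hypothetical stand-in for the PR's applyFilter: keeps rows matching the predicate.
  def applyFilter(rows: Seq[Int], pred: Int => Boolean): Seq[Int] =
    rows.filter(pred)

  // Shape flagged in the review: a mutable local reassigned inside foreach.
  def withVar(rows: Seq[Int], filter: Option[Int => Boolean]): Seq[Int] = {
    var filtered = rows.filter(_ >= 0) // stands in for the isNotNull filter
    filter.foreach(f => filtered = applyFilter(filtered, f))
    filtered
  }

  // Suggested shape: Option.fold returns `base` when the option is empty,
  // otherwise applies the function to the contained predicate.
  def withFold(rows: Seq[Int], filter: Option[Int => Boolean]): Seq[Int] = {
    val base = rows.filter(_ >= 0)
    filter.fold(base)(applyFilter(base, _))
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(-1, 2, 3, 4)
    val isEven: Int => Boolean = _ % 2 == 0
    // Both variants should agree with and without a filter.
    println(withVar(rows, Some(isEven)) == withFold(rows, Some(isEven)))
    println(withVar(rows, None) == withFold(rows, None))
  }
}
```

The two variants are meant to be behaviorally equivalent; the `fold` version just avoids the reassignment, which may make it easier to chain a second optional step (such as a `maxDistance` cut) as another `val`.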