hudi-agent commented on code in PR #18797:
URL: https://github.com/apache/hudi/pull/18797#discussion_r3305488492
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieVectorSearchPlanBuilder.scala:
##########
@@ -269,11 +312,12 @@ object BruteForceSearchAlgorithm extends
VectorSearchAlgorithm {
// so only the corpus column is passed per row.
val distanceUdf = VectorDistanceUtils.createSingleQueryDistanceUdf(metric,
elemType, queryVector)
- val result = filteredDf
+ var scored = filteredDf
Review Comment:
🤖 nit: same `var scored = ...; maxDistance.foreach(...)` pattern here —
folding the Option in (`val scored = maxDistance.foldLeft(scoredBase)((df, d)
=> df.filter(col(DISTANCE_COL) <= d))`) avoids the rebinding and matches the
style used elsewhere in the file.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieVectorSearchPlanBuilder.scala:
##########
@@ -232,25 +250,50 @@ object HoodieVectorSearchPlanBuilder {
* and select top-K per query. The cross-join produces O(|corpus| * |queries|)
* intermediate rows, so this is suitable for small-to-medium query sets
* (tens to low hundreds of queries) against moderate corpora.
+ *
+ * <p>Both modes support an optional {@code filter} predicate (applied to the
corpus before
+ * distance computation), and an optional {@code maxDistance} threshold
(results beyond this
+ * distance are excluded before top-K selection, reducing shuffle and sort
volume).
*/
object BruteForceSearchAlgorithm extends VectorSearchAlgorithm {
import HoodieVectorSearchPlanBuilder._
override val name: String = "brute_force"
+ /**
+ * Applies a user-supplied SQL predicate to the corpus DataFrame, wrapping
+ * [[ParseException]] (predicate syntax error) and [[AnalysisException]]
+ * (unknown column, type mismatch, etc.) in a [[HoodieAnalysisException]]
that
+ * echoes the offending expression. Other exception types are allowed to
+ * propagate untouched so they aren't misreported as a filter problem.
+ */
+ private def applyFilter(df: DataFrame, filterExpr: String): DataFrame = {
+ try {
+ df.filter(filterExpr)
+ } catch {
+ case e @ (_: ParseException | _: AnalysisException) =>
+ throw new HoodieAnalysisException(
+ s"Invalid pre-filter expression '$filterExpr': ${e.getMessage}")
+ }
+ }
+
override def buildSingleQueryPlan(
spark: SparkSession,
corpusDf: DataFrame,
embeddingCol: String,
queryVector: Array[Double],
k: Int,
- metric: DistanceMetric.Value): LogicalPlan = {
+ metric: DistanceMetric.Value,
+ filter: Option[String],
+ maxDistance: Option[Double]): LogicalPlan = {
validateEmbeddingColumn(corpusDf, embeddingCol)
validateQueryVectorDimension(corpusDf, embeddingCol, queryVector.length)
val elemType = getElementType(corpusDf, embeddingCol)
- val filteredDf = corpusDf.filter(col(embeddingCol).isNotNull)
+ // Apply pre-filter before distance computation to enable reducing the
number of rows that need distance computation.
+ var filteredDf = corpusDf.filter(col(embeddingCol).isNotNull)
+ filter.foreach(f => filteredDf = applyFilter(filteredDf, f))
// Validate byte corpus query vector values before creating the UDF.
Review Comment:
🤖 nit: the `var filteredDf` + `filter.foreach(...)` reassignment reads
awkwardly in Scala. Could you switch to something like `val filteredDf =
filter.foldLeft(corpusDf.filter(col(embeddingCol).isNotNull))(applyFilter)` (or
`filter.map(applyFilter(base, _)).getOrElse(base)`) so the binding stays
immutable?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]