schenksj commented on code in PR #4531:
URL: https://github.com/apache/datafusion-comet/pull/4531#discussion_r3329484458


##########
spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala:
##########
@@ -832,6 +832,67 @@ object QueryPlanSerde extends Logging with CometExprShim 
with CometTypeShim {
     }
   }
 
+  /**
+   * Serialize an associative boolean chain (`And` / `Or`) as a BALANCED 
`BinaryExpr` tree of
+   * depth `O(log n)` instead of the natural left-deep `O(n)`. A query with 
many ANDed/ORed
+   * predicates otherwise builds a proto nested deeper than protobuf's default 
recursion limit
+   * (100), which overflows when the serialized plan is re-parsed -- on the JVM
+   * (`OperatorOuterClass.Operator.parseFrom`, e.g. `findShuffleScanIndices` / 
explain) and in the
+   * Rust prost decoder. Comet evaluates `And`/`Or` vectorially (both sides 
always evaluated, no
+   * row-level short-circuit), so rebalancing the associative chain is 
semantically identical --
+   * it only changes the proto's shape.
+   *
+   * `operands` are the flattened leaves of the chain (see 
[[flattenAssociative]]); `wrap` tags
+   * each combined `BinaryExpr` as `And` or `Or`.
+   */
+  def createBalancedBinaryExpr(
+      expr: Expression,
+      operands: Seq[Expression],
+      inputs: Seq[Attribute],
+      binding: Boolean,
+      wrap: (
+          ExprOuterClass.Expr.Builder,
+          ExprOuterClass.BinaryExpr) => ExprOuterClass.Expr.Builder)
+      : Option[ExprOuterClass.Expr] = {
+    val protos = operands.map(exprToProtoInternal(_, inputs, binding))
+    if (protos.exists(_.isEmpty)) {
+      withFallbackReason(expr, operands: _*)
+      None
+    } else {
+      val leaves = protos.map(_.get).toIndexedSeq
+      def build(slice: IndexedSeq[ExprOuterClass.Expr]): ExprOuterClass.Expr = 
{
+        if (slice.length == 1) slice.head
+        else {
+          val mid = slice.length / 2
+          val inner = ExprOuterClass.BinaryExpr
+            .newBuilder()
+            .setLeft(build(slice.slice(0, mid)))
+            .setRight(build(slice.slice(mid, slice.length)))
+            .build()
+          wrap(ExprOuterClass.Expr.newBuilder(), inner).build()
+        }
+      }
+      Some(build(leaves))
+    }
+  }
+
+  /**
+   * Flatten an associative binary chain into its leaf operands. `matches` 
identifies the same
+   * operator (e.g. `case _: And => true`) and `children` extracts its two 
operands. Used to
+   * rebalance deep `And`/`Or` chains before serialization (see 
[[createBalancedBinaryExpr]]).
+   */
+  def flattenAssociative(

Review Comment:
   Good call. Rewrote it with an explicit work stack and an accumulating buffer 
instead of recursion, so it's O(n) time and no longer recurses O(n) deep (which 
on these left-deep chains could itself overflow the stack). Left-to-right 
operand order is preserved.



##########
spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala:
##########
@@ -3096,4 +3096,25 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
     }
   }
 
+  test("deep AND/OR predicate chains do not overflow the protobuf recursion 
limit") {

Review Comment:
   Added both: the chains now mix in a nullable column so the rebalanced tree 
is exercised under three-valued logic, and there's a deep `OR` in a `WHERE` 
clause, which, unlike a top-level `AND`, Spark doesn't split, so it stays 
deeply nested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to