andygrove commented on code in PR #4531:
URL: https://github.com/apache/datafusion-comet/pull/4531#discussion_r3329324485
##########
spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala:
##########
@@ -832,6 +832,67 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
}
}
+ /**
+ * Serialize an associative boolean chain (`And` / `Or`) as a BALANCED `BinaryExpr` tree of
+ * depth `O(log n)` instead of the natural left-deep `O(n)`. A query with many ANDed/ORed
+ * predicates otherwise builds a proto nested deeper than protobuf's default recursion limit
+ * (100), which overflows when the serialized plan is re-parsed -- on the JVM
+ * (`OperatorOuterClass.Operator.parseFrom`, e.g. `findShuffleScanIndices` / explain) and in the
+ * Rust prost decoder. Comet evaluates `And`/`Or` vectorially (both sides always evaluated, no
+ * row-level short-circuit), so rebalancing the associative chain is semantically identical --
+ * it only changes the proto's shape.
+ *
+ * `operands` are the flattened leaves of the chain (see [[flattenAssociative]]); `wrap` tags
+ * each combined `BinaryExpr` as `And` or `Or`.
+ */
+ def createBalancedBinaryExpr(
+ expr: Expression,
+ operands: Seq[Expression],
+ inputs: Seq[Attribute],
+ binding: Boolean,
+ wrap: (
+ ExprOuterClass.Expr.Builder,
+ ExprOuterClass.BinaryExpr) => ExprOuterClass.Expr.Builder)
+ : Option[ExprOuterClass.Expr] = {
+ val protos = operands.map(exprToProtoInternal(_, inputs, binding))
+ if (protos.exists(_.isEmpty)) {
+ withFallbackReason(expr, operands: _*)
+ None
+ } else {
+ val leaves = protos.map(_.get).toIndexedSeq
+ def build(slice: IndexedSeq[ExprOuterClass.Expr]): ExprOuterClass.Expr = {
+ if (slice.length == 1) slice.head
+ else {
+ val mid = slice.length / 2
+ val inner = ExprOuterClass.BinaryExpr
+ .newBuilder()
+ .setLeft(build(slice.slice(0, mid)))
+ .setRight(build(slice.slice(mid, slice.length)))
+ .build()
+ wrap(ExprOuterClass.Expr.newBuilder(), inner).build()
+ }
+ }
+ Some(build(leaves))
+ }
+ }
+
+ /**
+ * Flatten an associative binary chain into its leaf operands. `matches` identifies the same
+ * operator (e.g. `case _: And => true`) and `children` extracts its two operands. Used to
+ * rebalance deep `And`/`Or` chains before serialization (see [[createBalancedBinaryExpr]]).
+ */
+ def flattenAssociative(
Review Comment:
Small thought: this recurses O(n) deep on a left-deep chain, and the `++`
accumulation is O(n^2) in the worst case. That is totally fine for the depths
that hit this bug, but since the motivation is deep chains, would an explicit
accumulator be worth it?
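
For illustration only, here is a minimal sketch of what an explicit-accumulator
version might look like. It is not the PR's code: `Expr`, `Leaf`, and `AndExpr`
are hypothetical stand-ins for the Catalyst classes, and the parameter types of
`matches` and `children` are assumed from the doc comment rather than taken
from the real signature. It walks the chain with a heap-allocated work list and
appends leaves to a buffer, so it neither recurses nor concatenates
intermediate sequences.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for Spark's Expression / And (assumptions, not Catalyst classes).
sealed trait Expr
final case class Leaf(name: String) extends Expr
final case class AndExpr(left: Expr, right: Expr) extends Expr

object FlattenSketch {

  /**
   * Flatten an associative binary chain into its leaves, left to right.
   * `matches` says whether a node is the chained operator; `children` returns
   * its two operands. Both parameter shapes are assumed for this sketch.
   */
  def flattenAssociative[E](
      root: E,
      matches: E => Boolean,
      children: E => (E, E)): IndexedSeq[E] = {
    val out = ArrayBuffer.empty[E]   // accumulator: amortized O(1) append per leaf
    var pending: List[E] = root :: Nil // explicit work list instead of the call stack
    while (pending.nonEmpty) {
      val node = pending.head
      pending = pending.tail
      if (matches(node)) {
        val (l, r) = children(node)
        // Put the left operand in front so leaves are emitted in source order.
        pending = l :: r :: pending
      } else {
        out += node
      }
    }
    out.toIndexedSeq
  }
}
```

Each node is visited once, so the cost is linear in the size of the chain, and
the only depth-dependent state is the `pending` list on the heap. Leaf order is
preserved, so the output could be passed to `createBalancedBinaryExpr` as
`operands` unchanged, assuming that is how the two functions are wired together.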
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]