jinchengchenghh commented on code in PR #11197:
URL:
https://github.com/apache/incubator-gluten/pull/11197#discussion_r2568122807
##########
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##########
@@ -28,6 +29,76 @@ case class ColumnarCollectLimitExec(
offset: Int = 0
) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+ /**
+ * Override doExecuteColumnar to handle zero-column schema specially. Velox
doesn't support
Review Comment:
This operator calls shuffle directly, but other operators may also produce an
empty schema and then feed it into a shuffle. Below is an AI-generated answer:
<html><head></head><body><p>Spark can generate <strong>shuffle with an empty
schema</strong> in <em>only a few very specific logical/physical cases</em>.<br>
These are <strong>legal in Catalyst</strong>, but many external engines
(Gluten/Velox) do not expect them—leading to failures like:</p>
<pre><code>MatchError: KeyGroupedPartitioning(...)
</code></pre>
<p>Below is the complete list of <strong>when Spark produces a shuffle whose
output schema has 0 columns</strong>.</p>
<hr>
<h1>✅ <strong>1. <code inline="">Aggregate</code> with no grouping keys and
no aggregate expressions</strong></h1>
<p>Catalyst allows:</p>
<pre><code>Aggregate(
groupingExpressions = Nil,
aggregateExpressions = Nil,
child = ...
)
</code></pre>
<p>This produces <strong>one row, zero columns</strong>.</p>
<p>If Spark decides to shuffle before this aggregate (e.g., to satisfy
distribution requirements), you get:</p>
<pre><code>Exchange(RoundRobinPartitioning, childSchema = [])
</code></pre>
<p><strong>Common cause:</strong><br>
Queries rewritten by optimizer such as:</p>
<ul>
<li>
<p><code inline="">SELECT EXISTS(...)</code></p>
</li>
<li>
<p><code inline="">SELECT (subquery)</code> where only a boolean is
needed</p>
</li>
<li>
<p><code inline="">SELECT count(*) > 0</code> simplified to an empty
aggregate plan</p>
</li>
</ul>
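To make the one-row, zero-column semantics concrete, here is a toy Scala model (simplified stand-ins, not Spark's actual Catalyst classes) of a global aggregate with no grouping keys and no aggregate expressions:

```scala
// Toy model: why Aggregate(groupingExpressions = Nil, aggregateExpressions = Nil)
// yields exactly one row with zero columns. These are hypothetical helpers,
// not Spark internals.
object EmptyAggregateSketch {
  // A row is just a sequence of column values; a zero-column row is Seq.empty.
  type Row = Seq[Any]

  // With no grouping keys, every input row falls into a single global group;
  // with no aggregate expressions, that group emits no columns. The result is
  // therefore always exactly one empty row, even when the input is empty.
  def emptyAggregate(input: Seq[Row]): Seq[Row] = Seq(Seq.empty)
}
```

Note that the output does not depend on the input at all, which is why the optimizer can reach this shape from several different query rewrites.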
<hr>
<h1>✅ <strong>2. Subquery rewrite → empty projection → shuffle</strong></h1>
<p>Queries that check existence may be rewritten like:</p>
<pre><code>Project(Nil, Filter(...))
</code></pre>
<p>If this subquery feeds a join or outer join, distribution requirements
force Spark to insert a shuffle:</p>
<pre><code>Exchange(..., schema = [])
</code></pre>
<hr>
<h1>✅ <strong>3. Distinct or grouping cases that optimize to empty
output</strong></h1>
<p>Example:</p>
<pre><code class="language-sql">SELECT COUNT(*) FROM t DISTRIBUTE BY 1;
</code></pre>
<p>Catalyst may collapse the expressions and produce a shuffle that does not
require any output attributes.</p>
<p>This is rare but happens in:</p>
<ul>
<li>
<p><code inline="">GROUP BY ()</code> with no aggregates</p>
</li>
<li>
<p><code inline="">DISTINCT</code> on constants</p>
</li>
</ul>
<hr>
<h1>✅ <strong>4. SQL commands using empty-schema leaf nodes
(OneRowRelation)</strong></h1>
<p>Some Spark plans internally use:</p>
<pre><code>OneRowRelation(output = Nil)
</code></pre>
<p>If distribution requirements apply (e.g., CTE reuse, subquery reuse,
adaptive execution), it can pass through a shuffle.</p>
<p>Even though the result is a single row, Spark’s planner may still wrap it
in a shuffle stage.</p>
<hr>
<h1>⚠️ <strong>Why Gluten/Velox hits MatchError</strong></h1>
<p>Gluten's partitioning code assumes something like:</p>
<pre><code>Partitioning has at least 1 column
</code></pre>
<p>But Spark allows:</p>
<pre><code>KeyGroupedPartitioning([], numPartitions)
</code></pre>
<p>Velox currently does not support this case properly.</p>
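A defensive fix is to make the partitioning match total instead of assuming at least one key. The following is a hypothetical sketch with simplified stand-in case classes (not Gluten's or Spark's real types), showing the fallback shape:

```scala
// Hypothetical stand-ins for Catalyst partitioning nodes (not the real classes).
sealed trait Partitioning
case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning
case class KeyGroupedPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning
case object SinglePartition extends Partitioning

object PartitioningSketch {
  // A match that only handles non-empty key lists throws MatchError on
  // KeyGroupedPartitioning(Nil, _). Adding a catch-all lets zero-column
  // schemas degrade to a single partition instead of crashing.
  def toShuffleSpec(p: Partitioning): String = p match {
    case HashPartitioning(keys, n) if keys.nonEmpty       => s"hash($n)"
    case KeyGroupedPartitioning(keys, n) if keys.nonEmpty => s"keyGrouped($n)"
    case _                                                => "single"
  }
}
```

Falling back to a single partition is semantically safe here because with zero columns there is nothing to co-locate by; only row counts matter.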
<hr>
<h1>🎯 How to reproduce a shuffle with <strong>empty schema</strong></h1>
<p>Here are minimal reproducible examples.</p>
<h3><strong>Example 1: no-grouping aggregate</strong></h3>
<pre><code class="language-sql">SELECT count(*) FROM t;
</code></pre>
<p>Physical plan includes:</p>
<pre><code>HashAggregate(keys=[], functions=[count(1)])
  Exchange SinglePartition
</code></pre>
<p>If column pruning later removes the aggregate's output (for example inside
an EXISTS rewrite), the exchange is left with an empty schema.</p>
<h3><strong>Example 2: EXISTS subquery</strong></h3>
<pre><code class="language-sql">SELECT EXISTS (SELECT 1 FROM t);
</code></pre>
<p>Catalyst may generate a shuffle inside the subquery with empty output
attributes.</p>
<h3><strong>Example 3: COUNT(*) without selecting output</strong></h3>
<pre><code class="language-scala">spark.range(10).groupBy().count().select().explain(true)
</code></pre>
<p>Plan:</p>
<pre><code>HashAggregate(keys=[], functions=[])
Exchange
...
Output schema: []
</code></pre>
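Whichever rewrite produces it, a zero-column batch is fully described by its row count, so a collect-limit over such batches reduces to arithmetic on counts with no columnar engine involved. A hypothetical sketch (not the PR's actual implementation) of that reduction:

```scala
// Hypothetical helper: apply offset then limit to a stream of zero-column
// batches, each represented only by its row count.
object ZeroColumnLimitSketch {
  // Returns how many (empty) rows survive after skipping `offset` rows and
  // keeping at most `limit` of the remainder.
  def collectLimit(batchRowCounts: Seq[Long], limit: Long, offset: Long): Long = {
    val total = batchRowCounts.sum
    math.min(math.max(total - offset, 0L), limit)
  }
}
```

This is the kind of special case a backend can handle without touching Velox, which is presumably why the PR overrides doExecuteColumnar for zero-column schemas.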
<hr>
<h1>✔️ Summary: When Spark generates shuffle with empty schema</h1>
<table>
<tr><th>Case</th><th>Why empty schema</th><th>Generates shuffle?</th></tr>
<tr><td>Aggregate with no grouping &amp; no agg expr</td><td>Output has 0 columns</td><td>Yes</td></tr>
<tr><td>EXISTS / scalar subquery rewrite</td><td>Project(Nil)</td><td>Yes</td></tr>
<tr><td>Empty projection from optimization</td><td>No output attrs</td><td>Yes</td></tr>
<tr><td>OneRowRelation propagation</td><td>Internal command plans</td><td>Yes</td></tr>
</table>
<hr>
<p>If you want, I can analyze your Spark plan from the failing test and show
<strong>exactly which operator</strong> produced the empty schema that caused
the KeyGroupedPartitioning crash.</p></body></html>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]