jinchengchenghh commented on code in PR #11197:
URL: https://github.com/apache/incubator-gluten/pull/11197#discussion_r2568122807


##########
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##########
@@ -28,6 +29,76 @@ case class ColumnarCollectLimitExec(
     offset: Int = 0
 ) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
 
+  /**
+   * Override doExecuteColumnar to handle zero-column schema specially. Velox doesn't support

Review Comment:
   This operator calls shuffle directly, but other operators may also produce an empty schema and then shuffle. The following is an AI-generated answer:
   <html><head></head><body><p>Spark can generate <strong>shuffle with an empty 
schema</strong> in <em>only a few very specific logical/physical cases</em>.<br>
   These are <strong>legal in Catalyst</strong>, but many external engines 
(Gluten/Velox) do not expect them—leading to failures like:</p>
   <pre><code>MatchError: KeyGroupedPartitioning(...)
   </code></pre>
   <p>Below is the complete list of <strong>when Spark produces a shuffle whose 
output schema has 0 columns</strong>.</p>
   <hr>
   <h1>✅ <strong>1. <code inline="">Aggregate</code> with no grouping keys and 
no aggregate expressions</strong></h1>
   <p>Catalyst allows:</p>
   <pre><code>Aggregate(
     groupingExpressions = Nil,
     aggregateExpressions = Nil,
     child = ...
   )
   </code></pre>
   <p>This produces <strong>one row, zero columns</strong>.</p>
   <p>If Spark decides to shuffle before this aggregate (e.g., to satisfy 
distribution requirements), you get:</p>
   <pre><code>Exchange(RoundRobinPartitioning, childSchema = [])
   </code></pre>
   <p><strong>Common cause:</strong><br>
   Queries rewritten by optimizer such as:</p>
   <ul>
   <li>
   <p><code inline="">SELECT EXISTS(...)</code></p>
   </li>
   <li>
   <p><code inline="">SELECT (subquery)</code> where only a boolean is 
needed</p>
   </li>
   <li>
   <p><code inline="">SELECT count(*) &gt; 0</code> simplified to an empty 
aggregate plan</p>
   </li>
   </ul>
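   <p>The empty-aggregate shape can be mimicked with plain Scala stand-ins (hypothetical toy classes, not the real Catalyst nodes) to show why the operator's output schema ends up with zero columns:</p>

```scala
// Toy stand-ins for Catalyst's logical nodes; the real Aggregate lives in
// org.apache.spark.sql.catalyst.plans.logical and is considerably richer.
case class Attribute(name: String)

case class Aggregate(
    groupingExpressions: Seq[Attribute],
    aggregateExpressions: Seq[Attribute],
    childOutput: Seq[Attribute]) {
  // An Aggregate's output is exactly its aggregate expressions, so
  // aggregateExpressions = Nil means a zero-column schema.
  def output: Seq[Attribute] = aggregateExpressions
}

// Aggregate(Nil, Nil, child): one row, zero columns.
val emptyAgg = Aggregate(Nil, Nil, Seq(Attribute("id")))
println(emptyAgg.output.isEmpty) // true
```

   <p>Any exchange inserted above such a node inherits that empty schema, which is the shape the engine has to tolerate.</p>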
   <hr>
   <h1>✅ <strong>2. Subquery rewrite → empty projection → shuffle</strong></h1>
   <p>Queries that check existence may be rewritten like:</p>
   <pre><code>Project(Nil, Filter(...))
   </code></pre>
   <p>If this subquery feeds a join or outer join, distribution requirements 
force Spark to insert a shuffle:</p>
   <pre><code>Exchange(..., schema = [])
   </code></pre>
   <hr>
   <h1>✅ <strong>3. Distinct or grouping cases that optimize to empty 
output</strong></h1>
   <p>Example:</p>
   <pre><code class="language-sql">SELECT COUNT(*) FROM t DISTRIBUTE BY 1;
   </code></pre>
   <p>Catalyst may collapse the expressions and produce a shuffle that does not 
require any output attributes.</p>
   <p>This is rare but happens in:</p>
   <ul>
   <li>
   <p><code inline="">GROUP BY ()</code> with no aggregates</p>
   </li>
   <li>
   <p><code inline="">DISTINCT</code> on constants</p>
   </li>
   </ul>
   <hr>
   <h1>✅ <strong>4. SQL commands using empty-schema leaf nodes 
(OneRowRelation)</strong></h1>
   <p>Some Spark plans internally use:</p>
   <pre><code>OneRowRelation(output = Nil)
   </code></pre>
   <p>If distribution requirements apply (e.g., CTE reuse, subquery reuse, 
adaptive execution), it can pass through a shuffle.</p>
   <p>Even though the result is a single row, Spark’s planner may still wrap it 
in a shuffle stage.</p>
   <hr>
   <h1>⚠️ <strong>Why Gluten/Velox hits MatchError</strong></h1>
   <p>Gluten's partitioning code assumes something like:</p>
   <pre><code>Partitioning has at least 1 column
   </code></pre>
   <p>But Spark allows:</p>
   <pre><code>KeyGroupedPartitioning([], numPartitions)
   </code></pre>
   <p>Velox currently does not support this case properly.</p>
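   <p>One way to defend against this, sketched here with simplified stand-in types rather than Gluten's actual classes, is to offload the shuffle only when the partitioning carries at least one expression and otherwise fall back to vanilla Spark:</p>

```scala
// Simplified stand-ins for Catalyst's partitioning nodes; the real ones
// live in org.apache.spark.sql.catalyst.plans.physical.
sealed trait Partitioning
case object SinglePartition extends Partitioning
case class HashPartitioning(expressions: Seq[String], numPartitions: Int) extends Partitioning
case class KeyGroupedPartitioning(expressions: Seq[String], numPartitions: Int) extends Partitioning

// Hypothetical guard: offload only when there is at least one partition
// expression; an empty KeyGroupedPartitioning([], n) falls back to Spark
// instead of hitting a MatchError inside the native shuffle path.
def offloadableToVelox(p: Partitioning): Boolean = p match {
  case SinglePartition                  => true
  case HashPartitioning(exprs, _)       => exprs.nonEmpty
  case KeyGroupedPartitioning(exprs, _) => exprs.nonEmpty
}

println(offloadableToVelox(KeyGroupedPartitioning(Nil, 4))) // false
```

   <p>A sealed trait keeps the match exhaustive, so adding a new partitioning case forces the guard to be revisited rather than failing at runtime.</p>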
   <hr>
   <h1>🎯 How to reproduce a shuffle with <strong>empty schema</strong></h1>
   <p>Here are minimal reproducible examples.</p>
   <h3><strong>Example 1: empty aggregate</strong></h3>
   <pre><code class="language-scala">spark.range(10).select().distinct().explain()
   </code></pre>
   <p><code inline="">select()</code> with no arguments yields a zero-column DataFrame, and <code inline="">distinct()</code> on it becomes an empty aggregate. The physical plan may include:</p>
   <pre><code>HashAggregate(keys=[], functions=[])
   Exchange (schema = [])
   </code></pre>
   <h3><strong>Example 2: EXISTS subquery</strong></h3>
   <pre><code class="language-sql">SELECT EXISTS (SELECT 1 FROM t);
   </code></pre>
   <p>Catalyst may generate a shuffle inside the subquery with empty output 
attributes.</p>
   <h3><strong>Example 3: empty aggregate via the DataFrame API</strong></h3>
   <pre><code class="language-scala">spark.range(10).groupBy().agg(Map.empty[String, String]).explain(true)
   </code></pre>
   <p>Passing an empty <code inline="">Map</code> produces a global aggregate with no aggregate expressions. Plan:</p>
   <pre><code>HashAggregate(keys=[], functions=[])
     Exchange
       ...

   Output schema: []
   </code></pre>
   <hr>
   <h1>✔️ Summary: When Spark generates shuffle with empty schema</h1>
   
   <table>
   <thead><tr><th>Case</th><th>Why empty schema</th><th>Generates shuffle?</th></tr></thead>
   <tbody>
   <tr><td>Aggregate with no grouping &amp; no agg expr</td><td>Output has 0 columns</td><td>Yes</td></tr>
   <tr><td>EXISTS / scalar subquery rewrite</td><td>Project(Nil)</td><td>Yes</td></tr>
   <tr><td>Empty projection from optimization</td><td>No output attrs</td><td>Yes</td></tr>
   <tr><td>OneRowRelation propagation</td><td>Internal command plans</td><td>Yes</td></tr>
   </tbody>
   </table>
   
   <hr>
   <p>If you want, I can analyze your Spark plan from the failing test and show 
<strong>exactly which operator</strong> produced the empty schema that caused 
the KeyGroupedPartitioning crash.</p></body></html>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
