This is an automated email from the ASF dual-hosted git repository.

BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7680c6dc8ab [fix](fe) Fix remote Flight SQL result receiver initialization (#63136)
7680c6dc8ab is described below

commit 7680c6dc8ab729416aac9c8b7d49d171c08fb434
Author: Pxl <[email protected]>
AuthorDate: Tue May 12 10:27:28 2026 +0800

    [fix](fe) Fix remote Flight SQL result receiver initialization (#63136)
    
    Problem Summary: Arrow Flight SQL queries that return results from BE do
    not create local result receivers. The old coordinator still initialized
    ResultReceiverConsumer with an empty receiver list, which failed because
    ArrayBlockingQueue rejects a capacity of 0 (new ArrayBlockingQueue<>(0)
    throws IllegalArgumentException), and the new coordinator still created
    unnecessary local receivers for remote result mode.
    
    ### Release note
    
    Fix Arrow Flight SQL query execution when results are returned from BE.
    
    ### Check List (For Author)
    
    - Test: Unit Test
        - Ran org.apache.doris.qe.ResultReceiverConsumerTest
        - Ran FE checkstyle for fe-core
    - Behavior changed: No
    - Does this need documentation: No
    
    Co-authored-by: Copilot <[email protected]>
---
 .../main/java/org/apache/doris/qe/ResultReceiverConsumer.java  |  7 +++++--
 .../main/java/org/apache/doris/qe/runtime/QueryProcessor.java  |  4 ++++
 .../java/org/apache/doris/qe/ResultReceiverConsumerTest.java   | 10 ++++++++++
 3 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java
index 88403083c54..54fe0dff977 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java
@@ -82,15 +82,18 @@ public class ResultReceiverConsumer {
             ReceiverContext context = new ReceiverContext(resultReceivers.get(i), i);
             contexts.add(context);
         }
-        this.readyOffsets = new ArrayBlockingQueue<>(resultReceivers.size());
+        this.readyOffsets = new ArrayBlockingQueue<>(Math.max(1, resultReceivers.size()));
         timeoutTs = timeoutDeadline;
     }
 
     public boolean isEos() {
-        return finishedReceivers == contexts.size();
+        return !contexts.isEmpty() && finishedReceivers == contexts.size();
     }
 
     public RowBatch getNext(Status status) throws TException, InterruptedException, ExecutionException, UserException {
+        if (contexts.isEmpty()) {
+            throw new UserException("There is no receiver.");
+        }
         if (!futureInitialized) {
             futureInitialized = true;
             for (ReceiverContext context : contexts) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
index 4ab7f041f59..88ceb432c2e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
@@ -86,10 +86,14 @@ public class QueryProcessor extends AbstractJobProcessor {
         }
 
         boolean regenerateInstanceId = coordinatorContext.connectContext.consumeNeedRegenerateQueryId();
+        boolean returnResultFromLocal = coordinatorContext.connectContext.isReturnResultFromLocal();
         for (AssignedJob topInstance : distinctWorkerJobs.values()) {
             if (regenerateInstanceId) {
                 topInstance.resetInstanceId(coordinatorContext.connectContext.nextInstanceId());
             }
+            if (!returnResultFromLocal) {
+                continue;
+            }
             DistributedPlanWorker topWorker = topInstance.getAssignedWorker();
             TNetworkAddress execBeAddr = new TNetworkAddress(topWorker.host(), topWorker.brpcPort());
             receivers.add(
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java
index 3a5a9e3291a..b85830da646 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java
@@ -36,6 +36,16 @@ public class ResultReceiverConsumerTest {
     private ResultReceiver receiver2 = Mockito.mock(ResultReceiver.class);
     private ResultReceiver receiver3 = Mockito.mock(ResultReceiver.class);
 
+    @Test
+    public void testEmptyReceiversForRemoteResult() throws Exception {
+        ResultReceiverConsumer consumer = new ResultReceiverConsumer(Lists.newArrayList(),
+                System.currentTimeMillis() + 3600);
+        Status status = new Status();
+
+        Assert.assertFalse(consumer.isEos());
+        Assertions.assertThrows(UserException.class, () -> consumer.getNext(status));
+    }
+
     @Test
     public void testEosHandling() throws Exception {
         ResultReceiverConsumer consumer = new ResultReceiverConsumer(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to