This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 4991f2d6e90 branch-4.1: [fix](fe) Fix remote Flight SQL result receiver initialization #63136 (#63155)
4991f2d6e90 is described below

commit 4991f2d6e90e9f45e0b0423bad84dbf1aefc01cb
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue May 12 22:13:48 2026 +0800

    branch-4.1: [fix](fe) Fix remote Flight SQL result receiver initialization #63136 (#63155)
    
    Cherry-picked from #63136
    
    Co-authored-by: Pxl <[email protected]>
    Co-authored-by: Copilot <[email protected]>
---
 .../main/java/org/apache/doris/qe/ResultReceiverConsumer.java  |  7 +++++--
 .../main/java/org/apache/doris/qe/runtime/QueryProcessor.java  |  4 ++++
 .../java/org/apache/doris/qe/ResultReceiverConsumerTest.java   | 10 ++++++++++
 3 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java
index 88403083c54..54fe0dff977 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java
@@ -82,15 +82,18 @@ public class ResultReceiverConsumer {
             ReceiverContext context = new 
ReceiverContext(resultReceivers.get(i), i);
             contexts.add(context);
         }
-        this.readyOffsets = new ArrayBlockingQueue<>(resultReceivers.size());
+        this.readyOffsets = new ArrayBlockingQueue<>(Math.max(1, 
resultReceivers.size()));
         timeoutTs = timeoutDeadline;
     }
 
     public boolean isEos() {
-        return finishedReceivers == contexts.size();
+        return !contexts.isEmpty() && finishedReceivers == contexts.size();
     }
 
     public RowBatch getNext(Status status) throws TException, 
InterruptedException, ExecutionException, UserException {
+        if (contexts.isEmpty()) {
+            throw new UserException("There is no receiver.");
+        }
         if (!futureInitialized) {
             futureInitialized = true;
             for (ReceiverContext context : contexts) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
index 4ab7f041f59..88ceb432c2e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
@@ -86,10 +86,14 @@ public class QueryProcessor extends AbstractJobProcessor {
         }
 
         boolean regenerateInstanceId = 
coordinatorContext.connectContext.consumeNeedRegenerateQueryId();
+        boolean returnResultFromLocal = 
coordinatorContext.connectContext.isReturnResultFromLocal();
         for (AssignedJob topInstance : distinctWorkerJobs.values()) {
             if (regenerateInstanceId) {
                 
topInstance.resetInstanceId(coordinatorContext.connectContext.nextInstanceId());
             }
+            if (!returnResultFromLocal) {
+                continue;
+            }
             DistributedPlanWorker topWorker = topInstance.getAssignedWorker();
             TNetworkAddress execBeAddr = new TNetworkAddress(topWorker.host(), 
topWorker.brpcPort());
             receivers.add(
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java
index e67069209a0..3f76d4227b0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java
@@ -40,6 +40,16 @@ public class ResultReceiverConsumerTest {
     @Injectable
     private ResultReceiver receiver3;
 
+    @Test
+    public void testEmptyReceiversForRemoteResult() throws Exception {
+        ResultReceiverConsumer consumer = new 
ResultReceiverConsumer(Lists.newArrayList(),
+                System.currentTimeMillis() + 3600);
+        Status status = new Status();
+
+        Assert.assertFalse(consumer.isEos());
+        Assertions.assertThrows(UserException.class, () -> 
consumer.getNext(status));
+    }
+
     @Test
     public void testEosHandling() throws Exception {
         ResultReceiverConsumer consumer = new ResultReceiverConsumer(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to