This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e3d62f9632c [Feature](parallel-result-sink) support async fetch from 
multiple backends concurrently (#47915)
e3d62f9632c is described below

commit e3d62f9632c9b75c3d8cb7f331da3ef5c3964a04
Author: Pxl <[email protected]>
AuthorDate: Wed Mar 19 17:48:55 2025 +0800

    [Feature](parallel-result-sink) support async fetch from multiple backends 
concurrently (#47915)
    
    ### What problem does this PR solve?
    support async fetch from multiple backends concurrently
    If we fetch data one by one with the backend in order, the logical
    deadlock may be triggered due to memory control, and the load on the
    backend is also unbalanced.
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [x] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [x] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [x] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [x] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 .../main/java/org/apache/doris/qe/Coordinator.java |  24 +--
 .../java/org/apache/doris/qe/ResultReceiver.java   |  29 ++--
 .../apache/doris/qe/ResultReceiverConsumer.java    | 129 ++++++++++++++++
 .../apache/doris/qe/runtime/QueryProcessor.java    |  46 ++----
 .../org/apache/doris/rpc/BackendServiceClient.java |   3 +-
 .../org/apache/doris/rpc/BackendServiceProxy.java  |  19 +++
 .../doris/qe/ResultReceiverConsumerTest.java       | 172 +++++++++++++++++++++
 7 files changed, 361 insertions(+), 61 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 110385ed382..6881b7ec657 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -225,6 +225,7 @@ public class Coordinator implements CoordInterface {
     private final Map<Pair<Integer, Long>, PipelineExecContext> 
pipelineExecContexts = new HashMap<>();
     private final List<PipelineExecContext> needCheckPipelineExecContexts = 
Lists.newArrayList();
     private List<ResultReceiver> receivers = Lists.newArrayList();
+    private ResultReceiverConsumer receiverConsumer;
     private final List<ScanNode> scanNodes;
     private int scanRangeNum = 0;
     // number of instances of this query, equals to
@@ -277,8 +278,6 @@ public class Coordinator implements CoordInterface {
 
     private StatsErrorEstimator statsErrorEstimator;
 
-    private int receiverOffset = 0;
-
     // A countdown latch to mark the completion of each instance.
     // use for old pipeline
     // instance id -> dummy value
@@ -749,6 +748,7 @@ public class Coordinator implements CoordInterface {
                             toArrowFlightHost(param.host), 
toBrpcHost(param.host), fragments.get(0).getOutputExprs()));
                 }
             }
+            receiverConsumer = new ResultReceiverConsumer(receivers, 
timeoutDeadline);
 
             LOG.info("dispatch result sink of query {} to {}", 
DebugUtil.printId(queryId),
                     topParams.instanceExecParams.get(0).host);
@@ -1155,10 +1155,8 @@ public class Coordinator implements CoordInterface {
             throw new UserException("There is no receiver.");
         }
 
-        RowBatch resultBatch;
         Status status = new Status();
-        ResultReceiver receiver = receivers.get(receiverOffset);
-        resultBatch = receiver.getNext(status);
+        RowBatch resultBatch = receiverConsumer.getNext(status);
         if (!status.ok()) {
             LOG.warn("Query {} coordinator get next fail, {}, need cancel.",
                     DebugUtil.printId(queryId), status.getErrorMsg());
@@ -1202,20 +1200,8 @@ public class Coordinator implements CoordInterface {
         boolean reachedLimit = LimitUtils.cancelIfReachLimit(
                 resultBatch, limitRows, numReceivedRows, this::cancelInternal);
 
-        if (resultBatch.isEos()) {
-            receivers.remove(receiver);
-            if (receivers.isEmpty()) {
-                returnedAllResults = true;
-            } else if (!reachedLimit) {
-                // if reachedLimit is true, which means this query has been 
cancelled.
-                // so no need to set eos to false again.
-                resultBatch.setEos(false);
-            }
-        }
-
-        if (!returnedAllResults) {
-            receiverOffset += 1;
-            receiverOffset %= receivers.size();
+        if (reachedLimit) {
+            resultBatch.setEos(true);
         }
         return resultBatch;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index f1304826e39..5efc288c159 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -29,6 +29,8 @@ import org.apache.doris.thrift.TResultBatch;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
 
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TDeserializer;
@@ -76,6 +78,22 @@ public class ResultReceiver {
         return finstId;
     }
 
+    public void createFuture(
+            FutureCallback<InternalService.PFetchDataResult> callback) throws 
RpcException {
+        InternalService.PFetchDataRequest request = 
InternalService.PFetchDataRequest.newBuilder()
+                .setFinstId(getRealFinstId())
+                .setRespInAttachment(false)
+                .build();
+        try {
+            fetchDataAsyncFuture = 
BackendServiceProxy.getInstance().fetchDataAsyncWithCallback(address, request,
+                    callback);
+        } catch (RpcException e) {
+            LOG.warn("fetch result rpc exception, finstId={}", 
DebugUtil.printId(finstId), e);
+            SimpleScheduler.addToBlacklist(backendId, e.getMessage());
+            throw e;
+        }
+    }
+
     public RowBatch getNext(Status status) throws TException {
         if (isDone) {
             return null;
@@ -83,13 +101,8 @@ public class ResultReceiver {
         final RowBatch rowBatch = new RowBatch();
         try {
             while (!isDone && runStatus.ok()) {
-                InternalService.PFetchDataRequest request = 
InternalService.PFetchDataRequest.newBuilder()
-                        .setFinstId(getRealFinstId())
-                        .setRespInAttachment(false)
-                        .build();
-
                 currentThread = Thread.currentThread();
-                fetchDataAsyncFuture = 
BackendServiceProxy.getInstance().fetchDataAsync(address, request);
+                Preconditions.checkNotNull(fetchDataAsyncFuture);
                 InternalService.PFetchDataResult pResult = null;
 
                 while (pResult == null) {
@@ -172,10 +185,6 @@ public class ResultReceiver {
                     return rowBatch;
                 }
             }
-        } catch (RpcException e) {
-            LOG.warn("fetch result rpc exception, finstId={}", 
DebugUtil.printId(finstId), e);
-            status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
-            SimpleScheduler.addToBlacklist(backendId, e.getMessage());
         } catch (ExecutionException e) {
             LOG.warn("fetch result execution exception, finstId={}", 
DebugUtil.printId(finstId), e);
             if (e.getMessage().contains("time out")) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java
new file mode 100644
index 00000000000..e05417b0f5b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.Status;
+import org.apache.doris.common.UserException;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.rpc.RpcException;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import org.apache.thrift.TException;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+
+public class ResultReceiverConsumer {
+    class ReceiverContext {
+        public ReceiverContext(ResultReceiver receiver, int offset) {
+            this.receiver = receiver;
+            this.offset = offset;
+        }
+
+        public void createFuture() {
+            if (errMsg != null) {
+                return;
+            }
+            try {
+                receiver.createFuture(new 
FutureCallback<InternalService.PFetchDataResult>() {
+                    @Override
+                    public void onSuccess(InternalService.PFetchDataResult 
result) {
+                        readyOffsets.offer(offset);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        readyOffsets.offer(offset);
+                    }
+                });
+            } catch (RpcException e) {
+                setErrMsg(e.getMessage());
+                readyOffsets.offer(offset);
+            }
+        }
+
+        ResultReceiver receiver;
+        final int offset;
+    }
+
+    private List<ReceiverContext> contexts = Lists.newArrayList();
+    private boolean futureInitialized = false;
+    private String errMsg;
+    private final long timeoutTs;
+
+    void setErrMsg(String errMsg) {
+        this.errMsg = errMsg;
+    }
+
+    BlockingQueue<Integer> readyOffsets;
+    int finishedReceivers = 0;
+
+    public ResultReceiverConsumer(List<ResultReceiver> resultReceivers, long 
timeoutDeadline) {
+        for (int i = 0; i < resultReceivers.size(); i++) {
+            ReceiverContext context = new 
ReceiverContext(resultReceivers.get(i), i);
+            contexts.add(context);
+        }
+        this.readyOffsets = new ArrayBlockingQueue<>(resultReceivers.size());
+        timeoutTs = timeoutDeadline;
+    }
+
+    public boolean isEos() {
+        return finishedReceivers == contexts.size();
+    }
+
+    public RowBatch getNext(Status status) throws TException, 
InterruptedException, ExecutionException, UserException {
+        if (!futureInitialized) {
+            futureInitialized = true;
+            for (ReceiverContext context : contexts) {
+                context.createFuture();
+            }
+        }
+
+        Integer offset = readyOffsets.poll(timeoutTs - 
System.currentTimeMillis(),
+                java.util.concurrent.TimeUnit.MILLISECONDS);
+        if (offset == null) {
+            throw new TException("query timeout");
+        }
+        if (errMsg != null) {
+            throw new UserException(errMsg);
+        }
+
+        ReceiverContext context = contexts.get(offset);
+        RowBatch rowBatch = context.receiver.getNext(status);
+        if (!status.ok() || rowBatch == null) {
+            return rowBatch;
+        }
+        if (rowBatch.isEos()) {
+            finishedReceivers++;
+            rowBatch.setEos(isEos());
+        } else {
+            context.createFuture();
+        }
+
+        return rowBatch;
+    }
+
+    public synchronized void cancel(Status reason) {
+        for (ReceiverContext context : contexts) {
+            context.receiver.cancel(reason);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
index 5ab31b7cf83..74b1173db6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
@@ -30,6 +30,7 @@ import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.CoordinatorContext;
 import org.apache.doris.qe.LimitUtils;
 import org.apache.doris.qe.ResultReceiver;
+import org.apache.doris.qe.ResultReceiverConsumer;
 import org.apache.doris.qe.RowBatch;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -44,8 +45,7 @@ import org.apache.thrift.TException;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 
 public class QueryProcessor extends AbstractJobProcessor {
     private static final Logger LOG = 
LogManager.getLogger(QueryProcessor.class);
@@ -54,15 +54,13 @@ public class QueryProcessor extends AbstractJobProcessor {
     private final long limitRows;
 
     // mutable field
-    private final List<ResultReceiver> runningReceivers;
-    private int receiverOffset;
+    private ResultReceiverConsumer receiverConsumer;
+
     private long numReceivedRows;
 
-    public QueryProcessor(CoordinatorContext coordinatorContext, 
List<ResultReceiver> runningReceivers) {
+    public QueryProcessor(CoordinatorContext coordinatorContext, 
ResultReceiverConsumer consumer) {
         super(coordinatorContext);
-        this.runningReceivers = new CopyOnWriteArrayList<>(
-                Objects.requireNonNull(runningReceivers, "runningReceivers can 
not be null")
-        );
+        receiverConsumer = consumer;
 
         this.limitRows = coordinatorContext.fragments.get(0)
                 .getPlanRoot()
@@ -101,7 +99,9 @@ public class QueryProcessor extends AbstractJobProcessor {
                     )
             );
         }
-        return new QueryProcessor(coordinatorContext, receivers);
+        ResultReceiverConsumer consumer = new ResultReceiverConsumer(receivers,
+                coordinatorContext.timeoutDeadline.get());
+        return new QueryProcessor(coordinatorContext, consumer);
     }
 
     @Override
@@ -110,13 +110,12 @@ public class QueryProcessor extends AbstractJobProcessor {
     }
 
     public boolean isEos() {
-        return runningReceivers.isEmpty();
+        return receiverConsumer.isEos();
     }
 
-    public RowBatch getNext() throws UserException, TException, RpcException {
-        ResultReceiver receiver = runningReceivers.get(receiverOffset);
+    public RowBatch getNext() throws UserException, InterruptedException, 
TException, RpcException, ExecutionException {
         Status status = new Status();
-        RowBatch resultBatch = receiver.getNext(status);
+        RowBatch resultBatch = receiverConsumer.getNext(status);
         if (!status.ok()) {
             LOG.warn("Query {} coordinator get next fail, {}, need cancel.",
                     DebugUtil.printId(coordinatorContext.queryId), 
status.getErrorMsg());
@@ -152,25 +151,14 @@ public class QueryProcessor extends AbstractJobProcessor {
         boolean reachedLimit = LimitUtils.cancelIfReachLimit(
                 resultBatch, limitRows, numReceivedRows, 
coordinatorContext::cancelSchedule);
 
-        if (resultBatch.isEos()) {
-            runningReceivers.remove(receiver);
-            // if reachedLimit is true, which means this query has been 
cancelled.
-            // so no need to set eos to false again.
-            if (!runningReceivers.isEmpty() && !reachedLimit) {
-                resultBatch.setEos(false);
-            }
-        }
-
-        if (!runningReceivers.isEmpty()) {
-            receiverOffset = (receiverOffset + 1) % runningReceivers.size();
+        if (reachedLimit) {
+            resultBatch.setEos(true);
         }
         return resultBatch;
     }
 
     public void cancel(Status cancelReason) {
-        for (ResultReceiver receiver : runningReceivers) {
-            receiver.cancel(cancelReason);
-        }
+        receiverConsumer.cancel(cancelReason);
 
         this.executionTask.ifPresent(sqlPipelineTask -> {
             for (MultiFragmentsPipelineTask fragmentsTask : 
sqlPipelineTask.getChildrenTasks().values()) {
@@ -179,10 +167,6 @@ public class QueryProcessor extends AbstractJobProcessor {
         });
     }
 
-    public int getReceiverOffset() {
-        return receiverOffset;
-    }
-
     public long getNumReceivedRows() {
         return numReceivedRows;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 54c5e68144c..ee1d9c46036 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -89,7 +89,8 @@ public class BackendServiceClient {
                 .cancelPlanFragment(request);
     }
 
-    public Future<InternalService.PFetchDataResult> 
fetchDataAsync(InternalService.PFetchDataRequest request) {
+    public ListenableFuture<InternalService.PFetchDataResult> fetchDataAsync(
+            InternalService.PFetchDataRequest request) {
         return stub.fetchData(request);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 053a7428b52..c774ba184e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -38,6 +38,8 @@ import org.apache.doris.thrift.TPipelineFragmentParamsList;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
 import org.apache.logging.log4j.LogManager;
@@ -295,6 +297,23 @@ public class BackendServiceProxy {
         }
     }
 
+    public Future<InternalService.PFetchDataResult> fetchDataAsyncWithCallback(
+            TNetworkAddress address, InternalService.PFetchDataRequest request,
+            FutureCallback<InternalService.PFetchDataResult> callback) throws 
RpcException {
+        try {
+            final BackendServiceClient client = getProxy(address);
+            ListenableFuture<InternalService.PFetchDataResult> future = 
client.fetchDataAsync(request);
+            Futures.addCallback(
+                    future, callback,
+                    grpcThreadPool);
+            return future;
+        } catch (Throwable e) {
+            LOG.warn("fetch data catch a exception, address={}:{}",
+                    address.getHostname(), address.getPort(), e);
+            throw new RpcException(address.hostname, e.getMessage());
+        }
+    }
+
     public Future<InternalService.PTabletKeyLookupResponse> 
fetchTabletDataAsync(
             TNetworkAddress address, InternalService.PTabletKeyLookupRequest 
request) throws RpcException {
         try {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java
new file mode 100644
index 00000000000..e67069209a0
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.Status;
+import org.apache.doris.common.UserException;
+import org.apache.doris.proto.InternalService;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import mockit.Delegate;
+import mockit.Expectations;
+import mockit.Injectable;
+import org.apache.thrift.TException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+public class ResultReceiverConsumerTest {
+
+    @Injectable
+    private ResultReceiver receiver1;
+    @Injectable
+    private ResultReceiver receiver2;
+    @Injectable
+    private ResultReceiver receiver3;
+
+    @Test
+    public void testEosHandling() throws Exception {
+        ResultReceiverConsumer consumer = new ResultReceiverConsumer(
+                Lists.newArrayList(receiver1, receiver2, receiver3), 
System.currentTimeMillis() + 3600);
+        Status status = new Status();
+
+        RowBatch normalBatch1 = new RowBatch();
+        normalBatch1.setEos(false);
+        RowBatch normalBatch2 = new RowBatch();
+        normalBatch2.setEos(false);
+        RowBatch normalBatch3 = new RowBatch();
+        normalBatch3.setEos(false);
+        RowBatch eosBatch1 = new RowBatch();
+        eosBatch1.setEos(true);
+        RowBatch eosBatch2 = new RowBatch();
+        eosBatch2.setEos(true);
+        RowBatch eosBatch3 = new RowBatch();
+        eosBatch3.setEos(true);
+
+        new Expectations() {
+            {
+                
receiver1.createFuture((FutureCallback<InternalService.PFetchDataResult>) any);
+                result = new Delegate() {
+                    void 
delegate(FutureCallback<InternalService.PFetchDataResult> callback) {
+                        callback.onSuccess(null);
+                    }
+                };
+                
receiver2.createFuture((FutureCallback<InternalService.PFetchDataResult>) any);
+                result = new Delegate() {
+                    void 
delegate(FutureCallback<InternalService.PFetchDataResult> callback) {
+                        callback.onSuccess(null);
+                    }
+                };
+                
receiver3.createFuture((FutureCallback<InternalService.PFetchDataResult>) any);
+                result = new Delegate() {
+                    void 
delegate(FutureCallback<InternalService.PFetchDataResult> callback) {
+                        callback.onSuccess(null);
+                    }
+                };
+
+                receiver1.getNext((Status) any);
+                result = normalBatch1;
+                result = eosBatch1;
+                receiver2.getNext((Status) any);
+                result = normalBatch2;
+                result = eosBatch2;
+                receiver3.getNext((Status) any);
+                result = normalBatch3;
+                result = eosBatch3;
+            }
+        };
+        for (int i = 0; i < 5; i++) {
+            RowBatch batch = consumer.getNext(status);
+            Assert.assertFalse(consumer.isEos());
+            Assert.assertFalse(batch.isEos());
+        }
+
+        RowBatch batch = consumer.getNext(status);
+        Assert.assertTrue(consumer.isEos());
+        Assert.assertTrue(batch.isEos());
+
+    }
+
+    @Test
+    public void testGetNextExceptionHandling() throws Exception {
+        ResultReceiverConsumer consumer = new ResultReceiverConsumer(
+                Lists.newArrayList(receiver1, receiver2, receiver3), 
System.currentTimeMillis() + 3600);
+        Status status = new Status();
+
+        RowBatch normalBatch1 = new RowBatch();
+        normalBatch1.setEos(false);
+
+        new Expectations() {
+            {
+                
receiver1.createFuture((FutureCallback<InternalService.PFetchDataResult>) any);
+                result = new Delegate() {
+                    void 
delegate(FutureCallback<InternalService.PFetchDataResult> callback) {
+                        callback.onSuccess(null);
+                    }
+                };
+                
receiver2.createFuture((FutureCallback<InternalService.PFetchDataResult>) any);
+                result = new Delegate() {
+                    void 
delegate(FutureCallback<InternalService.PFetchDataResult> callback) {
+                        callback.onSuccess(null);
+                    }
+                };
+                
receiver3.createFuture((FutureCallback<InternalService.PFetchDataResult>) any);
+                result = new Delegate() {
+                    void 
delegate(FutureCallback<InternalService.PFetchDataResult> callback) {
+                        callback.onSuccess(null);
+                    }
+                };
+
+                receiver1.getNext((Status) any);
+                result = normalBatch1;
+
+                receiver2.getNext((Status) any);
+                result = new TException("Network error");
+            }
+        };
+        RowBatch batch = consumer.getNext(status);
+        Assert.assertFalse(batch.isEos());
+        Assertions.assertThrows(TException.class, () -> 
consumer.getNext(status));
+    }
+
+    @Test
+    public void testCreateFutureExceptionHandling() throws Exception {
+        ResultReceiverConsumer consumer = new ResultReceiverConsumer(
+                Lists.newArrayList(receiver1, receiver2, receiver3), 
System.currentTimeMillis() + 3600);
+        Status status = new Status();
+
+        new Expectations() {
+            {
+                
receiver1.createFuture((FutureCallback<InternalService.PFetchDataResult>) any);
+                result = new Delegate() {
+                    void 
delegate(FutureCallback<InternalService.PFetchDataResult> callback) {
+                        callback.onSuccess(null);
+                    }
+                };
+                
receiver2.createFuture((FutureCallback<InternalService.PFetchDataResult>) any);
+                result = new Delegate() {
+                    void 
delegate(FutureCallback<InternalService.PFetchDataResult> callback) throws 
UserException {
+                        throw new UserException("User error");
+                    }
+                };
+            }
+        };
+        Assertions.assertThrows(UserException.class, () -> 
consumer.getNext(status));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to