This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 2ed306d0b16 [pick](branch-2.1) pick #44286  (#45055)
2ed306d0b16 is described below

commit 2ed306d0b160d1f6574d4e30cc6dfd12502ddc04
Author: Xinyi Zou <[email protected]>
AuthorDate: Fri Dec 6 14:33:47 2024 +0800

    [pick](branch-2.1) pick #44286  (#45055)
---
 .../java/org/apache/doris/qe/ConnectContext.java   |  41 ++----
 .../main/java/org/apache/doris/qe/Coordinator.java |  21 +--
 .../arrowflight/DorisFlightSqlProducer.java        |  73 +++++-----
 .../arrowflight/FlightSqlConnectProcessor.java     | 150 +++++++++++----------
 .../results/FlightSqlEndpointsLocation.java        |  65 +++++++++
 5 files changed, 207 insertions(+), 143 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index c41beba5a6e..99a770a7efa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -19,7 +19,6 @@ package org.apache.doris.qe;
 
 import org.apache.doris.analysis.BoolLiteral;
 import org.apache.doris.analysis.DecimalLiteral;
-import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.FloatLiteral;
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.LiteralExpr;
@@ -56,10 +55,10 @@ import org.apache.doris.plsql.executor.PlSqlOperation;
 import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
+import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
 import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.Histogram;
 import org.apache.doris.task.LoadTaskInfo;
-import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TResultSinkType;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionEntry;
@@ -76,7 +75,6 @@ import org.json.JSONObject;
 import org.xnio.StreamConnection;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -120,10 +118,7 @@ public class ConnectContext {
     protected volatile String peerIdentity;
     private final Map<String, String> preparedQuerys = new HashMap<>();
     private String runningQuery;
-    private TNetworkAddress resultFlightServerAddr;
-    private TNetworkAddress resultInternalServiceAddr;
-    private ArrayList<Expr> resultOutputExprs;
-    private TUniqueId finstId;
+    private final List<FlightSqlEndpointsLocation> flightSqlEndpointsLocations 
= Lists.newArrayList();
     private boolean returnResultFromLocal = true;
     // mysql net
     protected volatile MysqlChannel mysqlChannel;
@@ -713,36 +708,16 @@ public class ConnectContext {
         return runningQuery;
     }
 
-    public void setResultFlightServerAddr(TNetworkAddress 
resultFlightServerAddr) {
-        this.resultFlightServerAddr = resultFlightServerAddr;
+    public void addFlightSqlEndpointsLocation(FlightSqlEndpointsLocation 
flightSqlEndpointsLocation) {
+        this.flightSqlEndpointsLocations.add(flightSqlEndpointsLocation);
     }
 
-    public TNetworkAddress getResultFlightServerAddr() {
-        return resultFlightServerAddr;
+    public List<FlightSqlEndpointsLocation> getFlightSqlEndpointsLocations() {
+        return flightSqlEndpointsLocations;
     }
 
-    public void setResultInternalServiceAddr(TNetworkAddress 
resultInternalServiceAddr) {
-        this.resultInternalServiceAddr = resultInternalServiceAddr;
-    }
-
-    public TNetworkAddress getResultInternalServiceAddr() {
-        return resultInternalServiceAddr;
-    }
-
-    public void setResultOutputExprs(ArrayList<Expr> resultOutputExprs) {
-        this.resultOutputExprs = resultOutputExprs;
-    }
-
-    public ArrayList<Expr> getResultOutputExprs() {
-        return resultOutputExprs;
-    }
-
-    public void setFinstId(TUniqueId finstId) {
-        this.finstId = finstId;
-    }
-
-    public TUniqueId getFinstId() {
-        return finstId;
+    public void clearFlightSqlEndpointsLocations() {
+        flightSqlEndpointsLocations.clear();
     }
 
     public void setReturnResultFromLocal(boolean returnResultFromLocal) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7fe5cb7ef19..0f5a598420d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -81,6 +81,7 @@ import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.LoadEtlTask;
@@ -741,16 +742,15 @@ public class Coordinator implements CoordInterface {
         this.timeoutDeadline = System.currentTimeMillis() + 
queryOptions.getExecutionTimeout() * 1000L;
         if (topDataSink instanceof ResultSink || topDataSink instanceof 
ResultFileSink) {
             TNetworkAddress execBeAddr = 
topParams.instanceExecParams.get(0).host;
-            receiver = new ResultReceiver(queryId, 
topParams.instanceExecParams.get(0).instanceId,
-                    addressToBackendID.get(execBeAddr), 
toBrpcHost(execBeAddr), this.timeoutDeadline,
-                    
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
-
-            if (!context.isReturnResultFromLocal()) {
+            if (context.isReturnResultFromLocal()) {
+                receiver = new ResultReceiver(queryId, 
topParams.instanceExecParams.get(0).instanceId,
+                        addressToBackendID.get(execBeAddr), 
toBrpcHost(execBeAddr), this.timeoutDeadline,
+                        
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
+            } else {
                 
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
-                
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
-                
context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
-                context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
-                
context.setResultOutputExprs(fragments.get(0).getOutputExprs());
+                TUniqueId finstId = 
topParams.instanceExecParams.get(0).instanceId;
+                context.addFlightSqlEndpointsLocation(new 
FlightSqlEndpointsLocation(finstId,
+                        toArrowFlightHost(execBeAddr), toBrpcHost(execBeAddr), 
fragments.get(0).getOutputExprs()));
             }
 
             LOG.info("dispatch result sink of query {} to {}", 
DebugUtil.printId(queryId),
@@ -761,7 +761,8 @@ public class Coordinator implements CoordInterface {
                 // set the broker address for OUTFILE sink
                 ResultFileSink topResultFileSink = (ResultFileSink) 
topDataSink;
                 FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
-                        .getBroker(topResultFileSink.getBrokerName(), 
execBeAddr.getHostname());
+                        .getBroker(topResultFileSink.getBrokerName(),
+                                
topParams.instanceExecParams.get(0).host.getHostname());
                 topResultFileSink.setBrokerAddr(broker.host, broker.port);
             }
         } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index 758f30469bf..b968ab04c57 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -25,11 +25,13 @@ import org.apache.doris.common.util.Util;
 import org.apache.doris.mysql.MysqlCommand;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
 import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry;
 import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
@@ -187,6 +189,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
             Preconditions.checkState(!query.isEmpty());
             // After the previous query was executed, there was no 
getStreamStatement to take away the result.
             connectContext.getFlightSqlChannel().reset();
+            connectContext.clearFlightSqlEndpointsLocations();
             try (FlightSqlConnectProcessor flightSQLConnectProcessor = new 
FlightSqlConnectProcessor(connectContext)) {
                 flightSQLConnectProcessor.handleQuery(query);
                 if (connectContext.getState().getStateType() == 
MysqlStateType.ERR) {
@@ -225,46 +228,52 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
                     }
                 } else {
                     // Now only query stmt will pull results from BE.
-                    Schema schema = 
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
-                    if (schema == null) {
+                    flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
+                    if (flightSQLConnectProcessor.getArrowSchema() == null) {
                         throw CallStatus.INTERNAL.withDescription("fetch arrow 
flight schema is null")
                                 .toRuntimeException();
                     }
 
-                    TUniqueId queryId = connectContext.getFinstId();
-                    // Ticket contains the IP and Brpc Port of the Doris BE 
node where the query result is located.
-                    final ByteString handle = ByteString.copyFromUtf8(
-                            DebugUtil.printId(queryId) + "&" + 
connectContext.getResultInternalServiceAddr().hostname
-                                    + "&" + 
connectContext.getResultInternalServiceAddr().port + "&" + query);
-                    TicketStatementQuery ticketStatement = 
TicketStatementQuery.newBuilder().setStatementHandle(handle)
-                            .build();
-                    Ticket ticket = new 
Ticket(Any.pack(ticketStatement).toByteArray());
-                    // TODO Support multiple endpoints.
-                    Location location;
-                    if 
(flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) {
-                        // In a production environment, it is often 
inconvenient to expose Doris BE nodes
-                        // to the external network.
-                        // However, a reverse proxy (such as nginx) can be 
added to all Doris BE nodes,
-                        // and the external client will be randomly routed to 
a Doris BE node when connecting to nginx.
-                        // The query results of Arrow Flight SQL will be 
randomly saved on a Doris BE node.
-                        // If it is different from the Doris BE node randomly 
routed by nginx,
-                        // data forwarding needs to be done inside the Doris 
BE node.
-                        if 
(flightSQLConnectProcessor.getPublicAccessAddr().isSetPort()) {
-                            location = Location.forGrpcInsecure(
-                                    
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
-                                    
flightSQLConnectProcessor.getPublicAccessAddr().port);
+                    List<FlightEndpoint> endpoints = Lists.newArrayList();
+                    for (FlightSqlEndpointsLocation endpointLoc : 
connectContext.getFlightSqlEndpointsLocations()) {
+                        TUniqueId tid = endpointLoc.getFinstId();
+                        // Ticket contains the IP and Brpc Port of the Doris 
BE node where the query result is located.
+                        final ByteString handle = ByteString.copyFromUtf8(
+                                DebugUtil.printId(tid) + "&" + 
endpointLoc.getResultInternalServiceAddr().hostname + "&"
+                                        + 
endpointLoc.getResultInternalServiceAddr().port + "&" + query);
+                        TicketStatementQuery ticketStatement = 
TicketStatementQuery.newBuilder()
+                                .setStatementHandle(handle).build();
+                        Ticket ticket = new 
Ticket(Any.pack(ticketStatement).toByteArray());
+                        Location location;
+                        if 
(endpointLoc.getResultPublicAccessAddr().isSetHostname()) {
+                            // In a production environment, it is often 
inconvenient to expose Doris BE nodes
+                            // to the external network.
+                            // However, a reverse proxy (such as nginx) can be 
added to all Doris BE nodes,
+                            // and the external client will be randomly routed 
to a Doris BE node when connecting
+                            // to nginx.
+                            // The query results of Arrow Flight SQL will be 
randomly saved on a Doris BE node.
+                            // If it is different from the Doris BE node 
randomly routed by nginx,
+                            // data forwarding needs to be done inside the 
Doris BE node.
+                            if 
(endpointLoc.getResultPublicAccessAddr().isSetPort()) {
+                                location = 
Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname,
+                                        
endpointLoc.getResultPublicAccessAddr().port);
+                            } else {
+                                location = 
Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname,
+                                        
endpointLoc.getResultFlightServerAddr().port);
+                            }
                         } else {
-                            location = Location.forGrpcInsecure(
-                                    
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
-                                    
connectContext.getResultFlightServerAddr().port);
+                            location = 
Location.forGrpcInsecure(endpointLoc.getResultFlightServerAddr().hostname,
+                                    
endpointLoc.getResultFlightServerAddr().port);
                         }
-                    } else {
-                        location = 
Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
-                                
connectContext.getResultFlightServerAddr().port);
+                        // By default, the query results of all BE nodes will 
be aggregated to one BE node.
+                        // ADBC Client will only receive one endpoint and pull 
data from the BE node
+                        // corresponding to this endpoint.
+                        // `set global enable_parallel_result_sink=true;` to 
allow each BE to return query results
+                        // separately. ADBC Client will receive multiple 
endpoints and pull data from each endpoint.
+                        endpoints.add(new FlightEndpoint(ticket, location));
                     }
-                    List<FlightEndpoint> endpoints = 
Collections.singletonList(new FlightEndpoint(ticket, location));
                     // TODO Set in BE callback after query end, Client will 
not callback.
-                    return new FlightInfo(schema, descriptor, endpoints, -1, 
-1);
+                    return new 
FlightInfo(flightSQLConnectProcessor.getArrowSchema(), descriptor, endpoints, 
-1, -1);
                 }
             }
         } catch (Exception e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index 75deda2a7fd..1283cb66ac8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -31,6 +31,7 @@ import org.apache.doris.qe.ConnectProcessor;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
@@ -58,7 +59,7 @@ import java.util.concurrent.TimeoutException;
  */
 public class FlightSqlConnectProcessor extends ConnectProcessor implements 
AutoCloseable {
     private static final Logger LOG = 
LogManager.getLogger(FlightSqlConnectProcessor.class);
-    private TNetworkAddress publicAccessAddr = new TNetworkAddress();
+    private Schema arrowSchema;
 
     public FlightSqlConnectProcessor(ConnectContext context) {
         super(context);
@@ -67,8 +68,8 @@ public class FlightSqlConnectProcessor extends 
ConnectProcessor implements AutoC
         context.setReturnResultFromLocal(true);
     }
 
-    public TNetworkAddress getPublicAccessAddr() {
-        return publicAccessAddr;
+    public Schema getArrowSchema() {
+        return arrowSchema;
     }
 
     public void prepare(MysqlCommand command) {
@@ -107,74 +108,87 @@ public class FlightSqlConnectProcessor extends 
ConnectProcessor implements AutoC
     //     handleFieldList(tableName);
     // }
 
-    public Schema fetchArrowFlightSchema(int timeoutMs) {
-        TNetworkAddress address = ctx.getResultInternalServiceAddr();
-        TUniqueId tid = ctx.getFinstId();
-        ArrayList<Expr> resultOutputExprs = ctx.getResultOutputExprs();
-        Types.PUniqueId finstId = 
Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
-        try {
-            InternalService.PFetchArrowFlightSchemaRequest request =
-                    InternalService.PFetchArrowFlightSchemaRequest.newBuilder()
-                            .setFinstId(finstId)
-                            .build();
-
-            Future<InternalService.PFetchArrowFlightSchemaResult> future
-                    = 
BackendServiceProxy.getInstance().fetchArrowFlightSchema(address, request);
-            InternalService.PFetchArrowFlightSchemaResult pResult;
-            pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS);
-            if (pResult == null) {
-                throw new RuntimeException(String.format("fetch arrow flight 
schema timeout, finstId: %s",
-                        DebugUtil.printId(tid)));
-            }
-            Status resultStatus = new Status(pResult.getStatus());
-            if (resultStatus.getErrorCode() != TStatusCode.OK) {
-                throw new RuntimeException(String.format("fetch arrow flight 
schema failed, queryId: %s, errmsg: %s",
-                        DebugUtil.printId(tid), resultStatus));
-            }
-            if (pResult.hasBeArrowFlightIp()) {
-                
publicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8());
-            }
-            if (pResult.hasBeArrowFlightPort()) {
-                publicAccessAddr.setPort(pResult.getBeArrowFlightPort());
-            }
-            if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
-                RootAllocator rootAllocator = new 
RootAllocator(Integer.MAX_VALUE);
-                ArrowStreamReader arrowStreamReader = new ArrowStreamReader(
-                        new 
ByteArrayInputStream(pResult.getSchema().toByteArray()),
-                        rootAllocator
-                );
-                try {
-                    VectorSchemaRoot root = 
arrowStreamReader.getVectorSchemaRoot();
-                    List<FieldVector> fieldVectors = root.getFieldVectors();
-                    if (fieldVectors.size() != resultOutputExprs.size()) {
-                        throw new RuntimeException(String.format(
-                                "Schema size %s' is not equal to arrow field 
size %s, finstId: %s.",
-                                fieldVectors.size(), resultOutputExprs.size(), 
DebugUtil.printId(tid)));
+    public void fetchArrowFlightSchema(int timeoutMs) {
+        if (ctx.getFlightSqlEndpointsLocations().isEmpty()) {
+            throw new RuntimeException("fetch arrow flight schema failed, no 
FlightSqlEndpointsLocations.");
+        }
+        for (FlightSqlEndpointsLocation endpointLoc : 
ctx.getFlightSqlEndpointsLocations()) {
+            TNetworkAddress address = 
endpointLoc.getResultInternalServiceAddr();
+            TUniqueId tid = endpointLoc.getFinstId();
+            ArrayList<Expr> resultOutputExprs = 
endpointLoc.getResultOutputExprs();
+            Types.PUniqueId queryId = 
Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
+            try {
+                InternalService.PFetchArrowFlightSchemaRequest request
+                        = 
InternalService.PFetchArrowFlightSchemaRequest.newBuilder().setFinstId(queryId).build();
+
+                Future<InternalService.PFetchArrowFlightSchemaResult> future = 
BackendServiceProxy.getInstance()
+                        .fetchArrowFlightSchema(address, request);
+                InternalService.PFetchArrowFlightSchemaResult pResult;
+                pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS);
+                if (pResult == null) {
+                    throw new RuntimeException(
+                            String.format("fetch arrow flight schema timeout, 
queryId: %s", DebugUtil.printId(tid)));
+                }
+                Status resultStatus = new Status(pResult.getStatus());
+                if (resultStatus.getErrorCode() != TStatusCode.OK) {
+                    throw new RuntimeException(
+                            String.format("fetch arrow flight schema failed, 
queryId: %s, errmsg: %s",
+                                    DebugUtil.printId(tid), resultStatus));
+                }
+
+                TNetworkAddress resultPublicAccessAddr = new TNetworkAddress();
+                if (pResult.hasBeArrowFlightIp()) {
+                    
resultPublicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8());
+                }
+                if (pResult.hasBeArrowFlightPort()) {
+                    
resultPublicAccessAddr.setPort(pResult.getBeArrowFlightPort());
+                }
+                endpointLoc.setResultPublicAccessAddr(resultPublicAccessAddr);
+                if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
+                    RootAllocator rootAllocator = new 
RootAllocator(Integer.MAX_VALUE);
+                    ArrowStreamReader arrowStreamReader = new 
ArrowStreamReader(
+                            new 
ByteArrayInputStream(pResult.getSchema().toByteArray()), rootAllocator);
+                    try {
+                        Schema schema;
+                        VectorSchemaRoot root = 
arrowStreamReader.getVectorSchemaRoot();
+                        List<FieldVector> fieldVectors = 
root.getFieldVectors();
+                        if (fieldVectors.size() != resultOutputExprs.size()) {
+                            throw new RuntimeException(
+                                    String.format("Schema size %s' is not 
equal to arrow field size %s, queryId: %s.",
+                                            fieldVectors.size(), 
resultOutputExprs.size(), DebugUtil.printId(tid)));
+                        }
+                        schema = root.getSchema();
+                        if (arrowSchema == null) {
+                            arrowSchema = schema;
+                        } else if (!arrowSchema.equals(schema)) {
+                            throw new RuntimeException(String.format(
+                                    "The schema returned by results BE is 
different, first schema: %s, "
+                                            + "new schema: %s, queryId: 
%s,backend: %s", arrowSchema, schema,
+                                    DebugUtil.printId(tid), address));
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException("Read Arrow Flight Schema 
failed.", e);
                     }
-                    return root.getSchema();
-                } catch (Exception e) {
-                    throw new RuntimeException("Read Arrow Flight Schema 
failed.", e);
+                } else {
+                    throw new RuntimeException(
+                            String.format("get empty arrow flight schema, 
queryId: %s", DebugUtil.printId(tid)));
                 }
-            } else {
-                throw new RuntimeException(String.format("get empty arrow 
flight schema, finstId: %s",
-                        DebugUtil.printId(tid)));
+            } catch (RpcException e) {
+                throw new RuntimeException(
+                        String.format("arrow flight schema fetch catch rpc 
exception, queryId: %s,backend: %s",
+                                DebugUtil.printId(tid), address), e);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(
+                        String.format("arrow flight schema future get 
interrupted exception, queryId: %s,backend: %s",
+                                DebugUtil.printId(tid), address), e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException(
+                        String.format("arrow flight schema future get 
execution exception, queryId: %s,backend: %s",
+                                DebugUtil.printId(tid), address), e);
+            } catch (TimeoutException e) {
+                throw new RuntimeException(String.format("arrow flight schema 
fetch timeout, queryId: %s,backend: %s",
+                        DebugUtil.printId(tid), address), e);
             }
-        } catch (RpcException e) {
-            throw new RuntimeException(String.format(
-                    "arrow flight schema fetch catch rpc exception, finstId: 
%s,backend: %s",
-                    DebugUtil.printId(tid), address), e);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(String.format(
-                    "arrow flight schema future get interrupted exception, 
finstId: %s,backend: %s",
-                    DebugUtil.printId(tid), address), e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException(String.format(
-                    "arrow flight schema future get execution exception, 
finstId: %s,backend: %s",
-                    DebugUtil.printId(tid), address), e);
-        } catch (TimeoutException e) {
-            throw new RuntimeException(String.format(
-                    "arrow flight schema fetch timeout, finstId: %s,backend: 
%s",
-                    DebugUtil.printId(tid), address), e);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java
new file mode 100644
index 00000000000..61adc797cc5
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.service.arrowflight.results;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TUniqueId;
+
+import java.util.ArrayList;
+
+public class FlightSqlEndpointsLocation {
+    private TUniqueId finstId;
+    private TNetworkAddress resultFlightServerAddr;
+    private TNetworkAddress resultInternalServiceAddr;
+    private TNetworkAddress resultPublicAccessAddr;
+    private ArrayList<Expr> resultOutputExprs;
+
+    public FlightSqlEndpointsLocation(TUniqueId finstId, TNetworkAddress 
resultFlightServerAddr,
+            TNetworkAddress resultInternalServiceAddr, ArrayList<Expr> 
resultOutputExprs) {
+        this.finstId = finstId;
+        this.resultFlightServerAddr = resultFlightServerAddr;
+        this.resultInternalServiceAddr = resultInternalServiceAddr;
+        this.resultPublicAccessAddr = new TNetworkAddress();
+        this.resultOutputExprs = resultOutputExprs;
+    }
+
+    public TUniqueId getFinstId() {
+        return finstId;
+    }
+
+    public TNetworkAddress getResultFlightServerAddr() {
+        return resultFlightServerAddr;
+    }
+
+    public TNetworkAddress getResultInternalServiceAddr() {
+        return resultInternalServiceAddr;
+    }
+
+    public void setResultPublicAccessAddr(TNetworkAddress 
resultPublicAccessAddr) {
+        this.resultPublicAccessAddr = resultPublicAccessAddr;
+    }
+
+    public TNetworkAddress getResultPublicAccessAddr() {
+        return resultPublicAccessAddr;
+    }
+
+    public ArrayList<Expr> getResultOutputExprs() {
+        return resultOutputExprs;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to