This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 2ed306d0b16 [pick](branch-2.1) pick #44286 (#45055)
2ed306d0b16 is described below
commit 2ed306d0b160d1f6574d4e30cc6dfd12502ddc04
Author: Xinyi Zou <[email protected]>
AuthorDate: Fri Dec 6 14:33:47 2024 +0800
[pick](branch-2.1) pick #44286 (#45055)
---
.../java/org/apache/doris/qe/ConnectContext.java | 41 ++----
.../main/java/org/apache/doris/qe/Coordinator.java | 21 +--
.../arrowflight/DorisFlightSqlProducer.java | 73 +++++-----
.../arrowflight/FlightSqlConnectProcessor.java | 150 +++++++++++----------
.../results/FlightSqlEndpointsLocation.java | 65 +++++++++
5 files changed, 207 insertions(+), 143 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index c41beba5a6e..99a770a7efa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -19,7 +19,6 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.BoolLiteral;
import org.apache.doris.analysis.DecimalLiteral;
-import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FloatLiteral;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
@@ -56,10 +55,10 @@ import org.apache.doris.plsql.executor.PlSqlOperation;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
+import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.task.LoadTaskInfo;
-import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultSinkType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionEntry;
@@ -76,7 +75,6 @@ import org.json.JSONObject;
import org.xnio.StreamConnection;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -120,10 +118,7 @@ public class ConnectContext {
protected volatile String peerIdentity;
private final Map<String, String> preparedQuerys = new HashMap<>();
private String runningQuery;
- private TNetworkAddress resultFlightServerAddr;
- private TNetworkAddress resultInternalServiceAddr;
- private ArrayList<Expr> resultOutputExprs;
- private TUniqueId finstId;
+ private final List<FlightSqlEndpointsLocation> flightSqlEndpointsLocations
= Lists.newArrayList();
private boolean returnResultFromLocal = true;
// mysql net
protected volatile MysqlChannel mysqlChannel;
@@ -713,36 +708,16 @@ public class ConnectContext {
return runningQuery;
}
- public void setResultFlightServerAddr(TNetworkAddress
resultFlightServerAddr) {
- this.resultFlightServerAddr = resultFlightServerAddr;
+ public void addFlightSqlEndpointsLocation(FlightSqlEndpointsLocation
flightSqlEndpointsLocation) {
+ this.flightSqlEndpointsLocations.add(flightSqlEndpointsLocation);
}
- public TNetworkAddress getResultFlightServerAddr() {
- return resultFlightServerAddr;
+ public List<FlightSqlEndpointsLocation> getFlightSqlEndpointsLocations() {
+ return flightSqlEndpointsLocations;
}
- public void setResultInternalServiceAddr(TNetworkAddress
resultInternalServiceAddr) {
- this.resultInternalServiceAddr = resultInternalServiceAddr;
- }
-
- public TNetworkAddress getResultInternalServiceAddr() {
- return resultInternalServiceAddr;
- }
-
- public void setResultOutputExprs(ArrayList<Expr> resultOutputExprs) {
- this.resultOutputExprs = resultOutputExprs;
- }
-
- public ArrayList<Expr> getResultOutputExprs() {
- return resultOutputExprs;
- }
-
- public void setFinstId(TUniqueId finstId) {
- this.finstId = finstId;
- }
-
- public TUniqueId getFinstId() {
- return finstId;
+ public void clearFlightSqlEndpointsLocations() {
+ flightSqlEndpointsLocations.clear();
}
public void setReturnResultFromLocal(boolean returnResultFromLocal) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7fe5cb7ef19..0f5a598420d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -81,6 +81,7 @@ import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.LoadEtlTask;
@@ -741,16 +742,15 @@ public class Coordinator implements CoordInterface {
this.timeoutDeadline = System.currentTimeMillis() +
queryOptions.getExecutionTimeout() * 1000L;
if (topDataSink instanceof ResultSink || topDataSink instanceof
ResultFileSink) {
TNetworkAddress execBeAddr =
topParams.instanceExecParams.get(0).host;
- receiver = new ResultReceiver(queryId,
topParams.instanceExecParams.get(0).instanceId,
- addressToBackendID.get(execBeAddr),
toBrpcHost(execBeAddr), this.timeoutDeadline,
-
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
-
- if (!context.isReturnResultFromLocal()) {
+ if (context.isReturnResultFromLocal()) {
+ receiver = new ResultReceiver(queryId,
topParams.instanceExecParams.get(0).instanceId,
+ addressToBackendID.get(execBeAddr),
toBrpcHost(execBeAddr), this.timeoutDeadline,
+
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
+ } else {
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
-
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
-
context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
- context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
-
context.setResultOutputExprs(fragments.get(0).getOutputExprs());
+ TUniqueId finstId =
topParams.instanceExecParams.get(0).instanceId;
+ context.addFlightSqlEndpointsLocation(new
FlightSqlEndpointsLocation(finstId,
+ toArrowFlightHost(execBeAddr), toBrpcHost(execBeAddr),
fragments.get(0).getOutputExprs()));
}
LOG.info("dispatch result sink of query {} to {}",
DebugUtil.printId(queryId),
@@ -761,7 +761,8 @@ public class Coordinator implements CoordInterface {
// set the broker address for OUTFILE sink
ResultFileSink topResultFileSink = (ResultFileSink)
topDataSink;
FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
- .getBroker(topResultFileSink.getBrokerName(),
execBeAddr.getHostname());
+ .getBroker(topResultFileSink.getBrokerName(),
+
topParams.instanceExecParams.get(0).host.getHostname());
topResultFileSink.setBrokerAddr(broker.host, broker.port);
}
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index 758f30469bf..b968ab04c57 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -25,11 +25,13 @@ import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry;
import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
@@ -187,6 +189,7 @@ public class DorisFlightSqlProducer implements
FlightSqlProducer, AutoCloseable
Preconditions.checkState(!query.isEmpty());
// After the previous query was executed, there was no
getStreamStatement to take away the result.
connectContext.getFlightSqlChannel().reset();
+ connectContext.clearFlightSqlEndpointsLocations();
try (FlightSqlConnectProcessor flightSQLConnectProcessor = new
FlightSqlConnectProcessor(connectContext)) {
flightSQLConnectProcessor.handleQuery(query);
if (connectContext.getState().getStateType() ==
MysqlStateType.ERR) {
@@ -225,46 +228,52 @@ public class DorisFlightSqlProducer implements
FlightSqlProducer, AutoCloseable
}
} else {
// Now only query stmt will pull results from BE.
- Schema schema =
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
- if (schema == null) {
+ flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
+ if (flightSQLConnectProcessor.getArrowSchema() == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow
flight schema is null")
.toRuntimeException();
}
- TUniqueId queryId = connectContext.getFinstId();
- // Ticket contains the IP and Brpc Port of the Doris BE
node where the query result is located.
- final ByteString handle = ByteString.copyFromUtf8(
- DebugUtil.printId(queryId) + "&" +
connectContext.getResultInternalServiceAddr().hostname
- + "&" +
connectContext.getResultInternalServiceAddr().port + "&" + query);
- TicketStatementQuery ticketStatement =
TicketStatementQuery.newBuilder().setStatementHandle(handle)
- .build();
- Ticket ticket = new
Ticket(Any.pack(ticketStatement).toByteArray());
- // TODO Support multiple endpoints.
- Location location;
- if
(flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) {
- // In a production environment, it is often
inconvenient to expose Doris BE nodes
- // to the external network.
- // However, a reverse proxy (such as nginx) can be
added to all Doris BE nodes,
- // and the external client will be randomly routed to
a Doris BE node when connecting to nginx.
- // The query results of Arrow Flight SQL will be
randomly saved on a Doris BE node.
- // If it is different from the Doris BE node randomly
routed by nginx,
- // data forwarding needs to be done inside the Doris
BE node.
- if
(flightSQLConnectProcessor.getPublicAccessAddr().isSetPort()) {
- location = Location.forGrpcInsecure(
-
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
-
flightSQLConnectProcessor.getPublicAccessAddr().port);
+ List<FlightEndpoint> endpoints = Lists.newArrayList();
+ for (FlightSqlEndpointsLocation endpointLoc :
connectContext.getFlightSqlEndpointsLocations()) {
+ TUniqueId tid = endpointLoc.getFinstId();
+ // Ticket contains the IP and Brpc Port of the Doris
BE node where the query result is located.
+ final ByteString handle = ByteString.copyFromUtf8(
+ DebugUtil.printId(tid) + "&" +
endpointLoc.getResultInternalServiceAddr().hostname + "&"
+ +
endpointLoc.getResultInternalServiceAddr().port + "&" + query);
+ TicketStatementQuery ticketStatement =
TicketStatementQuery.newBuilder()
+ .setStatementHandle(handle).build();
+ Ticket ticket = new
Ticket(Any.pack(ticketStatement).toByteArray());
+ Location location;
+ if
(endpointLoc.getResultPublicAccessAddr().isSetHostname()) {
+ // In a production environment, it is often
inconvenient to expose Doris BE nodes
+ // to the external network.
+ // However, a reverse proxy (such as nginx) can be
added to all Doris BE nodes,
+ // and the external client will be randomly routed
to a Doris BE node when connecting
+ // to nginx.
+ // The query results of Arrow Flight SQL will be
randomly saved on a Doris BE node.
+ // If it is different from the Doris BE node
randomly routed by nginx,
+ // data forwarding needs to be done inside the
Doris BE node.
+ if
(endpointLoc.getResultPublicAccessAddr().isSetPort()) {
+ location =
Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname,
+
endpointLoc.getResultPublicAccessAddr().port);
+ } else {
+ location =
Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname,
+
endpointLoc.getResultFlightServerAddr().port);
+ }
} else {
- location = Location.forGrpcInsecure(
-
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
-
connectContext.getResultFlightServerAddr().port);
+ location =
Location.forGrpcInsecure(endpointLoc.getResultFlightServerAddr().hostname,
+
endpointLoc.getResultFlightServerAddr().port);
}
- } else {
- location =
Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
-
connectContext.getResultFlightServerAddr().port);
+ // By default, the query results of all BE nodes will
be aggregated to one BE node.
+ // ADBC Client will only receive one endpoint and pull
data from the BE node
+ // corresponding to this endpoint.
+ // `set global enable_parallel_result_sink=true;` to
allow each BE to return query results
+ // separately. ADBC Client will receive multiple
endpoints and pull data from each endpoint.
+ endpoints.add(new FlightEndpoint(ticket, location));
}
- List<FlightEndpoint> endpoints =
Collections.singletonList(new FlightEndpoint(ticket, location));
// TODO Set in BE callback after query end, Client will
not callback.
- return new FlightInfo(schema, descriptor, endpoints, -1,
-1);
+ return new
FlightInfo(flightSQLConnectProcessor.getArrowSchema(), descriptor, endpoints,
-1, -1);
}
}
} catch (Exception e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index 75deda2a7fd..1283cb66ac8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -31,6 +31,7 @@ import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
@@ -58,7 +59,7 @@ import java.util.concurrent.TimeoutException;
*/
public class FlightSqlConnectProcessor extends ConnectProcessor implements
AutoCloseable {
private static final Logger LOG =
LogManager.getLogger(FlightSqlConnectProcessor.class);
- private TNetworkAddress publicAccessAddr = new TNetworkAddress();
+ private Schema arrowSchema;
public FlightSqlConnectProcessor(ConnectContext context) {
super(context);
@@ -67,8 +68,8 @@ public class FlightSqlConnectProcessor extends
ConnectProcessor implements AutoC
context.setReturnResultFromLocal(true);
}
- public TNetworkAddress getPublicAccessAddr() {
- return publicAccessAddr;
+ public Schema getArrowSchema() {
+ return arrowSchema;
}
public void prepare(MysqlCommand command) {
@@ -107,74 +108,87 @@ public class FlightSqlConnectProcessor extends
ConnectProcessor implements AutoC
// handleFieldList(tableName);
// }
- public Schema fetchArrowFlightSchema(int timeoutMs) {
- TNetworkAddress address = ctx.getResultInternalServiceAddr();
- TUniqueId tid = ctx.getFinstId();
- ArrayList<Expr> resultOutputExprs = ctx.getResultOutputExprs();
- Types.PUniqueId finstId =
Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
- try {
- InternalService.PFetchArrowFlightSchemaRequest request =
- InternalService.PFetchArrowFlightSchemaRequest.newBuilder()
- .setFinstId(finstId)
- .build();
-
- Future<InternalService.PFetchArrowFlightSchemaResult> future
- =
BackendServiceProxy.getInstance().fetchArrowFlightSchema(address, request);
- InternalService.PFetchArrowFlightSchemaResult pResult;
- pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS);
- if (pResult == null) {
- throw new RuntimeException(String.format("fetch arrow flight
schema timeout, finstId: %s",
- DebugUtil.printId(tid)));
- }
- Status resultStatus = new Status(pResult.getStatus());
- if (resultStatus.getErrorCode() != TStatusCode.OK) {
- throw new RuntimeException(String.format("fetch arrow flight
schema failed, queryId: %s, errmsg: %s",
- DebugUtil.printId(tid), resultStatus));
- }
- if (pResult.hasBeArrowFlightIp()) {
-
publicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8());
- }
- if (pResult.hasBeArrowFlightPort()) {
- publicAccessAddr.setPort(pResult.getBeArrowFlightPort());
- }
- if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
- RootAllocator rootAllocator = new
RootAllocator(Integer.MAX_VALUE);
- ArrowStreamReader arrowStreamReader = new ArrowStreamReader(
- new
ByteArrayInputStream(pResult.getSchema().toByteArray()),
- rootAllocator
- );
- try {
- VectorSchemaRoot root =
arrowStreamReader.getVectorSchemaRoot();
- List<FieldVector> fieldVectors = root.getFieldVectors();
- if (fieldVectors.size() != resultOutputExprs.size()) {
- throw new RuntimeException(String.format(
- "Schema size %s' is not equal to arrow field
size %s, finstId: %s.",
- fieldVectors.size(), resultOutputExprs.size(),
DebugUtil.printId(tid)));
+ public void fetchArrowFlightSchema(int timeoutMs) {
+ if (ctx.getFlightSqlEndpointsLocations().isEmpty()) {
+ throw new RuntimeException("fetch arrow flight schema failed, no
FlightSqlEndpointsLocations.");
+ }
+ for (FlightSqlEndpointsLocation endpointLoc :
ctx.getFlightSqlEndpointsLocations()) {
+ TNetworkAddress address =
endpointLoc.getResultInternalServiceAddr();
+ TUniqueId tid = endpointLoc.getFinstId();
+ ArrayList<Expr> resultOutputExprs =
endpointLoc.getResultOutputExprs();
+ Types.PUniqueId queryId =
Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
+ try {
+ InternalService.PFetchArrowFlightSchemaRequest request
+ =
InternalService.PFetchArrowFlightSchemaRequest.newBuilder().setFinstId(queryId).build();
+
+ Future<InternalService.PFetchArrowFlightSchemaResult> future =
BackendServiceProxy.getInstance()
+ .fetchArrowFlightSchema(address, request);
+ InternalService.PFetchArrowFlightSchemaResult pResult;
+ pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS);
+ if (pResult == null) {
+ throw new RuntimeException(
+ String.format("fetch arrow flight schema timeout,
queryId: %s", DebugUtil.printId(tid)));
+ }
+ Status resultStatus = new Status(pResult.getStatus());
+ if (resultStatus.getErrorCode() != TStatusCode.OK) {
+ throw new RuntimeException(
+ String.format("fetch arrow flight schema failed,
queryId: %s, errmsg: %s",
+ DebugUtil.printId(tid), resultStatus));
+ }
+
+ TNetworkAddress resultPublicAccessAddr = new TNetworkAddress();
+ if (pResult.hasBeArrowFlightIp()) {
+
resultPublicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8());
+ }
+ if (pResult.hasBeArrowFlightPort()) {
+
resultPublicAccessAddr.setPort(pResult.getBeArrowFlightPort());
+ }
+ endpointLoc.setResultPublicAccessAddr(resultPublicAccessAddr);
+ if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
+ RootAllocator rootAllocator = new
RootAllocator(Integer.MAX_VALUE);
+ ArrowStreamReader arrowStreamReader = new
ArrowStreamReader(
+ new
ByteArrayInputStream(pResult.getSchema().toByteArray()), rootAllocator);
+ try {
+ Schema schema;
+ VectorSchemaRoot root =
arrowStreamReader.getVectorSchemaRoot();
+ List<FieldVector> fieldVectors =
root.getFieldVectors();
+ if (fieldVectors.size() != resultOutputExprs.size()) {
+ throw new RuntimeException(
+ String.format("Schema size %s' is not
equal to arrow field size %s, queryId: %s.",
+ fieldVectors.size(),
resultOutputExprs.size(), DebugUtil.printId(tid)));
+ }
+ schema = root.getSchema();
+ if (arrowSchema == null) {
+ arrowSchema = schema;
+ } else if (!arrowSchema.equals(schema)) {
+ throw new RuntimeException(String.format(
+ "The schema returned by results BE is
different, first schema: %s, "
+ + "new schema: %s, queryId:
%s,backend: %s", arrowSchema, schema,
+ DebugUtil.printId(tid), address));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Read Arrow Flight Schema
failed.", e);
}
- return root.getSchema();
- } catch (Exception e) {
- throw new RuntimeException("Read Arrow Flight Schema
failed.", e);
+ } else {
+ throw new RuntimeException(
+ String.format("get empty arrow flight schema,
queryId: %s", DebugUtil.printId(tid)));
}
- } else {
- throw new RuntimeException(String.format("get empty arrow
flight schema, finstId: %s",
- DebugUtil.printId(tid)));
+ } catch (RpcException e) {
+ throw new RuntimeException(
+ String.format("arrow flight schema fetch catch rpc
exception, queryId: %s,backend: %s",
+ DebugUtil.printId(tid), address), e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(
+ String.format("arrow flight schema future get
interrupted exception, queryId: %s,backend: %s",
+ DebugUtil.printId(tid), address), e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(
+ String.format("arrow flight schema future get
execution exception, queryId: %s,backend: %s",
+ DebugUtil.printId(tid), address), e);
+ } catch (TimeoutException e) {
+ throw new RuntimeException(String.format("arrow flight schema
fetch timeout, queryId: %s,backend: %s",
+ DebugUtil.printId(tid), address), e);
}
- } catch (RpcException e) {
- throw new RuntimeException(String.format(
- "arrow flight schema fetch catch rpc exception, finstId:
%s,backend: %s",
- DebugUtil.printId(tid), address), e);
- } catch (InterruptedException e) {
- throw new RuntimeException(String.format(
- "arrow flight schema future get interrupted exception,
finstId: %s,backend: %s",
- DebugUtil.printId(tid), address), e);
- } catch (ExecutionException e) {
- throw new RuntimeException(String.format(
- "arrow flight schema future get execution exception,
finstId: %s,backend: %s",
- DebugUtil.printId(tid), address), e);
- } catch (TimeoutException e) {
- throw new RuntimeException(String.format(
- "arrow flight schema fetch timeout, finstId: %s,backend:
%s",
- DebugUtil.printId(tid), address), e);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java
new file mode 100644
index 00000000000..61adc797cc5
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.service.arrowflight.results;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TUniqueId;
+
+import java.util.ArrayList;
+
+public class FlightSqlEndpointsLocation {
+ private TUniqueId finstId;
+ private TNetworkAddress resultFlightServerAddr;
+ private TNetworkAddress resultInternalServiceAddr;
+ private TNetworkAddress resultPublicAccessAddr;
+ private ArrayList<Expr> resultOutputExprs;
+
+ public FlightSqlEndpointsLocation(TUniqueId finstId, TNetworkAddress
resultFlightServerAddr,
+ TNetworkAddress resultInternalServiceAddr, ArrayList<Expr>
resultOutputExprs) {
+ this.finstId = finstId;
+ this.resultFlightServerAddr = resultFlightServerAddr;
+ this.resultInternalServiceAddr = resultInternalServiceAddr;
+ this.resultPublicAccessAddr = new TNetworkAddress();
+ this.resultOutputExprs = resultOutputExprs;
+ }
+
+ public TUniqueId getFinstId() {
+ return finstId;
+ }
+
+ public TNetworkAddress getResultFlightServerAddr() {
+ return resultFlightServerAddr;
+ }
+
+ public TNetworkAddress getResultInternalServiceAddr() {
+ return resultInternalServiceAddr;
+ }
+
+ public void setResultPublicAccessAddr(TNetworkAddress
resultPublicAccessAddr) {
+ this.resultPublicAccessAddr = resultPublicAccessAddr;
+ }
+
+ public TNetworkAddress getResultPublicAccessAddr() {
+ return resultPublicAccessAddr;
+ }
+
+ public ArrayList<Expr> getResultOutputExprs() {
+ return resultOutputExprs;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org