This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 4601838bb9 Make flight sql client generic (#8915)
4601838bb9 is described below
commit 4601838bb9b8fe4256499301fcb094e2997eb907
Author: 张林伟 <[email protected]>
AuthorDate: Fri Dec 12 03:32:39 2025 +0800
Make flight sql client generic (#8915)
# Which issue does this PR close?
None.
# Rationale for this change
Callers may not use `Channel` directly (for example, they may use a wrapper
around `Channel`), so make the Flight SQL client generic over any type that
implements the `GrpcService` trait.
# What changes are included in this PR?
Replace the hard-coded `Channel` type with a generic type parameter.
# Are these changes tested?
CI.
# Are there any user-facing changes?
No.
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
arrow-flight/src/sql/client.rs | 53 +++++++++++++++++++++++++++++++-----------
1 file changed, 39 insertions(+), 14 deletions(-)
diff --git a/arrow-flight/src/sql/client.rs b/arrow-flight/src/sql/client.rs
index 5009ae5ea5..4fb27ab5fc 100644
--- a/arrow-flight/src/sql/client.rs
+++ b/arrow-flight/src/sql/client.rs
@@ -56,12 +56,12 @@ use arrow_ipc::{MessageHeader, root_as_message};
use arrow_schema::{ArrowError, Schema, SchemaRef};
use futures::{Stream, TryStreamExt, stream};
use prost::Message;
-use tonic::transport::Channel;
+use tonic::codegen::{Body, StdError};
use tonic::{IntoRequest, IntoStreamingRequest, Streaming};
/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow
data
/// by FlightSQL protocol.
-#[derive(Debug, Clone)]
+#[derive(Debug)]
pub struct FlightSqlServiceClient<T> {
token: Option<String>,
headers: HashMap<String, String>,
@@ -71,14 +71,20 @@ pub struct FlightSqlServiceClient<T> {
/// A FlightSql protocol client that can run queries against FlightSql servers
/// This client is in the "experimental" stage. It is not guaranteed to follow
the spec in all instances.
/// Github issues are welcomed.
-impl FlightSqlServiceClient<Channel> {
+impl<T> FlightSqlServiceClient<T>
+where
+ T: tonic::client::GrpcService<tonic::body::Body>,
+ T::Error: Into<StdError>,
+ T::ResponseBody: Body<Data = Bytes> + Send + 'static,
+ <T::ResponseBody as Body>::Error: Into<StdError> + Send,
+{
/// Creates a new FlightSql client that connects to a server over an
arbitrary tonic `Channel`
- pub fn new(channel: Channel) -> Self {
+ pub fn new(channel: T) -> Self {
Self::new_from_inner(FlightServiceClient::new(channel))
}
/// Creates a new higher level client with the provided lower level client
- pub fn new_from_inner(inner: FlightServiceClient<Channel>) -> Self {
+ pub fn new_from_inner(inner: FlightServiceClient<T>) -> Self {
Self {
token: None,
flight_client: inner,
@@ -87,17 +93,17 @@ impl FlightSqlServiceClient<Channel> {
}
/// Return a reference to the underlying [`FlightServiceClient`]
- pub fn inner(&self) -> &FlightServiceClient<Channel> {
+ pub fn inner(&self) -> &FlightServiceClient<T> {
&self.flight_client
}
/// Return a mutable reference to the underlying [`FlightServiceClient`]
- pub fn inner_mut(&mut self) -> &mut FlightServiceClient<Channel> {
+ pub fn inner_mut(&mut self) -> &mut FlightServiceClient<T> {
&mut self.flight_client
}
/// Consume this client and return the underlying [`FlightServiceClient`]
- pub fn into_inner(self) -> FlightServiceClient<Channel> {
+ pub fn into_inner(self) -> FlightServiceClient<T> {
self.flight_client
}
@@ -416,7 +422,10 @@ impl FlightSqlServiceClient<Channel> {
&mut self,
query: String,
transaction_id: Option<Bytes>,
- ) -> Result<PreparedStatement<Channel>, ArrowError> {
+ ) -> Result<PreparedStatement<T>, ArrowError>
+ where
+ T: Clone,
+ {
let cmd = ActionCreatePreparedStatementRequest {
query,
transaction_id,
@@ -509,10 +518,10 @@ impl FlightSqlServiceClient<Channel> {
Ok(())
}
- fn set_request_headers<T>(
+ fn set_request_headers<M>(
&self,
- mut req: tonic::Request<T>,
- ) -> Result<tonic::Request<T>, ArrowError> {
+ mut req: tonic::Request<M>,
+ ) -> Result<tonic::Request<M>, ArrowError> {
for (k, v) in &self.headers {
let k = AsciiMetadataKey::from_str(k.as_str()).map_err(|e| {
ArrowError::ParseError(format!("Cannot convert header key
\"{k}\": {e}"))
@@ -532,6 +541,16 @@ impl FlightSqlServiceClient<Channel> {
}
}
+impl<T: Clone> Clone for FlightSqlServiceClient<T> {
+ fn clone(&self) -> Self {
+ Self {
+ headers: self.headers.clone(),
+ token: self.token.clone(),
+ flight_client: self.flight_client.clone(),
+ }
+ }
+}
+
/// A PreparedStatement
#[derive(Debug, Clone)]
pub struct PreparedStatement<T> {
@@ -542,9 +561,15 @@ pub struct PreparedStatement<T> {
parameter_schema: Schema,
}
-impl PreparedStatement<Channel> {
+impl<T> PreparedStatement<T>
+where
+ T: tonic::client::GrpcService<tonic::body::Body>,
+ T::Error: Into<StdError>,
+ T::ResponseBody: Body<Data = Bytes> + Send + 'static,
+ <T::ResponseBody as Body>::Error: Into<StdError> + Send,
+{
pub(crate) fn new(
- flight_client: FlightSqlServiceClient<Channel>,
+ flight_client: FlightSqlServiceClient<T>,
handle: impl Into<Bytes>,
dataset_schema: Schema,
parameter_schema: Schema,