This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch 53.0.0-dev
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/53.0.0-dev by this push:
new 741bbf6854 bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight`
(#6041)
741bbf6854 is described below
commit 741bbf68546ffa2d9342cece82f3b1456f7a1f5b
Author: Bugen Zhao <[email protected]>
AuthorDate: Wed Jul 17 06:05:00 2024 +0800
bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight` (#6041)
* bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight`
Signed-off-by: Bugen Zhao <[email protected]>
* fix example tests
Signed-off-by: Bugen Zhao <[email protected]>
---------
Signed-off-by: Bugen Zhao <[email protected]>
---
arrow-flight/Cargo.toml | 11 +++----
arrow-flight/examples/flight_sql_server.rs | 6 ++--
arrow-flight/gen/Cargo.toml | 4 +--
arrow-flight/src/arrow.flight.protocol.rs | 36 +++++------------------
arrow-flight/src/sql/arrow.flight.protocol.sql.rs | 12 ++++----
arrow-flight/tests/common/trailers_layer.rs | 32 +++++++-------------
arrow-integration-testing/Cargo.toml | 4 +--
7 files changed, 38 insertions(+), 67 deletions(-)
diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml
index 111bf94d80..3e7d6fa292 100644
--- a/arrow-flight/Cargo.toml
+++ b/arrow-flight/Cargo.toml
@@ -44,11 +44,11 @@ bytes = { version = "1", default-features = false }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
once_cell = { version = "1", optional = true }
paste = { version = "1.0" }
-prost = { version = "0.12.3", default-features = false, features =
["prost-derive"] }
+prost = { version = "0.13.1", default-features = false, features =
["prost-derive"] }
# For Timestamp type
-prost-types = { version = "0.12.3", default-features = false }
+prost-types = { version = "0.13.1", default-features = false }
tokio = { version = "1.0", default-features = false, features = ["macros",
"rt", "rt-multi-thread"] }
-tonic = { version = "0.11.0", default-features = false, features =
["transport", "codegen", "prost"] }
+tonic = { version = "0.12.0", default-features = false, features =
["transport", "codegen", "prost"] }
# CLI-related dependencies
anyhow = { version = "1.0", optional = true }
@@ -70,8 +70,9 @@ cli = ["anyhow", "arrow-cast/prettyprint", "clap",
"tracing-log", "tracing-subsc
[dev-dependencies]
arrow-cast = { workspace = true, features = ["prettyprint"] }
assert_cmd = "2.0.8"
-http = "0.2.9"
-http-body = "0.4.5"
+http = "1.1.0"
+http-body = "1.0.0"
+hyper-util = "0.1"
pin-project-lite = "0.2"
tempfile = "3.3"
tokio-stream = { version = "0.1", features = ["net"] }
diff --git a/arrow-flight/examples/flight_sql_server.rs
b/arrow-flight/examples/flight_sql_server.rs
index 031628eaa8..d5168debc4 100644
--- a/arrow-flight/examples/flight_sql_server.rs
+++ b/arrow-flight/examples/flight_sql_server.rs
@@ -783,7 +783,8 @@ impl ProstMessageExt for FetchResults {
#[cfg(test)]
mod tests {
use super::*;
- use futures::TryStreamExt;
+ use futures::{TryFutureExt, TryStreamExt};
+ use hyper_util::rt::TokioIo;
use std::fs;
use std::future::Future;
use std::net::SocketAddr;
@@ -843,7 +844,8 @@ mod tests {
.serve_with_incoming(stream);
let request_future = async {
- let connector = service_fn(move |_|
UnixStream::connect(path.clone()));
+ let connector =
+ service_fn(move |_|
UnixStream::connect(path.clone()).map_ok(TokioIo::new));
let channel = Endpoint::try_from("http://example.com")
.unwrap()
.connect_with_connector(connector)
diff --git a/arrow-flight/gen/Cargo.toml b/arrow-flight/gen/Cargo.toml
index 7264a527ca..a12c683776 100644
--- a/arrow-flight/gen/Cargo.toml
+++ b/arrow-flight/gen/Cargo.toml
@@ -33,5 +33,5 @@ publish = false
# Pin specific version of the tonic-build dependencies to avoid auto-generated
# (and checked in) arrow.flight.protocol.rs from changing
proc-macro2 = { version = "=1.0.86", default-features = false }
-prost-build = { version = "=0.12.6", default-features = false }
-tonic-build = { version = "=0.11.0", default-features = false, features =
["transport", "prost"] }
+prost-build = { version = "=0.13.1", default-features = false }
+tonic-build = { version = "=0.12.0", default-features = false, features =
["transport", "prost"] }
diff --git a/arrow-flight/src/arrow.flight.protocol.rs
b/arrow-flight/src/arrow.flight.protocol.rs
index bc314de9d1..8c7292894e 100644
--- a/arrow-flight/src/arrow.flight.protocol.rs
+++ b/arrow-flight/src/arrow.flight.protocol.rs
@@ -38,7 +38,7 @@ pub struct BasicAuth {
pub password: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct Empty {}
///
/// Describes an available action, including both the name used for execution
@@ -103,7 +103,7 @@ pub struct Result {
///
/// The result should be stored in Result.body.
#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct CancelFlightInfoResult {
#[prost(enumeration = "CancelStatus", tag = "1")]
pub status: i32,
@@ -1053,19 +1053,17 @@ pub mod flight_service_server {
/// can expose a set of actions that are available.
#[derive(Debug)]
pub struct FlightServiceServer<T: FlightService> {
- inner: _Inner<T>,
+ inner: Arc<T>,
accept_compression_encodings: EnabledCompressionEncodings,
send_compression_encodings: EnabledCompressionEncodings,
max_decoding_message_size: Option<usize>,
max_encoding_message_size: Option<usize>,
}
- struct _Inner<T>(Arc<T>);
impl<T: FlightService> FlightServiceServer<T> {
pub fn new(inner: T) -> Self {
Self::from_arc(Arc::new(inner))
}
pub fn from_arc(inner: Arc<T>) -> Self {
- let inner = _Inner(inner);
Self {
inner,
accept_compression_encodings: Default::default(),
@@ -1128,7 +1126,6 @@ pub mod flight_service_server {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
- let inner = self.inner.clone();
match req.uri().path() {
"/arrow.flight.protocol.FlightService/Handshake" => {
#[allow(non_camel_case_types)]
@@ -1162,7 +1159,6 @@ pub mod flight_service_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = HandshakeSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -1209,7 +1205,6 @@ pub mod flight_service_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = ListFlightsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -1255,7 +1250,6 @@ pub mod flight_service_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = GetFlightInfoSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -1302,7 +1296,6 @@ pub mod flight_service_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = PollFlightInfoSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -1348,7 +1341,6 @@ pub mod flight_service_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = GetSchemaSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -1395,7 +1387,6 @@ pub mod flight_service_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = DoGetSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -1442,7 +1433,6 @@ pub mod flight_service_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = DoPutSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -1489,7 +1479,6 @@ pub mod flight_service_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = DoExchangeSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -1536,7 +1525,6 @@ pub mod flight_service_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = DoActionSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -1583,7 +1571,6 @@ pub mod flight_service_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = ListActionsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -1605,8 +1592,11 @@ pub mod flight_service_server {
Ok(
http::Response::builder()
.status(200)
- .header("grpc-status", "12")
- .header("content-type", "application/grpc")
+ .header("grpc-status",
tonic::Code::Unimplemented as i32)
+ .header(
+ http::header::CONTENT_TYPE,
+ tonic::metadata::GRPC_CONTENT_TYPE,
+ )
.body(empty_body())
.unwrap(),
)
@@ -1627,16 +1617,6 @@ pub mod flight_service_server {
}
}
}
- impl<T: FlightService> Clone for _Inner<T> {
- fn clone(&self) -> Self {
- Self(Arc::clone(&self.0))
- }
- }
- impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "{:?}", self.0)
- }
- }
impl<T: FlightService> tonic::server::NamedService for
FlightServiceServer<T> {
const NAME: &'static str = "arrow.flight.protocol.FlightService";
}
diff --git a/arrow-flight/src/sql/arrow.flight.protocol.sql.rs
b/arrow-flight/src/sql/arrow.flight.protocol.sql.rs
index c1f0fac0f6..5e6f198df7 100644
--- a/arrow-flight/src/sql/arrow.flight.protocol.sql.rs
+++ b/arrow-flight/src/sql/arrow.flight.protocol.sql.rs
@@ -101,7 +101,7 @@ pub struct CommandGetSqlInfo {
/// >
/// The returned data should be ordered by data_type and then by type_name.
#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct CommandGetXdbcTypeInfo {
///
/// Specifies the data type to search for the info.
@@ -121,7 +121,7 @@ pub struct CommandGetXdbcTypeInfo {
/// >
/// The returned data should be ordered by catalog_name.
#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct CommandGetCatalogs {}
///
/// Represents a request to retrieve the list of database schemas on a Flight
SQL enabled backend.
@@ -232,7 +232,7 @@ pub struct CommandGetTables {
/// >
/// The returned data should be ordered by table_type.
#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct CommandGetTableTypes {}
///
/// Represents a request to retrieve the primary keys of a table on a Flight
SQL enabled backend.
@@ -511,7 +511,7 @@ pub struct ActionClosePreparedStatementRequest {
/// Request message for the "BeginTransaction" action.
/// Begins a transaction.
#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct ActionBeginTransactionRequest {}
///
/// Request message for the "BeginSavepoint" action.
@@ -802,7 +802,7 @@ pub struct CommandPreparedStatementUpdate {
/// CommandPreparedStatementUpdate was in the request, containing
/// results from the update.
#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct DoPutUpdateResult {
/// The number of records updated. A return value of -1 represents
/// an unknown updated record count.
@@ -862,7 +862,7 @@ pub struct ActionCancelQueryRequest {
/// This command is deprecated since 13.0.0. Use the "CancelFlightInfo"
/// action with DoAction instead.
#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct ActionCancelQueryResult {
#[prost(enumeration = "action_cancel_query_result::CancelResult", tag =
"1")]
pub result: i32,
diff --git a/arrow-flight/tests/common/trailers_layer.rs
b/arrow-flight/tests/common/trailers_layer.rs
index b2ab74f7d9..0ccb7df86c 100644
--- a/arrow-flight/tests/common/trailers_layer.rs
+++ b/arrow-flight/tests/common/trailers_layer.rs
@@ -21,7 +21,7 @@ use std::task::{Context, Poll};
use futures::ready;
use http::{HeaderValue, Request, Response};
-use http_body::SizeHint;
+use http_body::{Frame, SizeHint};
use pin_project_lite::pin_project;
use tower::{Layer, Service};
@@ -99,31 +99,19 @@ impl<B: http_body::Body> http_body::Body for WrappedBody<B>
{
type Data = B::Data;
type Error = B::Error;
- fn poll_data(
- mut self: Pin<&mut Self>,
+ fn poll_frame(
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
- ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
- self.as_mut().project().inner.poll_data(cx)
- }
-
- fn poll_trailers(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<Option<http::header::HeaderMap>, Self::Error>> {
- let result: Result<Option<http::header::HeaderMap>, Self::Error> =
- ready!(self.as_mut().project().inner.poll_trailers(cx));
-
- let mut trailers = http::header::HeaderMap::new();
- trailers.insert("test-trailer",
HeaderValue::from_static("trailer_val"));
+ ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+ let mut result = ready!(self.project().inner.poll_frame(cx));
- match result {
- Ok(Some(mut existing)) => {
- existing.extend(trailers.iter().map(|(k, v)| (k.clone(),
v.clone())));
- Poll::Ready(Ok(Some(existing)))
+ if let Some(Ok(frame)) = &mut result {
+ if let Some(trailers) = frame.trailers_mut() {
+ trailers.insert("test-trailer",
HeaderValue::from_static("trailer_val"));
}
- Ok(None) => Poll::Ready(Ok(Some(trailers))),
- Err(e) => Poll::Ready(Err(e)),
}
+
+ Poll::Ready(result)
}
fn is_end_stream(&self) -> bool {
diff --git a/arrow-integration-testing/Cargo.toml
b/arrow-integration-testing/Cargo.toml
index 032b99f4fb..7be56d9198 100644
--- a/arrow-integration-testing/Cargo.toml
+++ b/arrow-integration-testing/Cargo.toml
@@ -42,11 +42,11 @@ async-trait = { version = "0.1.41", default-features =
false }
clap = { version = "4", default-features = false, features = ["std", "derive",
"help", "error-context", "usage"] }
futures = { version = "0.3", default-features = false }
hex = { version = "0.4", default-features = false, features = ["std"] }
-prost = { version = "0.12", default-features = false }
+prost = { version = "0.13", default-features = false }
serde = { version = "1.0", default-features = false, features = ["rc",
"derive"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
tokio = { version = "1.0", default-features = false }
-tonic = { version = "0.11", default-features = false }
+tonic = { version = "0.12", default-features = false }
tracing-subscriber = { version = "0.3.1", default-features = false, features =
["fmt"], optional = true }
num = { version = "0.4", default-features = false, features = ["std"] }
flate2 = { version = "1", default-features = false, features =
["rust_backend"] }