This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch 53.0.0-dev
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/53.0.0-dev by this push:
     new 741bbf6854 bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight` 
(#6041)
741bbf6854 is described below

commit 741bbf68546ffa2d9342cece82f3b1456f7a1f5b
Author: Bugen Zhao <[email protected]>
AuthorDate: Wed Jul 17 06:05:00 2024 +0800

    bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight` (#6041)
    
    * bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight`
    
    Signed-off-by: Bugen Zhao <[email protected]>
    
    * fix example tests
    
    Signed-off-by: Bugen Zhao <[email protected]>
    
    ---------
    
    Signed-off-by: Bugen Zhao <[email protected]>
---
 arrow-flight/Cargo.toml                           | 11 +++----
 arrow-flight/examples/flight_sql_server.rs        |  6 ++--
 arrow-flight/gen/Cargo.toml                       |  4 +--
 arrow-flight/src/arrow.flight.protocol.rs         | 36 +++++------------------
 arrow-flight/src/sql/arrow.flight.protocol.sql.rs | 12 ++++----
 arrow-flight/tests/common/trailers_layer.rs       | 32 +++++++-------------
 arrow-integration-testing/Cargo.toml              |  4 +--
 7 files changed, 38 insertions(+), 67 deletions(-)

diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml
index 111bf94d80..3e7d6fa292 100644
--- a/arrow-flight/Cargo.toml
+++ b/arrow-flight/Cargo.toml
@@ -44,11 +44,11 @@ bytes = { version = "1", default-features = false }
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 once_cell = { version = "1", optional = true }
 paste = { version = "1.0" }
-prost = { version = "0.12.3", default-features = false, features = 
["prost-derive"] }
+prost = { version = "0.13.1", default-features = false, features = 
["prost-derive"] }
 # For Timestamp type
-prost-types = { version = "0.12.3", default-features = false }
+prost-types = { version = "0.13.1", default-features = false }
 tokio = { version = "1.0", default-features = false, features = ["macros", 
"rt", "rt-multi-thread"] }
-tonic = { version = "0.11.0", default-features = false, features = 
["transport", "codegen", "prost"] }
+tonic = { version = "0.12.0", default-features = false, features = 
["transport", "codegen", "prost"] }
 
 # CLI-related dependencies
 anyhow = { version = "1.0", optional = true }
@@ -70,8 +70,9 @@ cli = ["anyhow", "arrow-cast/prettyprint", "clap", 
"tracing-log", "tracing-subsc
 [dev-dependencies]
 arrow-cast = { workspace = true, features = ["prettyprint"] }
 assert_cmd = "2.0.8"
-http = "0.2.9"
-http-body = "0.4.5"
+http = "1.1.0"
+http-body = "1.0.0"
+hyper-util = "0.1"
 pin-project-lite = "0.2"
 tempfile = "3.3"
 tokio-stream = { version = "0.1", features = ["net"] }
diff --git a/arrow-flight/examples/flight_sql_server.rs 
b/arrow-flight/examples/flight_sql_server.rs
index 031628eaa8..d5168debc4 100644
--- a/arrow-flight/examples/flight_sql_server.rs
+++ b/arrow-flight/examples/flight_sql_server.rs
@@ -783,7 +783,8 @@ impl ProstMessageExt for FetchResults {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use futures::TryStreamExt;
+    use futures::{TryFutureExt, TryStreamExt};
+    use hyper_util::rt::TokioIo;
     use std::fs;
     use std::future::Future;
     use std::net::SocketAddr;
@@ -843,7 +844,8 @@ mod tests {
             .serve_with_incoming(stream);
 
         let request_future = async {
-            let connector = service_fn(move |_| 
UnixStream::connect(path.clone()));
+            let connector =
+                service_fn(move |_| 
UnixStream::connect(path.clone()).map_ok(TokioIo::new));
             let channel = Endpoint::try_from("http://example.com")
                 .unwrap()
                 .connect_with_connector(connector)
diff --git a/arrow-flight/gen/Cargo.toml b/arrow-flight/gen/Cargo.toml
index 7264a527ca..a12c683776 100644
--- a/arrow-flight/gen/Cargo.toml
+++ b/arrow-flight/gen/Cargo.toml
@@ -33,5 +33,5 @@ publish = false
 # Pin specific version of the tonic-build dependencies to avoid auto-generated
 # (and checked in) arrow.flight.protocol.rs from changing
 proc-macro2 = { version = "=1.0.86", default-features = false }
-prost-build = { version = "=0.12.6", default-features = false }
-tonic-build = { version = "=0.11.0", default-features = false, features = 
["transport", "prost"] }
+prost-build = { version = "=0.13.1", default-features = false }
+tonic-build = { version = "=0.12.0", default-features = false, features = 
["transport", "prost"] }
diff --git a/arrow-flight/src/arrow.flight.protocol.rs 
b/arrow-flight/src/arrow.flight.protocol.rs
index bc314de9d1..8c7292894e 100644
--- a/arrow-flight/src/arrow.flight.protocol.rs
+++ b/arrow-flight/src/arrow.flight.protocol.rs
@@ -38,7 +38,7 @@ pub struct BasicAuth {
     pub password: ::prost::alloc::string::String,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct Empty {}
 ///
 /// Describes an available action, including both the name used for execution
@@ -103,7 +103,7 @@ pub struct Result {
 ///
 /// The result should be stored in Result.body.
 #[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct CancelFlightInfoResult {
     #[prost(enumeration = "CancelStatus", tag = "1")]
     pub status: i32,
@@ -1053,19 +1053,17 @@ pub mod flight_service_server {
     /// can expose a set of actions that are available.
     #[derive(Debug)]
     pub struct FlightServiceServer<T: FlightService> {
-        inner: _Inner<T>,
+        inner: Arc<T>,
         accept_compression_encodings: EnabledCompressionEncodings,
         send_compression_encodings: EnabledCompressionEncodings,
         max_decoding_message_size: Option<usize>,
         max_encoding_message_size: Option<usize>,
     }
-    struct _Inner<T>(Arc<T>);
     impl<T: FlightService> FlightServiceServer<T> {
         pub fn new(inner: T) -> Self {
             Self::from_arc(Arc::new(inner))
         }
         pub fn from_arc(inner: Arc<T>) -> Self {
-            let inner = _Inner(inner);
             Self {
                 inner,
                 accept_compression_encodings: Default::default(),
@@ -1128,7 +1126,6 @@ pub mod flight_service_server {
             Poll::Ready(Ok(()))
         }
         fn call(&mut self, req: http::Request<B>) -> Self::Future {
-            let inner = self.inner.clone();
             match req.uri().path() {
                 "/arrow.flight.protocol.FlightService/Handshake" => {
                     #[allow(non_camel_case_types)]
@@ -1162,7 +1159,6 @@ pub mod flight_service_server {
                     let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = HandshakeSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -1209,7 +1205,6 @@ pub mod flight_service_server {
                     let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = ListFlightsSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -1255,7 +1250,6 @@ pub mod flight_service_server {
                     let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = GetFlightInfoSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -1302,7 +1296,6 @@ pub mod flight_service_server {
                     let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = PollFlightInfoSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -1348,7 +1341,6 @@ pub mod flight_service_server {
                     let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = GetSchemaSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -1395,7 +1387,6 @@ pub mod flight_service_server {
                     let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = DoGetSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -1442,7 +1433,6 @@ pub mod flight_service_server {
                     let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = DoPutSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -1489,7 +1479,6 @@ pub mod flight_service_server {
                     let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = DoExchangeSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -1536,7 +1525,6 @@ pub mod flight_service_server {
                     let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = DoActionSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -1583,7 +1571,6 @@ pub mod flight_service_server {
                     let max_encoding_message_size = 
self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = ListActionsSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -1605,8 +1592,11 @@ pub mod flight_service_server {
                         Ok(
                             http::Response::builder()
                                 .status(200)
-                                .header("grpc-status", "12")
-                                .header("content-type", "application/grpc")
+                                .header("grpc-status", 
tonic::Code::Unimplemented as i32)
+                                .header(
+                                    http::header::CONTENT_TYPE,
+                                    tonic::metadata::GRPC_CONTENT_TYPE,
+                                )
                                 .body(empty_body())
                                 .unwrap(),
                         )
@@ -1627,16 +1617,6 @@ pub mod flight_service_server {
             }
         }
     }
-    impl<T: FlightService> Clone for _Inner<T> {
-        fn clone(&self) -> Self {
-            Self(Arc::clone(&self.0))
-        }
-    }
-    impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
-        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-            write!(f, "{:?}", self.0)
-        }
-    }
     impl<T: FlightService> tonic::server::NamedService for 
FlightServiceServer<T> {
         const NAME: &'static str = "arrow.flight.protocol.FlightService";
     }
diff --git a/arrow-flight/src/sql/arrow.flight.protocol.sql.rs 
b/arrow-flight/src/sql/arrow.flight.protocol.sql.rs
index c1f0fac0f6..5e6f198df7 100644
--- a/arrow-flight/src/sql/arrow.flight.protocol.sql.rs
+++ b/arrow-flight/src/sql/arrow.flight.protocol.sql.rs
@@ -101,7 +101,7 @@ pub struct CommandGetSqlInfo {
 /// >
 /// The returned data should be ordered by data_type and then by type_name.
 #[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct CommandGetXdbcTypeInfo {
     ///
     /// Specifies the data type to search for the info.
@@ -121,7 +121,7 @@ pub struct CommandGetXdbcTypeInfo {
 /// >
 /// The returned data should be ordered by catalog_name.
 #[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct CommandGetCatalogs {}
 ///
 /// Represents a request to retrieve the list of database schemas on a Flight 
SQL enabled backend.
@@ -232,7 +232,7 @@ pub struct CommandGetTables {
 /// >
 /// The returned data should be ordered by table_type.
 #[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct CommandGetTableTypes {}
 ///
 /// Represents a request to retrieve the primary keys of a table on a Flight 
SQL enabled backend.
@@ -511,7 +511,7 @@ pub struct ActionClosePreparedStatementRequest {
 /// Request message for the "BeginTransaction" action.
 /// Begins a transaction.
 #[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct ActionBeginTransactionRequest {}
 ///
 /// Request message for the "BeginSavepoint" action.
@@ -802,7 +802,7 @@ pub struct CommandPreparedStatementUpdate {
 /// CommandPreparedStatementUpdate was in the request, containing
 /// results from the update.
 #[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct DoPutUpdateResult {
     /// The number of records updated. A return value of -1 represents
     /// an unknown updated record count.
@@ -862,7 +862,7 @@ pub struct ActionCancelQueryRequest {
 /// This command is deprecated since 13.0.0. Use the "CancelFlightInfo"
 /// action with DoAction instead.
 #[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct ActionCancelQueryResult {
     #[prost(enumeration = "action_cancel_query_result::CancelResult", tag = 
"1")]
     pub result: i32,
diff --git a/arrow-flight/tests/common/trailers_layer.rs 
b/arrow-flight/tests/common/trailers_layer.rs
index b2ab74f7d9..0ccb7df86c 100644
--- a/arrow-flight/tests/common/trailers_layer.rs
+++ b/arrow-flight/tests/common/trailers_layer.rs
@@ -21,7 +21,7 @@ use std::task::{Context, Poll};
 
 use futures::ready;
 use http::{HeaderValue, Request, Response};
-use http_body::SizeHint;
+use http_body::{Frame, SizeHint};
 use pin_project_lite::pin_project;
 use tower::{Layer, Service};
 
@@ -99,31 +99,19 @@ impl<B: http_body::Body> http_body::Body for WrappedBody<B> 
{
     type Data = B::Data;
     type Error = B::Error;
 
-    fn poll_data(
-        mut self: Pin<&mut Self>,
+    fn poll_frame(
+        self: Pin<&mut Self>,
         cx: &mut Context<'_>,
-    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
-        self.as_mut().project().inner.poll_data(cx)
-    }
-
-    fn poll_trailers(
-        mut self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Result<Option<http::header::HeaderMap>, Self::Error>> {
-        let result: Result<Option<http::header::HeaderMap>, Self::Error> =
-            ready!(self.as_mut().project().inner.poll_trailers(cx));
-
-        let mut trailers = http::header::HeaderMap::new();
-        trailers.insert("test-trailer", 
HeaderValue::from_static("trailer_val"));
+    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+        let mut result = ready!(self.project().inner.poll_frame(cx));
 
-        match result {
-            Ok(Some(mut existing)) => {
-                existing.extend(trailers.iter().map(|(k, v)| (k.clone(), 
v.clone())));
-                Poll::Ready(Ok(Some(existing)))
+        if let Some(Ok(frame)) = &mut result {
+            if let Some(trailers) = frame.trailers_mut() {
+                trailers.insert("test-trailer", 
HeaderValue::from_static("trailer_val"));
             }
-            Ok(None) => Poll::Ready(Ok(Some(trailers))),
-            Err(e) => Poll::Ready(Err(e)),
         }
+
+        Poll::Ready(result)
     }
 
     fn is_end_stream(&self) -> bool {
diff --git a/arrow-integration-testing/Cargo.toml 
b/arrow-integration-testing/Cargo.toml
index 032b99f4fb..7be56d9198 100644
--- a/arrow-integration-testing/Cargo.toml
+++ b/arrow-integration-testing/Cargo.toml
@@ -42,11 +42,11 @@ async-trait = { version = "0.1.41", default-features = 
false }
 clap = { version = "4", default-features = false, features = ["std", "derive", 
"help", "error-context", "usage"] }
 futures = { version = "0.3", default-features = false }
 hex = { version = "0.4", default-features = false, features = ["std"] }
-prost = { version = "0.12", default-features = false }
+prost = { version = "0.13", default-features = false }
 serde = { version = "1.0", default-features = false, features = ["rc", 
"derive"] }
 serde_json = { version = "1.0", default-features = false, features = ["std"] }
 tokio = { version = "1.0", default-features = false }
-tonic = { version = "0.11", default-features = false }
+tonic = { version = "0.12", default-features = false }
 tracing-subscriber = { version = "0.3.1", default-features = false, features = 
["fmt"], optional = true }
 num = { version = "0.4", default-features = false, features = ["std"] }
 flate2 = { version = "1", default-features = false, features = 
["rust_backend"] }

Reply via email to