This is an automated email from the ASF dual-hosted git repository.

mneumann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 16f59056a4 feat: log headers/trailers in flight CLI (+ minor fixes) 
(#4898)
16f59056a4 is described below

commit 16f59056a4920e3f7cdfbde5c7faf0f05139c1d4
Author: Marco Neumann <[email protected]>
AuthorDate: Tue Oct 10 09:51:44 2023 +0200

    feat: log headers/trailers in flight CLI (+ minor fixes) (#4898)
    
    * feat: improve CLI logging setup
    
    * refactor: flight SQL DoGet should be a high-level interface
    
    * feat: log headers/trailers in SQL CLI
    
    * fix: replace explicit panics in CLI
---
 arrow-flight/Cargo.toml                    |  2 +-
 arrow-flight/examples/flight_sql_server.rs |  4 +-
 arrow-flight/src/bin/flight_sql_client.rs  | 94 +++++++++++++++++++++++++-----
 arrow-flight/src/sql/client.rs             | 16 ++++-
 4 files changed, 93 insertions(+), 23 deletions(-)

diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml
index 54c5cdf5e2..edaa7129dc 100644
--- a/arrow-flight/Cargo.toml
+++ b/arrow-flight/Cargo.toml
@@ -52,7 +52,7 @@ tonic = { version = "0.10.0", default-features = false, 
features = ["transport",
 anyhow = { version = "1.0", optional = true }
 clap = { version = "4.1", default-features = false, features = ["std", 
"derive", "env", "help", "error-context", "usage"], optional = true }
 tracing-log = { version = "0.1", optional = true }
-tracing-subscriber = { version = "0.3.1", default-features = false, features = 
["ansi", "fmt"], optional = true }
+tracing-subscriber = { version = "0.3.1", default-features = false, features = 
["ansi", "env-filter", "fmt"], optional = true }
 
 [package.metadata.docs.rs]
 all-features = true
diff --git a/arrow-flight/examples/flight_sql_server.rs 
b/arrow-flight/examples/flight_sql_server.rs
index d1aeae6f0a..013f7e7788 100644
--- a/arrow-flight/examples/flight_sql_server.rs
+++ b/arrow-flight/examples/flight_sql_server.rs
@@ -789,7 +789,6 @@ mod tests {
 
     use arrow_cast::pretty::pretty_format_batches;
     use arrow_flight::sql::client::FlightSqlServiceClient;
-    use arrow_flight::utils::flight_data_to_batches;
     use tonic::transport::server::TcpIncoming;
     use tonic::transport::{Certificate, Endpoint};
     use tower::service_fn;
@@ -955,8 +954,7 @@ mod tests {
 
             let ticket = 
flight_info.endpoint[0].ticket.as_ref().unwrap().clone();
             let flight_data = client.do_get(ticket).await.unwrap();
-            let flight_data: Vec<FlightData> = 
flight_data.try_collect().await.unwrap();
-            let batches = flight_data_to_batches(&flight_data).unwrap();
+            let batches: Vec<_> = flight_data.try_collect().await.unwrap();
 
             let res = pretty_format_batches(batches.as_slice()).unwrap();
             let expected = r#"
diff --git a/arrow-flight/src/bin/flight_sql_client.rs 
b/arrow-flight/src/bin/flight_sql_client.rs
index c6aaccf376..df51530b3c 100644
--- a/arrow-flight/src/bin/flight_sql_client.rs
+++ b/arrow-flight/src/bin/flight_sql_client.rs
@@ -17,17 +17,17 @@
 
 use std::{error::Error, sync::Arc, time::Duration};
 
-use anyhow::{Context, Result};
+use anyhow::{bail, Context, Result};
 use arrow_array::{ArrayRef, Datum, RecordBatch, StringArray};
 use arrow_cast::{cast_with_options, pretty::pretty_format_batches, 
CastOptions};
-use arrow_flight::{
-    sql::client::FlightSqlServiceClient, utils::flight_data_to_batches, 
FlightData,
-    FlightInfo,
-};
+use arrow_flight::{sql::client::FlightSqlServiceClient, FlightInfo};
 use arrow_schema::Schema;
 use clap::{Parser, Subcommand};
 use futures::TryStreamExt;
-use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
+use tonic::{
+    metadata::MetadataMap,
+    transport::{Channel, ClientTlsConfig, Endpoint},
+};
 use tracing_log::log::info;
 
 /// A ':' separated key value pair
@@ -61,6 +61,22 @@ where
     }
 }
 
+/// Logging CLI config.
+#[derive(Debug, Parser)]
+pub struct LoggingArgs {
+    /// Log verbosity.
+    ///
+    /// Use `-v` for warn, `-vv` for info, `-vvv` for debug, `-vvvv` for trace.
+    ///
+    /// Note you can also set logging level using `RUST_LOG` environment 
variable: `RUST_LOG=debug`
+    #[clap(
+        short = 'v',
+        long = "verbose",
+        action = clap::ArgAction::Count,
+    )]
+    log_verbose_count: u8,
+}
+
 #[derive(Debug, Parser)]
 struct ClientArgs {
     /// Additional headers.
@@ -96,6 +112,10 @@ struct ClientArgs {
 
 #[derive(Debug, Parser)]
 struct Args {
+    /// Logging args.
+    #[clap(flatten)]
+    logging_args: LoggingArgs,
+
     /// Client args.
     #[clap(flatten)]
     client_args: ClientArgs,
@@ -119,7 +139,7 @@ enum Command {
 #[tokio::main]
 async fn main() -> Result<()> {
     let args = Args::parse();
-    setup_logging()?;
+    setup_logging(args.logging_args)?;
     let mut client = setup_client(args.client_args)
         .await
         .context("setup client")?;
@@ -177,16 +197,21 @@ async fn execute_flight(
 
     for endpoint in info.endpoint {
         let Some(ticket) = &endpoint.ticket else {
-            panic!("did not get ticket");
+            bail!("did not get ticket");
         };
-        let flight_data = client.do_get(ticket.clone()).await.context("do 
get")?;
-        let flight_data: Vec<FlightData> = flight_data
+
+        let mut flight_data = client.do_get(ticket.clone()).await.context("do 
get")?;
+        log_metadata(flight_data.headers(), "header");
+
+        let mut endpoint_batches: Vec<_> = (&mut flight_data)
             .try_collect()
             .await
             .context("collect data stream")?;
-        let mut endpoint_batches = flight_data_to_batches(&flight_data)
-            .context("convert flight data to record batches")?;
         batches.append(&mut endpoint_batches);
+
+        if let Some(trailers) = flight_data.trailers() {
+            log_metadata(&trailers, "trailer");
+        }
     }
     info!("received data");
 
@@ -213,9 +238,22 @@ fn construct_record_batch_from_params(
     Ok(RecordBatch::try_from_iter(items)?)
 }
 
-fn setup_logging() -> Result<()> {
+fn setup_logging(args: LoggingArgs) -> Result<()> {
+    use tracing_subscriber::{util::SubscriberInitExt, EnvFilter, 
FmtSubscriber};
+
     tracing_log::LogTracer::init().context("tracing log init")?;
-    tracing_subscriber::fmt::init();
+
+    let filter = match args.log_verbose_count {
+        0 => "warn",
+        1 => "info",
+        2 => "debug",
+        _ => "trace",
+    };
+    let filter = EnvFilter::try_new(filter).context("set up log env filter")?;
+
+    let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish();
+    subscriber.try_init().context("init logging subscriber")?;
+
     Ok(())
 }
 
@@ -265,10 +303,10 @@ async fn setup_client(args: ClientArgs) -> 
Result<FlightSqlServiceClient<Channel
             info!("performed handshake");
         }
         (Some(_), None) => {
-            panic!("when username is set, you also need to set a password")
+            bail!("when username is set, you also need to set a password")
         }
         (None, Some(_)) => {
-            panic!("when password is set, you also need to set a username")
+            bail!("when password is set, you also need to set a username")
         }
     }
 
@@ -284,3 +322,27 @@ fn parse_key_val(
         .ok_or_else(|| format!("invalid KEY=value: no `=` found in `{s}`"))?;
     Ok((s[..pos].parse()?, s[pos + 1..].parse()?))
 }
+
+/// Log headers/trailers.
+fn log_metadata(map: &MetadataMap, what: &'static str) {
+    for k_v in map.iter() {
+        match k_v {
+            tonic::metadata::KeyAndValueRef::Ascii(k, v) => {
+                info!(
+                    "{}: {}={}",
+                    what,
+                    k.as_str(),
+                    v.to_str().unwrap_or("<invalid>"),
+                );
+            }
+            tonic::metadata::KeyAndValueRef::Binary(k, v) => {
+                info!(
+                    "{}: {}={}",
+                    what,
+                    k.as_str(),
+                    String::from_utf8_lossy(v.as_ref()),
+                );
+            }
+        }
+    }
+}
diff --git a/arrow-flight/src/sql/client.rs b/arrow-flight/src/sql/client.rs
index 2d382cf2ca..7685813ff8 100644
--- a/arrow-flight/src/sql/client.rs
+++ b/arrow-flight/src/sql/client.rs
@@ -24,6 +24,7 @@ use std::collections::HashMap;
 use std::str::FromStr;
 use tonic::metadata::AsciiMetadataKey;
 
+use crate::decode::FlightRecordBatchStream;
 use crate::encode::FlightDataEncoderBuilder;
 use crate::error::FlightError;
 use crate::flight_service_client::FlightServiceClient;
@@ -37,6 +38,7 @@ use crate::sql::{
     CommandPreparedStatementQuery, CommandPreparedStatementUpdate, 
CommandStatementQuery,
     CommandStatementUpdate, DoPutUpdateResult, ProstMessageExt, SqlInfo,
 };
+use crate::trailers::extract_lazy_trailers;
 use crate::{
     Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
     HandshakeResponse, IpcMessage, PutResult, Ticket,
@@ -231,14 +233,22 @@ impl FlightSqlServiceClient<Channel> {
     pub async fn do_get(
         &mut self,
         ticket: impl IntoRequest<Ticket>,
-    ) -> Result<Streaming<FlightData>, ArrowError> {
+    ) -> Result<FlightRecordBatchStream, ArrowError> {
         let req = self.set_request_headers(ticket.into_request())?;
-        Ok(self
+
+        let (md, response_stream, _ext) = self
             .flight_client
             .do_get(req)
             .await
             .map_err(status_to_arrow_error)?
-            .into_inner())
+            .into_parts();
+        let (response_stream, trailers) = 
extract_lazy_trailers(response_stream);
+
+        Ok(FlightRecordBatchStream::new_from_flight_data(
+            response_stream.map_err(FlightError::Tonic),
+        )
+        .with_headers(md)
+        .with_trailers(trailers))
     }
 
     /// Push a stream to the flight service associated with a particular 
flight stream.

Reply via email to