This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 16f59056a4 feat: log headers/trailers in flight CLI (+ minor fixes)
(#4898)
16f59056a4 is described below
commit 16f59056a4920e3f7cdfbde5c7faf0f05139c1d4
Author: Marco Neumann <[email protected]>
AuthorDate: Tue Oct 10 09:51:44 2023 +0200
feat: log headers/trailers in flight CLI (+ minor fixes) (#4898)
* feat: improve CLI logging setup
* refactor: flight SQL DoGet should be a high-level interface
* feat: log headers/trailers in SQL CLI
* fix: replace explicit panics in CLI
---
arrow-flight/Cargo.toml | 2 +-
arrow-flight/examples/flight_sql_server.rs | 4 +-
arrow-flight/src/bin/flight_sql_client.rs | 94 +++++++++++++++++++++++++-----
arrow-flight/src/sql/client.rs | 16 ++++-
4 files changed, 93 insertions(+), 23 deletions(-)
diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml
index 54c5cdf5e2..edaa7129dc 100644
--- a/arrow-flight/Cargo.toml
+++ b/arrow-flight/Cargo.toml
@@ -52,7 +52,7 @@ tonic = { version = "0.10.0", default-features = false,
features = ["transport",
anyhow = { version = "1.0", optional = true }
clap = { version = "4.1", default-features = false, features = ["std",
"derive", "env", "help", "error-context", "usage"], optional = true }
tracing-log = { version = "0.1", optional = true }
-tracing-subscriber = { version = "0.3.1", default-features = false, features =
["ansi", "fmt"], optional = true }
+tracing-subscriber = { version = "0.3.1", default-features = false, features =
["ansi", "env-filter", "fmt"], optional = true }
[package.metadata.docs.rs]
all-features = true
diff --git a/arrow-flight/examples/flight_sql_server.rs
b/arrow-flight/examples/flight_sql_server.rs
index d1aeae6f0a..013f7e7788 100644
--- a/arrow-flight/examples/flight_sql_server.rs
+++ b/arrow-flight/examples/flight_sql_server.rs
@@ -789,7 +789,6 @@ mod tests {
use arrow_cast::pretty::pretty_format_batches;
use arrow_flight::sql::client::FlightSqlServiceClient;
- use arrow_flight::utils::flight_data_to_batches;
use tonic::transport::server::TcpIncoming;
use tonic::transport::{Certificate, Endpoint};
use tower::service_fn;
@@ -955,8 +954,7 @@ mod tests {
let ticket =
flight_info.endpoint[0].ticket.as_ref().unwrap().clone();
let flight_data = client.do_get(ticket).await.unwrap();
- let flight_data: Vec<FlightData> =
flight_data.try_collect().await.unwrap();
- let batches = flight_data_to_batches(&flight_data).unwrap();
+ let batches: Vec<_> = flight_data.try_collect().await.unwrap();
let res = pretty_format_batches(batches.as_slice()).unwrap();
let expected = r#"
diff --git a/arrow-flight/src/bin/flight_sql_client.rs
b/arrow-flight/src/bin/flight_sql_client.rs
index c6aaccf376..df51530b3c 100644
--- a/arrow-flight/src/bin/flight_sql_client.rs
+++ b/arrow-flight/src/bin/flight_sql_client.rs
@@ -17,17 +17,17 @@
use std::{error::Error, sync::Arc, time::Duration};
-use anyhow::{Context, Result};
+use anyhow::{bail, Context, Result};
use arrow_array::{ArrayRef, Datum, RecordBatch, StringArray};
use arrow_cast::{cast_with_options, pretty::pretty_format_batches,
CastOptions};
-use arrow_flight::{
- sql::client::FlightSqlServiceClient, utils::flight_data_to_batches,
FlightData,
- FlightInfo,
-};
+use arrow_flight::{sql::client::FlightSqlServiceClient, FlightInfo};
use arrow_schema::Schema;
use clap::{Parser, Subcommand};
use futures::TryStreamExt;
-use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
+use tonic::{
+ metadata::MetadataMap,
+ transport::{Channel, ClientTlsConfig, Endpoint},
+};
use tracing_log::log::info;
/// A ':' separated key value pair
@@ -61,6 +61,22 @@ where
}
}
+/// Logging CLI config.
+#[derive(Debug, Parser)]
+pub struct LoggingArgs {
+ /// Log verbosity.
+ ///
+    /// Use `-v` for warn, `-vv` for info, `-vvv` for debug, `-vvvv` for trace.
+ ///
+ /// Note you can also set logging level using `RUST_LOG` environment
variable: `RUST_LOG=debug`
+ #[clap(
+ short = 'v',
+ long = "verbose",
+ action = clap::ArgAction::Count,
+ )]
+ log_verbose_count: u8,
+}
+
#[derive(Debug, Parser)]
struct ClientArgs {
/// Additional headers.
@@ -96,6 +112,10 @@ struct ClientArgs {
#[derive(Debug, Parser)]
struct Args {
+ /// Logging args.
+ #[clap(flatten)]
+ logging_args: LoggingArgs,
+
/// Client args.
#[clap(flatten)]
client_args: ClientArgs,
@@ -119,7 +139,7 @@ enum Command {
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
- setup_logging()?;
+ setup_logging(args.logging_args)?;
let mut client = setup_client(args.client_args)
.await
.context("setup client")?;
@@ -177,16 +197,21 @@ async fn execute_flight(
for endpoint in info.endpoint {
let Some(ticket) = &endpoint.ticket else {
- panic!("did not get ticket");
+ bail!("did not get ticket");
};
- let flight_data = client.do_get(ticket.clone()).await.context("do
get")?;
- let flight_data: Vec<FlightData> = flight_data
+
+ let mut flight_data = client.do_get(ticket.clone()).await.context("do
get")?;
+ log_metadata(flight_data.headers(), "header");
+
+ let mut endpoint_batches: Vec<_> = (&mut flight_data)
.try_collect()
.await
.context("collect data stream")?;
- let mut endpoint_batches = flight_data_to_batches(&flight_data)
- .context("convert flight data to record batches")?;
batches.append(&mut endpoint_batches);
+
+ if let Some(trailers) = flight_data.trailers() {
+ log_metadata(&trailers, "trailer");
+ }
}
info!("received data");
@@ -213,9 +238,22 @@ fn construct_record_batch_from_params(
Ok(RecordBatch::try_from_iter(items)?)
}
-fn setup_logging() -> Result<()> {
+fn setup_logging(args: LoggingArgs) -> Result<()> {
+ use tracing_subscriber::{util::SubscriberInitExt, EnvFilter,
FmtSubscriber};
+
tracing_log::LogTracer::init().context("tracing log init")?;
- tracing_subscriber::fmt::init();
+
+ let filter = match args.log_verbose_count {
+ 0 => "warn",
+ 1 => "info",
+ 2 => "debug",
+ _ => "trace",
+ };
+ let filter = EnvFilter::try_new(filter).context("set up log env filter")?;
+
+ let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish();
+ subscriber.try_init().context("init logging subscriber")?;
+
Ok(())
}
@@ -265,10 +303,10 @@ async fn setup_client(args: ClientArgs) ->
Result<FlightSqlServiceClient<Channel
info!("performed handshake");
}
(Some(_), None) => {
- panic!("when username is set, you also need to set a password")
+ bail!("when username is set, you also need to set a password")
}
(None, Some(_)) => {
- panic!("when password is set, you also need to set a username")
+ bail!("when password is set, you also need to set a username")
}
}
@@ -284,3 +322,27 @@ fn parse_key_val(
.ok_or_else(|| format!("invalid KEY=value: no `=` found in `{s}`"))?;
Ok((s[..pos].parse()?, s[pos + 1..].parse()?))
}
+
+/// Log headers/trailers.
+fn log_metadata(map: &MetadataMap, what: &'static str) {
+ for k_v in map.iter() {
+ match k_v {
+ tonic::metadata::KeyAndValueRef::Ascii(k, v) => {
+ info!(
+ "{}: {}={}",
+ what,
+ k.as_str(),
+ v.to_str().unwrap_or("<invalid>"),
+ );
+ }
+ tonic::metadata::KeyAndValueRef::Binary(k, v) => {
+ info!(
+ "{}: {}={}",
+ what,
+ k.as_str(),
+ String::from_utf8_lossy(v.as_ref()),
+ );
+ }
+ }
+ }
+}
diff --git a/arrow-flight/src/sql/client.rs b/arrow-flight/src/sql/client.rs
index 2d382cf2ca..7685813ff8 100644
--- a/arrow-flight/src/sql/client.rs
+++ b/arrow-flight/src/sql/client.rs
@@ -24,6 +24,7 @@ use std::collections::HashMap;
use std::str::FromStr;
use tonic::metadata::AsciiMetadataKey;
+use crate::decode::FlightRecordBatchStream;
use crate::encode::FlightDataEncoderBuilder;
use crate::error::FlightError;
use crate::flight_service_client::FlightServiceClient;
@@ -37,6 +38,7 @@ use crate::sql::{
CommandPreparedStatementQuery, CommandPreparedStatementUpdate,
CommandStatementQuery,
CommandStatementUpdate, DoPutUpdateResult, ProstMessageExt, SqlInfo,
};
+use crate::trailers::extract_lazy_trailers;
use crate::{
Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
HandshakeResponse, IpcMessage, PutResult, Ticket,
@@ -231,14 +233,22 @@ impl FlightSqlServiceClient<Channel> {
pub async fn do_get(
&mut self,
ticket: impl IntoRequest<Ticket>,
- ) -> Result<Streaming<FlightData>, ArrowError> {
+ ) -> Result<FlightRecordBatchStream, ArrowError> {
let req = self.set_request_headers(ticket.into_request())?;
- Ok(self
+
+ let (md, response_stream, _ext) = self
.flight_client
.do_get(req)
.await
.map_err(status_to_arrow_error)?
- .into_inner())
+ .into_parts();
+ let (response_stream, trailers) =
extract_lazy_trailers(response_stream);
+
+ Ok(FlightRecordBatchStream::new_from_flight_data(
+ response_stream.map_err(FlightError::Tonic),
+ )
+ .with_headers(md)
+ .with_trailers(trailers))
}
/// Push a stream to the flight service associated with a particular
flight stream.