This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new fe89d0b2b5 Improve flight sql examples (#10432)
fe89d0b2b5 is described below
commit fe89d0b2b5db93cb508ad7bc8d6d7f5e3168e306
Author: 张林伟 <[email protected]>
AuthorDate: Fri May 10 19:46:44 2024 +0800
Improve flight sql examples (#10432)
* Implement get_flight_info_tables
* remove unimplemented methods
* Simplify FlightInfo construction
---
.../examples/flight/flight_sql_server.rs | 423 ++++-----------------
datafusion/expr/src/table_source.rs | 10 +
2 files changed, 83 insertions(+), 350 deletions(-)
diff --git a/datafusion-examples/examples/flight/flight_sql_server.rs b/datafusion-examples/examples/flight/flight_sql_server.rs
index f04a559d00..2e46daf7cb 100644
--- a/datafusion-examples/examples/flight/flight_sql_server.rs
+++ b/datafusion-examples/examples/flight/flight_sql_server.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use arrow::array::{ArrayRef, StringArray};
use arrow::ipc::writer::IpcWriteOptions;
use arrow::record_batch::RecordBatch;
use arrow_flight::encode::FlightDataEncoderBuilder;
@@ -22,24 +23,16 @@ use arrow_flight::flight_descriptor::DescriptorType;
use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use arrow_flight::sql::server::{FlightSqlService, PeekableFlightDataStream};
use arrow_flight::sql::{
- ActionBeginSavepointRequest, ActionBeginSavepointResult,
- ActionBeginTransactionRequest, ActionBeginTransactionResult,
- ActionCancelQueryRequest, ActionCancelQueryResult,
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
- ActionCreatePreparedStatementResult,
ActionCreatePreparedSubstraitPlanRequest,
- ActionEndSavepointRequest, ActionEndTransactionRequest, Any,
CommandGetCatalogs,
- CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys,
- CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo,
- CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
- CommandPreparedStatementQuery, CommandPreparedStatementUpdate,
CommandStatementQuery,
- CommandStatementSubstraitPlan, CommandStatementUpdate, ProstMessageExt,
SqlInfo,
- TicketStatementQuery,
+ ActionCreatePreparedStatementResult, Any, CommandGetTables,
+ CommandPreparedStatementQuery, CommandPreparedStatementUpdate,
ProstMessageExt,
+ SqlInfo,
};
use arrow_flight::{
Action, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
HandshakeResponse, IpcMessage, SchemaAsIpc, Ticket,
};
-use arrow_schema::Schema;
+use arrow_schema::{DataType, Field, Schema};
use dashmap::DashMap;
use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::{DataFrame, ParquetReadOptions, SessionConfig,
SessionContext};
@@ -165,6 +158,43 @@ impl FlightSqlServiceImpl {
}
}
+    /// Builds the result set for a Flight SQL `CommandGetTables` request.
+    ///
+    /// Walks every catalog -> schema -> table registered in `ctx` and returns one
+    /// row per table with the columns `catalog_name`, `db_schema_name`,
+    /// `table_name` and `table_type` (the `Display` rendering of the provider's
+    /// `TableType`).
+    ///
+    /// Catalogs, schemas or tables whose lookup yields nothing, or whose table
+    /// lookup returns an error, are skipped instead of panicking the request
+    /// handler.
+    async fn tables(&self, ctx: Arc<SessionContext>) -> RecordBatch {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("catalog_name", DataType::Utf8, true),
+            Field::new("db_schema_name", DataType::Utf8, true),
+            Field::new("table_name", DataType::Utf8, false),
+            Field::new("table_type", DataType::Utf8, false),
+        ]));
+
+        let mut catalogs = vec![];
+        let mut schemas = vec![];
+        let mut names = vec![];
+        let mut types = vec![];
+        for catalog_name in ctx.catalog_names() {
+            // `catalog()` returns an Option; the entry may be gone by now
+            // (e.g. deregistered concurrently), so skip rather than unwrap.
+            let Some(catalog_provider) = ctx.catalog(&catalog_name) else {
+                continue;
+            };
+            for schema_name in catalog_provider.schema_names() {
+                let Some(schema_provider) = catalog_provider.schema(&schema_name)
+                else {
+                    continue;
+                };
+                for table_name in schema_provider.table_names() {
+                    let table_provider = match schema_provider.table(&table_name).await
+                    {
+                        Ok(Some(provider)) => provider,
+                        // Listed a moment ago but no longer resolvable.
+                        Ok(None) => continue,
+                        Err(e) => {
+                            info!(
+                                "skipping table {catalog_name}.{schema_name}.{table_name}: {e}"
+                            );
+                            continue;
+                        }
+                    };
+                    catalogs.push(catalog_name.clone());
+                    schemas.push(schema_name.clone());
+                    types.push(table_provider.table_type().to_string());
+                    names.push(table_name);
+                }
+            }
+        }
+
+        // Every column is Utf8, all four vectors receive exactly one entry per
+        // table, and the non-nullable columns never receive nulls, so
+        // `try_new` cannot fail here (an empty listing yields a 0-row batch).
+        RecordBatch::try_new(
+            schema,
+            [catalogs, schemas, names, types]
+                .into_iter()
+                .map(|column| Arc::new(StringArray::from(column)) as ArrayRef)
+                .collect(),
+        )
+        .expect("get_tables columns always match the declared schema")
+    }
+
fn remove_plan(&self, handle: &str) -> Result<(), Status> {
self.statements.remove(&handle.to_string());
Ok(())
@@ -246,27 +276,6 @@ impl FlightSqlService for FlightSqlServiceImpl {
Ok(Response::new(Box::pin(stream)))
}
- async fn get_flight_info_statement(
- &self,
- query: CommandStatementQuery,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- info!("get_flight_info_statement query:\n{}", query.query);
-
- Err(Status::unimplemented("Implement get_flight_info_statement"))
- }
-
- async fn get_flight_info_substrait_plan(
- &self,
- _query: CommandStatementSubstraitPlan,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- info!("get_flight_info_substrait_plan");
- Err(Status::unimplemented(
- "Implement get_flight_info_substrait_plan",
- ))
- }
-
async fn get_flight_info_prepared_statement(
&self,
cmd: CommandPreparedStatementQuery,
@@ -304,267 +313,50 @@ impl FlightSqlService for FlightSqlServiceImpl {
};
let buf = fetch.as_any().encode_to_vec().into();
let ticket = Ticket { ticket: buf };
- let endpoint = FlightEndpoint {
- ticket: Some(ticket),
- location: vec![],
- expiration_time: None,
- app_metadata: Default::default(),
- };
- let endpoints = vec![endpoint];
-
- let message = SchemaAsIpc::new(&schema, &IpcWriteOptions::default())
- .try_into()
- .map_err(|e| status!("Unable to serialize schema", e))?;
- let IpcMessage(schema_bytes) = message;
- let flight_desc = FlightDescriptor {
- r#type: DescriptorType::Cmd.into(),
- cmd: Default::default(),
- path: vec![],
- };
- // send -1 for total_records and total_bytes instead of iterating over all the
- // batches to get num_rows() and total byte size.
- let info = FlightInfo {
- schema: schema_bytes,
- flight_descriptor: Some(flight_desc),
- endpoint: endpoints,
- total_records: -1_i64,
- total_bytes: -1_i64,
- ordered: false,
- app_metadata: Default::default(),
- };
+ let info = FlightInfo::new()
+ // Encode the Arrow schema
+ .try_with_schema(&schema)
+ .expect("encoding failed")
+ .with_endpoint(FlightEndpoint::new().with_ticket(ticket))
+ .with_descriptor(FlightDescriptor {
+ r#type: DescriptorType::Cmd.into(),
+ cmd: Default::default(),
+ path: vec![],
+ });
let resp = Response::new(info);
Ok(resp)
}
- async fn get_flight_info_catalogs(
- &self,
- _query: CommandGetCatalogs,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- info!("get_flight_info_catalogs");
- Err(Status::unimplemented("Implement get_flight_info_catalogs"))
- }
-
- async fn get_flight_info_schemas(
- &self,
- _query: CommandGetDbSchemas,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- info!("get_flight_info_schemas");
- Err(Status::unimplemented("Implement get_flight_info_schemas"))
- }
-
+    /// Answers a Flight SQL `CommandGetTables` request.
+    ///
+    /// The full table listing is materialized up front, stored in
+    /// `self.results` under a fresh UUID handle, and a `FlightInfo` is returned
+    /// whose single endpoint carries a `FetchResults` ticket for that handle.
+    ///
+    /// NOTE(review): the filters carried by `CommandGetTables` (catalog,
+    /// schema/table name patterns, table types, `include_schema`) are not
+    /// applied yet; every registered table is returned.
async fn get_flight_info_tables(
&self,
_query: CommandGetTables,
- _request: Request<FlightDescriptor>,
+        request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
info!("get_flight_info_tables");
- Err(Status::unimplemented("Implement get_flight_info_tables"))
- }
-
- async fn get_flight_info_table_types(
- &self,
- _query: CommandGetTableTypes,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- info!("get_flight_info_table_types");
- Err(Status::unimplemented(
- "Implement get_flight_info_table_types",
- ))
- }
-
- async fn get_flight_info_sql_info(
- &self,
- _query: CommandGetSqlInfo,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- info!("get_flight_info_sql_info");
- Err(Status::unimplemented("Implement CommandGetSqlInfo"))
- }
-
- async fn get_flight_info_primary_keys(
- &self,
- _query: CommandGetPrimaryKeys,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- info!("get_flight_info_primary_keys");
- Err(Status::unimplemented(
- "Implement get_flight_info_primary_keys",
- ))
- }
-
- async fn get_flight_info_exported_keys(
- &self,
- _query: CommandGetExportedKeys,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- info!("get_flight_info_exported_keys");
- Err(Status::unimplemented(
- "Implement get_flight_info_exported_keys",
- ))
- }
-
- async fn get_flight_info_imported_keys(
- &self,
- _query: CommandGetImportedKeys,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- info!("get_flight_info_imported_keys");
- Err(Status::unimplemented(
- "Implement get_flight_info_imported_keys",
- ))
- }
-
- async fn get_flight_info_cross_reference(
- &self,
- _query: CommandGetCrossReference,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- info!("get_flight_info_cross_reference");
- Err(Status::unimplemented(
- "Implement get_flight_info_cross_reference",
- ))
- }
-
- async fn get_flight_info_xdbc_type_info(
- &self,
- _query: CommandGetXdbcTypeInfo,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- info!("get_flight_info_xdbc_type_info");
- Err(Status::unimplemented(
- "Implement get_flight_info_xdbc_type_info",
- ))
- }
-
- async fn do_get_statement(
- &self,
- _ticket: TicketStatementQuery,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- info!("do_get_statement");
- Err(Status::unimplemented("Implement do_get_statement"))
- }
-
- async fn do_get_prepared_statement(
- &self,
- _query: CommandPreparedStatementQuery,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- info!("do_get_prepared_statement");
- Err(Status::unimplemented("Implement do_get_prepared_statement"))
- }
-
- async fn do_get_catalogs(
- &self,
- _query: CommandGetCatalogs,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- info!("do_get_catalogs");
- Err(Status::unimplemented("Implement do_get_catalogs"))
- }
-
- async fn do_get_schemas(
- &self,
- _query: CommandGetDbSchemas,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- info!("do_get_schemas");
- Err(Status::unimplemented("Implement do_get_schemas"))
- }
-
- async fn do_get_tables(
- &self,
- _query: CommandGetTables,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- info!("do_get_tables");
- Err(Status::unimplemented("Implement do_get_tables"))
- }
-
- async fn do_get_table_types(
- &self,
- _query: CommandGetTableTypes,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- info!("do_get_table_types");
- Err(Status::unimplemented("Implement do_get_table_types"))
- }
-
- async fn do_get_sql_info(
- &self,
- _query: CommandGetSqlInfo,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- info!("do_get_sql_info");
- Err(Status::unimplemented("Implement do_get_sql_info"))
- }
-
- async fn do_get_primary_keys(
- &self,
- _query: CommandGetPrimaryKeys,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- info!("do_get_primary_keys");
- Err(Status::unimplemented("Implement do_get_primary_keys"))
- }
-
- async fn do_get_exported_keys(
- &self,
- _query: CommandGetExportedKeys,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- info!("do_get_exported_keys");
- Err(Status::unimplemented("Implement do_get_exported_keys"))
- }
-
- async fn do_get_imported_keys(
- &self,
- _query: CommandGetImportedKeys,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- info!("do_get_imported_keys");
- Err(Status::unimplemented("Implement do_get_imported_keys"))
- }
-
- async fn do_get_cross_reference(
- &self,
- _query: CommandGetCrossReference,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- info!("do_get_cross_reference");
- Err(Status::unimplemented("Implement do_get_cross_reference"))
- }
+        let ctx = self.get_ctx(&request)?;
+        let data = self.tables(ctx).await;
+        let schema = data.schema();
- async fn do_get_xdbc_type_info(
- &self,
- _query: CommandGetXdbcTypeInfo,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- info!("do_get_xdbc_type_info");
- Err(Status::unimplemented("Implement do_get_xdbc_type_info"))
- }
+        // Park the batch under a fresh handle; the ticket built below refers to it.
+        let uuid = Uuid::new_v4().hyphenated().to_string();
+        self.results.insert(uuid.clone(), vec![data]);
- async fn do_put_statement_update(
- &self,
- _ticket: CommandStatementUpdate,
- _request: Request<PeekableFlightDataStream>,
- ) -> Result<i64, Status> {
- info!("do_put_statement_update");
- Err(Status::unimplemented("Implement do_put_statement_update"))
- }
+        let fetch = FetchResults { handle: uuid };
+        let buf = fetch.as_any().encode_to_vec().into();
+        let ticket = Ticket { ticket: buf };
- async fn do_put_prepared_statement_query(
- &self,
- _query: CommandPreparedStatementQuery,
- _request: Request<PeekableFlightDataStream>,
- ) -> Result<Response<<Self as FlightService>::DoPutStream>, Status> {
- info!("do_put_prepared_statement_query");
- Err(Status::unimplemented(
- "Implement do_put_prepared_statement_query",
- ))
+        let info = FlightInfo::new()
+            // Encode the Arrow schema; a failure is returned to the caller as
+            // a `Status` instead of panicking the request handler.
+            .try_with_schema(&schema)
+            .map_err(|e| status!("Unable to serialize schema", e))?
+            .with_endpoint(FlightEndpoint::new().with_ticket(ticket))
+            .with_descriptor(FlightDescriptor {
+                r#type: DescriptorType::Cmd.into(),
+                cmd: Default::default(),
+                path: vec![],
+            });
+        Ok(Response::new(info))
}
async fn do_put_prepared_statement_update(
@@ -578,17 +370,6 @@ impl FlightSqlService for FlightSqlServiceImpl {
Ok(-1)
}
- async fn do_put_substrait_plan(
- &self,
- _query: CommandStatementSubstraitPlan,
- _request: Request<PeekableFlightDataStream>,
- ) -> Result<i64, Status> {
- info!("do_put_prepared_statement_update");
- Err(Status::unimplemented(
- "Implement do_put_prepared_statement_update",
- ))
- }
-
async fn do_action_create_prepared_statement(
&self,
query: ActionCreatePreparedStatementRequest,
@@ -639,64 +420,6 @@ impl FlightSqlService for FlightSqlServiceImpl {
Ok(())
}
- async fn do_action_create_prepared_substrait_plan(
- &self,
- _query: ActionCreatePreparedSubstraitPlanRequest,
- _request: Request<Action>,
- ) -> Result<ActionCreatePreparedStatementResult, Status> {
- info!("do_action_create_prepared_substrait_plan");
- Err(Status::unimplemented(
- "Implement do_action_create_prepared_substrait_plan",
- ))
- }
-
- async fn do_action_begin_transaction(
- &self,
- _query: ActionBeginTransactionRequest,
- _request: Request<Action>,
- ) -> Result<ActionBeginTransactionResult, Status> {
- info!("do_action_begin_transaction");
- Err(Status::unimplemented(
- "Implement do_action_begin_transaction",
- ))
- }
-
- async fn do_action_end_transaction(
- &self,
- _query: ActionEndTransactionRequest,
- _request: Request<Action>,
- ) -> Result<(), Status> {
- info!("do_action_end_transaction");
- Err(Status::unimplemented("Implement do_action_end_transaction"))
- }
-
- async fn do_action_begin_savepoint(
- &self,
- _query: ActionBeginSavepointRequest,
- _request: Request<Action>,
- ) -> Result<ActionBeginSavepointResult, Status> {
- info!("do_action_begin_savepoint");
- Err(Status::unimplemented("Implement do_action_begin_savepoint"))
- }
-
- async fn do_action_end_savepoint(
- &self,
- _query: ActionEndSavepointRequest,
- _request: Request<Action>,
- ) -> Result<(), Status> {
- info!("do_action_end_savepoint");
- Err(Status::unimplemented("Implement do_action_end_savepoint"))
- }
-
- async fn do_action_cancel_query(
- &self,
- _query: ActionCancelQueryRequest,
- _request: Request<Action>,
- ) -> Result<ActionCancelQueryResult, Status> {
- info!("do_action_cancel_query");
- Err(Status::unimplemented("Implement do_action_cancel_query"))
- }
-
async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
}
diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs
index f662f4d9f7..72ed51f444 100644
--- a/datafusion/expr/src/table_source.rs
+++ b/datafusion/expr/src/table_source.rs
@@ -61,6 +61,16 @@ pub enum TableType {
Temporary,
}
+/// Renders the table type as a short, human-readable name
+/// (`Base`, `View` or `Temporary`).
+///
+/// Uses [`std::fmt::Formatter::pad`] so that width, fill, alignment and
+/// precision flags (e.g. `format!("{:<10}", t)`) are honoured, which a bare
+/// `write!(f, "...")` would silently ignore.
+impl std::fmt::Display for TableType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let name = match self {
+            TableType::Base => "Base",
+            TableType::View => "View",
+            TableType::Temporary => "Temporary",
+        };
+        f.pad(name)
+    }
+}
+
/// The TableSource trait is used during logical query planning and optimizations and
/// provides access to schema information and filter push-down capabilities. This trait
/// provides a subset of the functionality of the TableProvider trait in the core
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]