This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fe89d0b2b5 Improve flight sql examples (#10432)
fe89d0b2b5 is described below

commit fe89d0b2b5db93cb508ad7bc8d6d7f5e3168e306
Author: 张林伟 <[email protected]>
AuthorDate: Fri May 10 19:46:44 2024 +0800

    Improve flight sql examples (#10432)
    
    * Implement get_flight_info_tables
    
    * remove unimplemented methods
    
    * Simplify FlightInfo construction
---
 .../examples/flight/flight_sql_server.rs           | 423 ++++-----------------
 datafusion/expr/src/table_source.rs                |  10 +
 2 files changed, 83 insertions(+), 350 deletions(-)

diff --git a/datafusion-examples/examples/flight/flight_sql_server.rs b/datafusion-examples/examples/flight/flight_sql_server.rs
index f04a559d00..2e46daf7cb 100644
--- a/datafusion-examples/examples/flight/flight_sql_server.rs
+++ b/datafusion-examples/examples/flight/flight_sql_server.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::array::{ArrayRef, StringArray};
 use arrow::ipc::writer::IpcWriteOptions;
 use arrow::record_batch::RecordBatch;
 use arrow_flight::encode::FlightDataEncoderBuilder;
@@ -22,24 +23,16 @@ use arrow_flight::flight_descriptor::DescriptorType;
 use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
 use arrow_flight::sql::server::{FlightSqlService, PeekableFlightDataStream};
 use arrow_flight::sql::{
-    ActionBeginSavepointRequest, ActionBeginSavepointResult,
-    ActionBeginTransactionRequest, ActionBeginTransactionResult,
-    ActionCancelQueryRequest, ActionCancelQueryResult,
     ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
-    ActionCreatePreparedStatementResult, ActionCreatePreparedSubstraitPlanRequest,
-    ActionEndSavepointRequest, ActionEndTransactionRequest, Any, CommandGetCatalogs,
-    CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys,
-    CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo,
-    CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
-    CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery,
-    CommandStatementSubstraitPlan, CommandStatementUpdate, ProstMessageExt, SqlInfo,
-    TicketStatementQuery,
+    ActionCreatePreparedStatementResult, Any, CommandGetTables,
+    CommandPreparedStatementQuery, CommandPreparedStatementUpdate, ProstMessageExt,
+    SqlInfo,
 };
 use arrow_flight::{
     Action, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
     HandshakeResponse, IpcMessage, SchemaAsIpc, Ticket,
 };
-use arrow_schema::Schema;
+use arrow_schema::{DataType, Field, Schema};
 use dashmap::DashMap;
 use datafusion::logical_expr::LogicalPlan;
 use datafusion::prelude::{DataFrame, ParquetReadOptions, SessionConfig, SessionContext};
@@ -165,6 +158,43 @@ impl FlightSqlServiceImpl {
         }
     }
 
+    async fn tables(&self, ctx: Arc<SessionContext>) -> RecordBatch {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("catalog_name", DataType::Utf8, true),
+            Field::new("db_schema_name", DataType::Utf8, true),
+            Field::new("table_name", DataType::Utf8, false),
+            Field::new("table_type", DataType::Utf8, false),
+        ]));
+
+        let mut catalogs = vec![];
+        let mut schemas = vec![];
+        let mut names = vec![];
+        let mut types = vec![];
+        for catalog in ctx.catalog_names() {
+            let catalog_provider = ctx.catalog(&catalog).unwrap();
+            for schema in catalog_provider.schema_names() {
+                let schema_provider = catalog_provider.schema(&schema).unwrap();
+                for table in schema_provider.table_names() {
+                    let table_provider =
+                        schema_provider.table(&table).await.unwrap().unwrap();
+                    catalogs.push(catalog.clone());
+                    schemas.push(schema.clone());
+                    names.push(table.clone());
+                    types.push(table_provider.table_type().to_string())
+                }
+            }
+        }
+
+        RecordBatch::try_new(
+            schema,
+            [catalogs, schemas, names, types]
+                .into_iter()
+                .map(|i| Arc::new(StringArray::from(i)) as ArrayRef)
+                .collect::<Vec<_>>(),
+        )
+        .unwrap()
+    }
+
     fn remove_plan(&self, handle: &str) -> Result<(), Status> {
         self.statements.remove(&handle.to_string());
         Ok(())
@@ -246,27 +276,6 @@ impl FlightSqlService for FlightSqlServiceImpl {
         Ok(Response::new(Box::pin(stream)))
     }
 
-    async fn get_flight_info_statement(
-        &self,
-        query: CommandStatementQuery,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        info!("get_flight_info_statement query:\n{}", query.query);
-
-        Err(Status::unimplemented("Implement get_flight_info_statement"))
-    }
-
-    async fn get_flight_info_substrait_plan(
-        &self,
-        _query: CommandStatementSubstraitPlan,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        info!("get_flight_info_substrait_plan");
-        Err(Status::unimplemented(
-            "Implement get_flight_info_substrait_plan",
-        ))
-    }
-
     async fn get_flight_info_prepared_statement(
         &self,
         cmd: CommandPreparedStatementQuery,
@@ -304,267 +313,50 @@ impl FlightSqlService for FlightSqlServiceImpl {
         };
         let buf = fetch.as_any().encode_to_vec().into();
         let ticket = Ticket { ticket: buf };
-        let endpoint = FlightEndpoint {
-            ticket: Some(ticket),
-            location: vec![],
-            expiration_time: None,
-            app_metadata: Default::default(),
-        };
-        let endpoints = vec![endpoint];
-
-        let message = SchemaAsIpc::new(&schema, &IpcWriteOptions::default())
-            .try_into()
-            .map_err(|e| status!("Unable to serialize schema", e))?;
-        let IpcMessage(schema_bytes) = message;
 
-        let flight_desc = FlightDescriptor {
-            r#type: DescriptorType::Cmd.into(),
-            cmd: Default::default(),
-            path: vec![],
-        };
-        // send -1 for total_records and total_bytes instead of iterating over all the
-        // batches to get num_rows() and total byte size.
-        let info = FlightInfo {
-            schema: schema_bytes,
-            flight_descriptor: Some(flight_desc),
-            endpoint: endpoints,
-            total_records: -1_i64,
-            total_bytes: -1_i64,
-            ordered: false,
-            app_metadata: Default::default(),
-        };
+        let info = FlightInfo::new()
+            // Encode the Arrow schema
+            .try_with_schema(&schema)
+            .expect("encoding failed")
+            .with_endpoint(FlightEndpoint::new().with_ticket(ticket))
+            .with_descriptor(FlightDescriptor {
+                r#type: DescriptorType::Cmd.into(),
+                cmd: Default::default(),
+                path: vec![],
+            });
         let resp = Response::new(info);
         Ok(resp)
     }
 
-    async fn get_flight_info_catalogs(
-        &self,
-        _query: CommandGetCatalogs,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        info!("get_flight_info_catalogs");
-        Err(Status::unimplemented("Implement get_flight_info_catalogs"))
-    }
-
-    async fn get_flight_info_schemas(
-        &self,
-        _query: CommandGetDbSchemas,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        info!("get_flight_info_schemas");
-        Err(Status::unimplemented("Implement get_flight_info_schemas"))
-    }
-
     async fn get_flight_info_tables(
         &self,
         _query: CommandGetTables,
-        _request: Request<FlightDescriptor>,
+        request: Request<FlightDescriptor>,
     ) -> Result<Response<FlightInfo>, Status> {
         info!("get_flight_info_tables");
-        Err(Status::unimplemented("Implement get_flight_info_tables"))
-    }
-
-    async fn get_flight_info_table_types(
-        &self,
-        _query: CommandGetTableTypes,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        info!("get_flight_info_table_types");
-        Err(Status::unimplemented(
-            "Implement get_flight_info_table_types",
-        ))
-    }
-
-    async fn get_flight_info_sql_info(
-        &self,
-        _query: CommandGetSqlInfo,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        info!("get_flight_info_sql_info");
-        Err(Status::unimplemented("Implement CommandGetSqlInfo"))
-    }
-
-    async fn get_flight_info_primary_keys(
-        &self,
-        _query: CommandGetPrimaryKeys,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        info!("get_flight_info_primary_keys");
-        Err(Status::unimplemented(
-            "Implement get_flight_info_primary_keys",
-        ))
-    }
-
-    async fn get_flight_info_exported_keys(
-        &self,
-        _query: CommandGetExportedKeys,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        info!("get_flight_info_exported_keys");
-        Err(Status::unimplemented(
-            "Implement get_flight_info_exported_keys",
-        ))
-    }
-
-    async fn get_flight_info_imported_keys(
-        &self,
-        _query: CommandGetImportedKeys,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        info!("get_flight_info_imported_keys");
-        Err(Status::unimplemented(
-            "Implement get_flight_info_imported_keys",
-        ))
-    }
-
-    async fn get_flight_info_cross_reference(
-        &self,
-        _query: CommandGetCrossReference,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        info!("get_flight_info_cross_reference");
-        Err(Status::unimplemented(
-            "Implement get_flight_info_cross_reference",
-        ))
-    }
-
-    async fn get_flight_info_xdbc_type_info(
-        &self,
-        _query: CommandGetXdbcTypeInfo,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        info!("get_flight_info_xdbc_type_info");
-        Err(Status::unimplemented(
-            "Implement get_flight_info_xdbc_type_info",
-        ))
-    }
-
-    async fn do_get_statement(
-        &self,
-        _ticket: TicketStatementQuery,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        info!("do_get_statement");
-        Err(Status::unimplemented("Implement do_get_statement"))
-    }
-
-    async fn do_get_prepared_statement(
-        &self,
-        _query: CommandPreparedStatementQuery,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        info!("do_get_prepared_statement");
-        Err(Status::unimplemented("Implement do_get_prepared_statement"))
-    }
-
-    async fn do_get_catalogs(
-        &self,
-        _query: CommandGetCatalogs,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        info!("do_get_catalogs");
-        Err(Status::unimplemented("Implement do_get_catalogs"))
-    }
-
-    async fn do_get_schemas(
-        &self,
-        _query: CommandGetDbSchemas,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        info!("do_get_schemas");
-        Err(Status::unimplemented("Implement do_get_schemas"))
-    }
-
-    async fn do_get_tables(
-        &self,
-        _query: CommandGetTables,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        info!("do_get_tables");
-        Err(Status::unimplemented("Implement do_get_tables"))
-    }
-
-    async fn do_get_table_types(
-        &self,
-        _query: CommandGetTableTypes,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        info!("do_get_table_types");
-        Err(Status::unimplemented("Implement do_get_table_types"))
-    }
-
-    async fn do_get_sql_info(
-        &self,
-        _query: CommandGetSqlInfo,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        info!("do_get_sql_info");
-        Err(Status::unimplemented("Implement do_get_sql_info"))
-    }
-
-    async fn do_get_primary_keys(
-        &self,
-        _query: CommandGetPrimaryKeys,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        info!("do_get_primary_keys");
-        Err(Status::unimplemented("Implement do_get_primary_keys"))
-    }
-
-    async fn do_get_exported_keys(
-        &self,
-        _query: CommandGetExportedKeys,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        info!("do_get_exported_keys");
-        Err(Status::unimplemented("Implement do_get_exported_keys"))
-    }
-
-    async fn do_get_imported_keys(
-        &self,
-        _query: CommandGetImportedKeys,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        info!("do_get_imported_keys");
-        Err(Status::unimplemented("Implement do_get_imported_keys"))
-    }
-
-    async fn do_get_cross_reference(
-        &self,
-        _query: CommandGetCrossReference,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        info!("do_get_cross_reference");
-        Err(Status::unimplemented("Implement do_get_cross_reference"))
-    }
+        let ctx = self.get_ctx(&request)?;
+        let data = self.tables(ctx).await;
+        let schema = data.schema();
 
-    async fn do_get_xdbc_type_info(
-        &self,
-        _query: CommandGetXdbcTypeInfo,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        info!("do_get_xdbc_type_info");
-        Err(Status::unimplemented("Implement do_get_xdbc_type_info"))
-    }
+        let uuid = Uuid::new_v4().hyphenated().to_string();
+        self.results.insert(uuid.clone(), vec![data]);
 
-    async fn do_put_statement_update(
-        &self,
-        _ticket: CommandStatementUpdate,
-        _request: Request<PeekableFlightDataStream>,
-    ) -> Result<i64, Status> {
-        info!("do_put_statement_update");
-        Err(Status::unimplemented("Implement do_put_statement_update"))
-    }
+        let fetch = FetchResults { handle: uuid };
+        let buf = fetch.as_any().encode_to_vec().into();
+        let ticket = Ticket { ticket: buf };
 
-    async fn do_put_prepared_statement_query(
-        &self,
-        _query: CommandPreparedStatementQuery,
-        _request: Request<PeekableFlightDataStream>,
-    ) -> Result<Response<<Self as FlightService>::DoPutStream>, Status> {
-        info!("do_put_prepared_statement_query");
-        Err(Status::unimplemented(
-            "Implement do_put_prepared_statement_query",
-        ))
+        let info = FlightInfo::new()
+            // Encode the Arrow schema
+            .try_with_schema(&schema)
+            .expect("encoding failed")
+            .with_endpoint(FlightEndpoint::new().with_ticket(ticket))
+            .with_descriptor(FlightDescriptor {
+                r#type: DescriptorType::Cmd.into(),
+                cmd: Default::default(),
+                path: vec![],
+            });
+        let resp = Response::new(info);
+        Ok(resp)
     }
 
     async fn do_put_prepared_statement_update(
@@ -578,17 +370,6 @@ impl FlightSqlService for FlightSqlServiceImpl {
         Ok(-1)
     }
 
-    async fn do_put_substrait_plan(
-        &self,
-        _query: CommandStatementSubstraitPlan,
-        _request: Request<PeekableFlightDataStream>,
-    ) -> Result<i64, Status> {
-        info!("do_put_prepared_statement_update");
-        Err(Status::unimplemented(
-            "Implement do_put_prepared_statement_update",
-        ))
-    }
-
     async fn do_action_create_prepared_statement(
         &self,
         query: ActionCreatePreparedStatementRequest,
@@ -639,64 +420,6 @@ impl FlightSqlService for FlightSqlServiceImpl {
         Ok(())
     }
 
-    async fn do_action_create_prepared_substrait_plan(
-        &self,
-        _query: ActionCreatePreparedSubstraitPlanRequest,
-        _request: Request<Action>,
-    ) -> Result<ActionCreatePreparedStatementResult, Status> {
-        info!("do_action_create_prepared_substrait_plan");
-        Err(Status::unimplemented(
-            "Implement do_action_create_prepared_substrait_plan",
-        ))
-    }
-
-    async fn do_action_begin_transaction(
-        &self,
-        _query: ActionBeginTransactionRequest,
-        _request: Request<Action>,
-    ) -> Result<ActionBeginTransactionResult, Status> {
-        info!("do_action_begin_transaction");
-        Err(Status::unimplemented(
-            "Implement do_action_begin_transaction",
-        ))
-    }
-
-    async fn do_action_end_transaction(
-        &self,
-        _query: ActionEndTransactionRequest,
-        _request: Request<Action>,
-    ) -> Result<(), Status> {
-        info!("do_action_end_transaction");
-        Err(Status::unimplemented("Implement do_action_end_transaction"))
-    }
-
-    async fn do_action_begin_savepoint(
-        &self,
-        _query: ActionBeginSavepointRequest,
-        _request: Request<Action>,
-    ) -> Result<ActionBeginSavepointResult, Status> {
-        info!("do_action_begin_savepoint");
-        Err(Status::unimplemented("Implement do_action_begin_savepoint"))
-    }
-
-    async fn do_action_end_savepoint(
-        &self,
-        _query: ActionEndSavepointRequest,
-        _request: Request<Action>,
-    ) -> Result<(), Status> {
-        info!("do_action_end_savepoint");
-        Err(Status::unimplemented("Implement do_action_end_savepoint"))
-    }
-
-    async fn do_action_cancel_query(
-        &self,
-        _query: ActionCancelQueryRequest,
-        _request: Request<Action>,
-    ) -> Result<ActionCancelQueryResult, Status> {
-        info!("do_action_cancel_query");
-        Err(Status::unimplemented("Implement do_action_cancel_query"))
-    }
-
     async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
 }
 
diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs
index f662f4d9f7..72ed51f444 100644
--- a/datafusion/expr/src/table_source.rs
+++ b/datafusion/expr/src/table_source.rs
@@ -61,6 +61,16 @@ pub enum TableType {
     Temporary,
 }
 
+impl std::fmt::Display for TableType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            TableType::Base => write!(f, "Base"),
+            TableType::View => write!(f, "View"),
+            TableType::Temporary => write!(f, "Temporary"),
+        }
+    }
+}
+
 /// The TableSource trait is used during logical query planning and optimizations and
 /// provides access to schema information and filter push-down capabilities. This trait
 /// provides a subset of the functionality of the TableProvider trait in the core


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to