alamb commented on code in PR #11938:
URL: https://github.com/apache/datafusion/pull/11938#discussion_r1720105929


##########
datafusion/core/src/datasource/flight/config.rs:
##########
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Only meant for registering the `flight` namespace with `datafusion-cli`
+
+use datafusion_common::config::{ConfigEntry, ConfigExtension, 
ExtensionOptions};
+use std::any::Any;
+use std::collections::HashMap;
+
+/// Collects and reports back config entries. Only used to persuade 
`datafusion-cli`
+/// to accept the `flight.` prefix for `CREATE EXTERNAL TABLE` options.
+#[derive(Default, Debug, Clone)]
+pub struct FlightOptions {

Review Comment:
   The configuration system is another thing that I think a focused standalone 
binary would be able to do better than DataFusion's key=value system.



##########
datafusion/core/src/datasource/flight/mod.rs:
##########
@@ -0,0 +1,259 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Generic [FlightTableFactory] that can connect to Arrow Flight services,
+//! with a [sql::FlightSqlDriver] provided out-of-the-box.
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use arrow_flight::error::FlightError;
+use arrow_flight::FlightInfo;
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use tonic::metadata::MetadataMap;
+use tonic::transport::Channel;
+
+use datafusion_catalog::{Session, TableProvider, TableProviderFactory};
+use datafusion_common::{project_schema, DataFusionError};
+use datafusion_expr::{CreateExternalTable, Expr, TableType};
+use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_expr::Partitioning::UnknownPartitioning;
+use datafusion_physical_plan::{ExecutionMode, ExecutionPlan, PlanProperties};
+
+use crate::datasource::physical_plan::FlightExec;
+
+pub mod config;
+pub mod sql;
+
+/// Generic Arrow Flight data source. Requires a [FlightDriver] that allows 
implementors
+/// to integrate any custom Flight RPC service by producing a [FlightMetadata] 
for some DDL.
+///
+/// # Sample usage:
+/// ```
+/// use std::collections::HashMap;
+/// use arrow_flight::{FlightClient, FlightDescriptor};
+/// use tonic::transport::Channel;
+/// use datafusion::datasource::flight::{FlightMetadata, FlightDriver};
+/// use datafusion::prelude::SessionContext;
+/// use std::sync::Arc;
+/// use datafusion::datasource::flight::FlightTableFactory;
+///
+/// #[derive(Debug, Clone, Default)]
+/// struct CustomFlightDriver {}
+/// #[async_trait::async_trait]
+/// impl FlightDriver for CustomFlightDriver {
+///     async fn metadata(&self, channel: Channel, opts: &HashMap<String, 
String>)
+///             -> arrow_flight::error::Result<FlightMetadata> {
+///         let mut client = FlightClient::new(channel);
+///         // the `flight.` prefix is an already registered namespace in 
datafusion-cli
+///         let descriptor = 
FlightDescriptor::new_cmd(opts["flight.command"].clone());
+///         let flight_info = client.get_flight_info(descriptor).await?;
+///         FlightMetadata::try_from(flight_info)
+///     }
+/// }
+///
+/// #[tokio::main]
+/// async fn main() -> datafusion_common::Result<()> {
+///     let ctx = SessionContext::new();
+///     ctx.state_ref().write().table_factories_mut()
+///         .insert("CUSTOM_FLIGHT".into(), Arc::new(FlightTableFactory::new(
+///             Arc::new(CustomFlightDriver::default())
+///         )));
+///     let _ = ctx.sql(r#"
+///         CREATE EXTERNAL TABLE custom_flight_table STORED AS CUSTOM_FLIGHT
+///         LOCATION 'https://custom.flight.rpc'
+///         OPTIONS ('flight.command' 'select * from everywhere')
+///     "#).await; // will fail as it can't connect to the bogus URL, but we 
ignore the error
+///     Ok(())
+/// }
+///
+/// ```
+#[derive(Clone, Debug)]
+pub struct FlightTableFactory {
+    driver: Arc<dyn FlightDriver>,
+}
+
+impl FlightTableFactory {
+    /// Create a data source using the provided driver
+    pub fn new(driver: Arc<dyn FlightDriver>) -> Self {
+        Self { driver }
+    }
+
+    /// Convenient way to create a [FlightTable] programatically, as an 
alternative to DDL.
+    pub async fn open_table(
+        &self,
+        entry_point: impl Into<String>,
+        options: HashMap<String, String>,
+    ) -> datafusion_common::Result<FlightTable> {
+        let origin = entry_point.into();
+        let channel = Channel::from_shared(origin.clone())
+            .unwrap()
+            .connect()
+            .await
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        let metadata = self
+            .driver
+            .metadata(channel.clone(), &options)
+            .await
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        let logical_schema = 
metadata.plan_properties.eq_properties.schema().clone();
+        Ok(FlightTable {
+            driver: self.driver.clone(),
+            channel,
+            options,
+            origin,
+            logical_schema,
+        })
+    }
+}
+
+#[async_trait]
+impl TableProviderFactory for FlightTableFactory {
+    async fn create(
+        &self,
+        _state: &dyn Session,
+        cmd: &CreateExternalTable,
+    ) -> datafusion_common::Result<Arc<dyn TableProvider>> {
+        let table = self.open_table(&cmd.location, cmd.options.clone()).await?;
+        Ok(Arc::new(table))
+    }
+}
+
+/// Extension point for integrating any Flight RPC service as a 
[FlightTableFactory].
+/// Handles the initial `GetFlightInfo` call and all its prerequisites (such 
as `Handshake`),
+/// to produce a [FlightMetadata].
+#[async_trait]
+pub trait FlightDriver: Sync + Send + Debug {
+    /// Returns a [FlightMetadata] from the specified channel,
+    /// according to the provided table options.
+    /// The driver must provide at least a [FlightInfo] in order to construct 
a flight metadata.
+    async fn metadata(
+        &self,
+        channel: Channel,
+        options: &HashMap<String, String>,
+    ) -> arrow_flight::error::Result<FlightMetadata>;
+}
+
+/// The information that a [FlightDriver] must produce
+/// in order to register flights as DataFusion tables.
+#[derive(Clone, Debug)]
+pub struct FlightMetadata {
+    /// FlightInfo object produced by the driver
+    pub(super) flight_info: Arc<FlightInfo>,
+    /// Physical plan properties. Sensible defaults will be used if the
+    /// driver doesn't need (or care) to customize the execution plan.
+    pub(super) plan_properties: Arc<PlanProperties>,
+    /// The gRPC headers to use on the `DoGet` calls
+    pub(super) grpc_metadata: Arc<MetadataMap>,
+}
+
+impl FlightMetadata {
+    /// Provide custom [PlanProperties] to account for service specifics,
+    /// such as known partitioning scheme, unbounded execution mode etc.
+    pub fn new(info: FlightInfo, props: PlanProperties, grpc: MetadataMap) -> 
Self {

Review Comment:
   this is really cool -- we actually implemented a custom version of this 
for InfluxDB 3.0, for services that exchange data between themselves



##########
datafusion/core/src/datasource/flight/sql.rs:
##########
@@ -0,0 +1,475 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Default [FlightDriver] for Flight SQL
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use arrow_flight::error::Result;
+use arrow_flight::flight_service_client::FlightServiceClient;
+use arrow_flight::sql::{CommandStatementQuery, ProstMessageExt};
+use arrow_flight::{FlightDescriptor, FlightInfo, HandshakeRequest, 
HandshakeResponse};
+use arrow_schema::ArrowError;
+use async_trait::async_trait;
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
+use bytes::Bytes;
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tonic::metadata::{AsciiMetadataKey, MetadataMap};
+use tonic::transport::Channel;
+use tonic::IntoRequest;
+
+use crate::datasource::flight::{FlightDriver, FlightMetadata};
+
+/// Default Flight SQL driver. Requires a `flight.sql.query` to be passed as a 
table option.

Review Comment:
   Yeah, the story for configuring datasources like this is 
sorely lacking in datafusion-cli
   
   It would be kind of cool to support something like
   
   ```sql
   create credentials for flightsql as ....
   
   select * from 'flightsql://myhost/SELECT%20*%20FROM%20foo'
   ```



##########
datafusion/core/src/datasource/flight/mod.rs:
##########
@@ -0,0 +1,259 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Generic [FlightTableFactory] that can connect to Arrow Flight services,
+//! with a [sql::FlightSqlDriver] provided out-of-the-box.
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use arrow_flight::error::FlightError;
+use arrow_flight::FlightInfo;
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use tonic::metadata::MetadataMap;
+use tonic::transport::Channel;
+
+use datafusion_catalog::{Session, TableProvider, TableProviderFactory};
+use datafusion_common::{project_schema, DataFusionError};
+use datafusion_expr::{CreateExternalTable, Expr, TableType};
+use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_expr::Partitioning::UnknownPartitioning;
+use datafusion_physical_plan::{ExecutionMode, ExecutionPlan, PlanProperties};
+
+use crate::datasource::physical_plan::FlightExec;
+
+pub mod config;
+pub mod sql;
+
+/// Generic Arrow Flight data source. Requires a [FlightDriver] that allows 
implementors
+/// to integrate any custom Flight RPC service by producing a [FlightMetadata] 
for some DDL.
+///
+/// # Sample usage:
+/// ```
+/// use std::collections::HashMap;
+/// use arrow_flight::{FlightClient, FlightDescriptor};
+/// use tonic::transport::Channel;
+/// use datafusion::datasource::flight::{FlightMetadata, FlightDriver};
+/// use datafusion::prelude::SessionContext;
+/// use std::sync::Arc;
+/// use datafusion::datasource::flight::FlightTableFactory;
+///
+/// #[derive(Debug, Clone, Default)]
+/// struct CustomFlightDriver {}
+/// #[async_trait::async_trait]
+/// impl FlightDriver for CustomFlightDriver {
+///     async fn metadata(&self, channel: Channel, opts: &HashMap<String, 
String>)
+///             -> arrow_flight::error::Result<FlightMetadata> {
+///         let mut client = FlightClient::new(channel);
+///         // the `flight.` prefix is an already registered namespace in 
datafusion-cli
+///         let descriptor = 
FlightDescriptor::new_cmd(opts["flight.command"].clone());
+///         let flight_info = client.get_flight_info(descriptor).await?;
+///         FlightMetadata::try_from(flight_info)
+///     }
+/// }
+///
+/// #[tokio::main]
+/// async fn main() -> datafusion_common::Result<()> {
+///     let ctx = SessionContext::new();
+///     ctx.state_ref().write().table_factories_mut()
+///         .insert("CUSTOM_FLIGHT".into(), Arc::new(FlightTableFactory::new(
+///             Arc::new(CustomFlightDriver::default())
+///         )));
+///     let _ = ctx.sql(r#"

Review Comment:
   this is really cool



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to