This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 559bcf29 feat: remove flight-sql (#1228)
559bcf29 is described below

commit 559bcf29ed0719d4fb133bba4d39e48c18f45891
Author: Marko Milenković <[email protected]>
AuthorDate: Fri Apr 18 17:08:31 2025 +0100

    feat: remove flight-sql (#1228)
---
 Cargo.lock                                  |    1 -
 ballista/scheduler/Cargo.toml               |    2 -
 ballista/scheduler/src/flight_sql.rs        | 1014 ---------------------------
 ballista/scheduler/src/lib.rs               |    2 -
 ballista/scheduler/src/scheduler_process.rs |   16 +-
 docs/source/user-guide/configs.md           |    1 -
 docs/source/user-guide/flightsql.md         |  103 ---
 7 files changed, 1 insertion(+), 1138 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 327ddb83..8c872ccf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1056,7 +1056,6 @@ dependencies = [
  "async-trait",
  "axum",
  "ballista-core",
- "base64 0.22.1",
  "clap 4.5.36",
  "configure_me",
  "configure_me_codegen",
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 88d605f5..c0078a67 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -37,7 +37,6 @@ required-features = ["build-binary"]
 [features]
 build-binary = ["configure_me", "clap", "tracing-subscriber", 
"tracing-appender", "tracing", "ballista-core/build-binary"]
 default = ["build-binary"]
-flight-sql = ["base64"]
 graphviz-support = ["dep:graphviz-rust"]
 keda-scaler = []
 prometheus-metrics = ["prometheus", "once_cell"]
@@ -48,7 +47,6 @@ arrow-flight = { workspace = true }
 async-trait = { workspace = true }
 axum = "0.7.7"
 ballista-core = { path = "../core", version = "46.0.0" }
-base64 = { version = "0.22", optional = true }
 clap = { workspace = true, optional = true }
 configure_me = { workspace = true, optional = true }
 dashmap = { workspace = true }
diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs
deleted file mode 100644
index 8dfd91f6..00000000
--- a/ballista/scheduler/src/flight_sql.rs
+++ /dev/null
@@ -1,1014 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow_flight::flight_descriptor::DescriptorType;
-use arrow_flight::flight_service_server::FlightService;
-use arrow_flight::sql::server::{FlightSqlService, PeekableFlightDataStream};
-use arrow_flight::sql::{
-    ActionBeginSavepointRequest, ActionBeginSavepointResult,
-    ActionBeginTransactionRequest, ActionBeginTransactionResult,
-    ActionCancelQueryRequest, ActionCancelQueryResult,
-    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
-    ActionCreatePreparedStatementResult, ActionCreatePreparedSubstraitPlanRequest,
-    ActionEndSavepointRequest, ActionEndTransactionRequest, CommandGetCatalogs,
-    CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys,
-    CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo,
-    CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
-    CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery,
-    CommandStatementSubstraitPlan, CommandStatementUpdate, DoPutPreparedStatementResult,
-    SqlInfo, TicketStatementQuery,
-};
-use arrow_flight::{
-    Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
-    HandshakeResponse, Ticket,
-};
-use base64::Engine;
-use futures::Stream;
-use log::{debug, error, warn};
-use std::convert::TryFrom;
-use std::pin::Pin;
-use std::str::FromStr;
-use std::string::ToString;
-use std::sync::Arc;
-use std::time::Duration;
-use tonic::{Request, Response, Status, Streaming};
-
-use crate::scheduler_server::SchedulerServer;
-use arrow_flight::flight_service_client::FlightServiceClient;
-use arrow_flight::sql::ProstMessageExt;
-use arrow_flight::utils::batches_to_flight_data;
-use arrow_flight::SchemaAsIpc;
-use ballista_core::serde::protobuf;
-use ballista_core::serde::protobuf::action::ActionType::FetchPartition;
-use ballista_core::serde::protobuf::job_status;
-use ballista_core::serde::protobuf::JobStatus;
-use ballista_core::serde::protobuf::SuccessfulJob;
-use ballista_core::utils::create_grpc_client_connection;
-use dashmap::DashMap;
-use datafusion::arrow;
-use datafusion::arrow::array::{ArrayRef, StringArray};
-use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion::arrow::error::ArrowError;
-use datafusion::arrow::ipc::writer::{
-    DictionaryTracker, IpcDataGenerator, IpcWriteOptions,
-};
-use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::common::DFSchemaRef;
-use datafusion::logical_expr::LogicalPlan;
-use datafusion::prelude::SessionContext;
-use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
-use prost::bytes::Bytes;
-use prost::Message;
-use tokio::sync::mpsc::{channel, Receiver, Sender};
-use tokio::time::sleep;
-use tokio_stream::wrappers::ReceiverStream;
-use tonic::metadata::MetadataValue;
-use uuid::Uuid;
-
-pub struct FlightSqlServiceImpl {
-    server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode>,
-    statements: Arc<DashMap<Uuid, LogicalPlan>>,
-    contexts: Arc<DashMap<Uuid, Arc<SessionContext>>>,
-}
-
-const TABLE_TYPES: [&str; 2] = ["TABLE", "VIEW"];
-
-impl FlightSqlServiceImpl {
-    pub fn new(server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode>) -> Self {
-        Self {
-            server,
-            statements: Default::default(),
-            contexts: Default::default(),
-        }
-    }
-
-    fn tables(&self, ctx: Arc<SessionContext>) -> Result<RecordBatch, ArrowError> {
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("catalog_name", DataType::Utf8, true),
-            Field::new("db_schema_name", DataType::Utf8, true),
-            Field::new("table_name", DataType::Utf8, false),
-            Field::new("table_type", DataType::Utf8, false),
-        ]));
-        let mut names: Vec<Option<String>> = vec![];
-        for catalog_name in ctx.catalog_names() {
-            let catalog = ctx
-                .catalog(&catalog_name)
-                .expect("catalog should have been found");
-            for schema_name in catalog.schema_names() {
-                let schema = catalog
-                    .schema(&schema_name)
-                    .expect("schema should have been found");
-                for table_name in schema.table_names() {
-                    names.push(Some(table_name));
-                }
-            }
-        }
-        let types: Vec<_> = names.iter().map(|_| Some("TABLE".to_string())).collect();
-        let cats: Vec<_> = names.iter().map(|_| None).collect();
-        let schemas: Vec<_> = names.iter().map(|_| None).collect();
-        let rb = RecordBatch::try_new(
-            schema,
-            [cats, schemas, names, types]
-                .iter()
-                .map(|i| Arc::new(StringArray::from(i.clone())) as ArrayRef)
-                .collect::<Vec<_>>(),
-        )?;
-        Ok(rb)
-    }
-
-    fn table_types() -> Result<RecordBatch, ArrowError> {
-        let schema = Arc::new(Schema::new(vec![Field::new(
-            "table_type",
-            DataType::Utf8,
-            false,
-        )]));
-        RecordBatch::try_new(
-            schema,
-            [TABLE_TYPES]
-                .iter()
-                .map(|i| Arc::new(StringArray::from(i.to_vec())) as ArrayRef)
-                .collect::<Vec<_>>(),
-        )
-    }
-
-    async fn create_ctx(&self) -> Result<Uuid, Status> {
-        let config = self.server.state.session_manager.produce_config();
-        let ctx = self
-            .server
-            .state
-            .session_manager
-            .create_session(&config)
-            .await
-            .map_err(|e| {
-                Status::internal(format!("Failed to create SessionContext: 
{e:?}"))
-            })?;
-        let handle = Uuid::new_v4();
-        self.contexts.insert(handle, ctx);
-        Ok(handle)
-    }
-
-    fn get_ctx<T>(&self, req: &Request<T>) -> Result<Arc<SessionContext>, Status> {
-        let auth = req
-            .metadata()
-            .get("authorization")
-            .ok_or_else(|| Status::internal("No authorization header!"))?;
-        let str = auth
-            .to_str()
-            .map_err(|e| Status::internal(format!("Error parsing header: 
{e}")))?;
-        let authorization = str.to_string();
-        let bearer = "Bearer ";
-        if !authorization.starts_with(bearer) {
-            Err(Status::internal("Invalid auth header!"))?;
-        }
-        let auth = authorization[bearer.len()..].to_string();
-
-        let handle = Uuid::from_str(auth.as_str())
-            .map_err(|e| Status::internal(format!("Error locking contexts: 
{e}")))?;
-        if let Some(context) = self.contexts.get(&handle) {
-            Ok(context.clone())
-        } else {
-            Err(Status::internal(format!(
-                "Context handle not found: {handle}"
-            )))?
-        }
-    }
-
-    async fn prepare_statement(
-        query: &str,
-        ctx: &Arc<SessionContext>,
-    ) -> Result<LogicalPlan, Status> {
-        let plan = ctx
-            .sql(query)
-            .await
-            .and_then(|df| df.into_optimized_plan())
-            .map_err(|e| Status::internal(format!("Error building plan: 
{e}")))?;
-        Ok(plan)
-    }
-
-    async fn check_job(&self, job_id: &String) -> Result<Option<SuccessfulJob>, Status> {
-        let status = self
-            .server
-            .state
-            .task_manager
-            .get_job_status(job_id)
-            .await
-            .map_err(|e| {
-                let msg = format!("Error getting status for job {job_id}: 
{e:?}");
-                error!("{}", msg);
-                Status::internal(msg)
-            })?;
-        let status: JobStatus = match status {
-            Some(status) => status,
-            None => {
-                let msg = format!("Error getting status for job {job_id}!");
-                error!("{}", msg);
-                Err(Status::internal(msg))?
-            }
-        };
-        let status: job_status::Status = match status.status {
-            Some(status) => status,
-            None => {
-                let msg = format!("Error getting status for job {job_id}!");
-                error!("{}", msg);
-                Err(Status::internal(msg))?
-            }
-        };
-        match status {
-            job_status::Status::Queued(_) => Ok(None),
-            job_status::Status::Running(_) => Ok(None),
-            job_status::Status::Failed(e) => {
-                warn!("Error executing plan: {:?}", e);
-                Err(Status::internal(format!(
-                    "Error executing plan: {}",
-                    e.error
-                )))?
-            }
-            job_status::Status::Successful(comp) => Ok(Some(comp)),
-        }
-    }
-
-    async fn job_to_fetch_part(
-        &self,
-        completed: SuccessfulJob,
-        num_rows: &mut i64,
-        num_bytes: &mut i64,
-    ) -> Result<Vec<FlightEndpoint>, Status> {
-        let mut fieps: Vec<_> = vec![];
-        for loc in completed.partition_location.iter() {
-            let (exec_host, exec_port) = if let Some(ref md) = loc.executor_meta {
-                (md.host.clone(), md.port)
-            } else {
-                Err(Status::internal(
-                    "Invalid partition location, missing executor metadata and 
advertise_endpoint flag is undefined.".to_string(),
-                ))?
-            };
-
-            let (host, port) = match &self
-                .server
-                .state
-                .config
-                .advertise_flight_sql_endpoint
-            {
-                Some(endpoint) => {
-                    let advertise_endpoint_vec: Vec<&str> = endpoint.split(':').collect();
-                    match advertise_endpoint_vec.as_slice() {
-                        [host_ip, port] => {
-                            (String::from(*host_ip), FromStr::from_str(port).expect("Failed to parse port from advertise-endpoint."))
-                        }
-                        _ => {
-                            Err(Status::internal("advertise-endpoint flag has 
incorrect format. Expected IP:Port".to_string()))?
-                        }
-                    }
-                }
-                None => (exec_host.clone(), exec_port),
-            };
-
-            let fetch = if let Some(ref id) = loc.partition_id {
-                let fetch = protobuf::FetchPartition {
-                    job_id: id.job_id.clone(),
-                    stage_id: id.stage_id,
-                    partition_id: id.partition_id,
-                    path: loc.path.clone(),
-                    // Use executor ip:port for routing to flight result
-                    host: exec_host.clone(),
-                    port: exec_port,
-                };
-                protobuf::Action {
-                    action_type: Some(FetchPartition(fetch)),
-                    settings: vec![],
-                }
-            } else {
-                Err(Status::internal("Error getting partition 
ID".to_string()))?
-            };
-            if let Some(ref stats) = loc.partition_stats {
-                *num_rows += stats.num_rows;
-                *num_bytes += stats.num_bytes;
-            } else {
-                Err(Status::internal("Error getting stats".to_string()))?
-            }
-            let authority = format!("{}:{}", &host, &port);
-            let buf = fetch.as_any().encode_to_vec();
-            let ticket = Ticket { ticket: buf.into() };
-            let fiep = FlightEndpoint::new()
-                .with_ticket(ticket)
-                .with_location(format!("grpc+tcp://{authority}"));
-            fieps.push(fiep);
-        }
-        Ok(fieps)
-    }
-
-    fn make_local_fieps(&self, job_id: &str) -> Result<Vec<FlightEndpoint>, Status> {
-        let (host, port) = ("127.0.0.1".to_string(), 50050); // TODO: use advertise host
-        let fetch = protobuf::FetchPartition {
-            job_id: job_id.to_string(),
-            stage_id: 0,
-            partition_id: 0,
-            path: job_id.to_string(),
-            host: host.clone(),
-            port,
-        };
-        let fetch = protobuf::Action {
-            action_type: Some(FetchPartition(fetch)),
-            settings: vec![],
-        };
-        let authority = format!("{}:{}", &host, &port); // TODO: use advertise host
-        let buf = fetch.as_any().encode_to_vec();
-        let ticket = Ticket { ticket: buf.into() };
-        let fiep = FlightEndpoint::new()
-            .with_ticket(ticket)
-            .with_location(format!("grpc+tcp://{authority}"));
-        let fieps = vec![fiep];
-        Ok(fieps)
-    }
-
-    fn cache_plan(&self, plan: LogicalPlan) -> Result<Uuid, Status> {
-        let handle = Uuid::new_v4();
-        self.statements.insert(handle, plan);
-        Ok(handle)
-    }
-
-    fn get_plan(&self, handle: &Uuid) -> Result<LogicalPlan, Status> {
-        if let Some(plan) = self.statements.get(handle) {
-            Ok(plan.clone())
-        } else {
-            Err(Status::internal(format!(
-                "Statement handle not found: {handle}"
-            )))?
-        }
-    }
-
-    fn remove_plan(&self, handle: Uuid) -> Result<(), Status> {
-        self.statements.remove(&handle);
-        Ok(())
-    }
-
-    fn df_schema_to_arrow(&self, schema: &DFSchemaRef) -> Result<Vec<u8>, Status> {
-        let arrow_schema: Schema = (&**schema).into();
-        let schema_bytes = self.schema_to_arrow(Arc::new(arrow_schema))?;
-        Ok(schema_bytes)
-    }
-
-    fn schema_to_arrow(&self, arrow_schema: SchemaRef) -> Result<Vec<u8>, Status> {
-        let options = IpcWriteOptions::default();
-        let pair = SchemaAsIpc::new(&arrow_schema, &options);
-        let data_gen = IpcDataGenerator::default();
-        let mut dictionary_tracker = DictionaryTracker::new_with_preserve_dict_id(
-            false,
-            pair.1.preserve_dict_id(),
-        );
-        let encoded_data = data_gen.schema_to_bytes_with_dictionary_tracker(
-            pair.0,
-            &mut dictionary_tracker,
-            pair.1,
-        );
-        let mut schema_bytes = vec![];
-        arrow::ipc::writer::write_message(&mut schema_bytes, encoded_data, pair.1)
-            .map_err(|e| Status::internal(format!("Error encoding schema: {e}")))?;
-        Ok(schema_bytes)
-    }
-
-    async fn enqueue_job(
-        &self,
-        ctx: Arc<SessionContext>,
-        plan: &LogicalPlan,
-    ) -> Result<String, Status> {
-        let job_id = self.server.state.task_manager.generate_job_id();
-        let job_name = format!("Flight SQL job {job_id}");
-        self.server
-            .submit_job(&job_id, &job_name, ctx, plan)
-            .await
-            .map_err(|e| {
-                let msg = format!("Failed to send JobQueued event for 
{job_id}: {e:?}");
-                error!("{}", msg);
-                Status::internal(msg)
-            })?;
-        Ok(job_id)
-    }
-
-    fn create_resp(
-        schema_bytes: Vec<u8>,
-        fieps: Vec<FlightEndpoint>,
-        num_rows: i64,
-        num_bytes: i64,
-    ) -> Response<FlightInfo> {
-        let flight_desc = FlightDescriptor {
-            r#type: DescriptorType::Cmd.into(),
-            cmd: Vec::new().into(),
-            path: vec![],
-        };
-        let info = FlightInfo {
-            schema: schema_bytes.into(),
-            flight_descriptor: Some(flight_desc),
-            endpoint: fieps,
-            total_records: num_rows,
-            total_bytes: num_bytes,
-            ordered: false,
-            app_metadata: Bytes::new(),
-        };
-        Response::new(info)
-    }
-
-    async fn execute_plan(
-        &self,
-        ctx: Arc<SessionContext>,
-        plan: &LogicalPlan,
-    ) -> Result<Response<FlightInfo>, Status> {
-        let job_id = self.enqueue_job(ctx, plan).await?;
-
-        // poll for job completion
-        let mut num_rows = 0;
-        let mut num_bytes = 0;
-        let fieps = loop {
-            sleep(Duration::from_millis(100)).await;
-            let completed = if let Some(comp) = self.check_job(&job_id).await? {
-                comp
-            } else {
-                continue;
-            };
-            let fieps = self
-                .job_to_fetch_part(completed, &mut num_rows, &mut num_bytes)
-                .await?;
-            break fieps;
-        };
-
-        // Generate response
-        let schema_bytes = self.df_schema_to_arrow(plan.schema())?;
-        let resp = Self::create_resp(schema_bytes, fieps, num_rows, num_bytes);
-        Ok(resp)
-    }
-
-    async fn record_batch_to_resp(
-        rb: RecordBatch,
-    ) -> Result<
-        Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send>>>,
-        Status,
-    > {
-        type FlightResult = Result<FlightData, Status>;
-        let (tx, rx): (Sender<FlightResult>, Receiver<FlightResult>) = channel(2);
-        let schema = rb.schema();
-        let flights = batches_to_flight_data(&schema, vec![rb])
-            .map_err(|_| Status::internal("Error encoding 
batches".to_string()))?;
-        for flight in flights {
-            tx.send(Ok(flight))
-                .await
-                .map_err(|_| Status::internal("Error sending 
flight".to_string()))?;
-        }
-        let resp = Response::new(Box::pin(ReceiverStream::new(rx))
-            as Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>);
-        Ok(resp)
-    }
-
-    fn batch_to_schema_resp(
-        &self,
-        data: &RecordBatch,
-        name: &str,
-    ) -> Result<Response<FlightInfo>, Status> {
-        let num_bytes = data.get_array_memory_size() as i64;
-        let schema = data.schema();
-        let num_rows = data.num_rows() as i64;
-
-        let fieps = self.make_local_fieps(name)?;
-        let schema_bytes = self.schema_to_arrow(schema)?;
-        let resp = Self::create_resp(schema_bytes, fieps, num_rows, num_bytes);
-        Ok(resp)
-    }
-}
-
-#[tonic::async_trait]
-impl FlightSqlService for FlightSqlServiceImpl {
-    type FlightService = FlightSqlServiceImpl;
-
-    async fn do_handshake(
-        &self,
-        request: Request<Streaming<HandshakeRequest>>,
-    ) -> Result<
-        Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
-        Status,
-    > {
-        debug!("do_handshake");
-        for md in request.metadata().iter() {
-            debug!("{:?}", md);
-        }
-
-        let basic = "Basic ";
-        let authorization = request
-            .metadata()
-            .get("authorization")
-            .ok_or_else(|| Status::invalid_argument("authorization field not 
present"))?
-            .to_str()
-            .map_err(|_| Status::invalid_argument("authorization not 
parsable"))?;
-        if !authorization.starts_with(basic) {
-            Err(Status::invalid_argument(format!(
-                "Auth type not implemented: {authorization}"
-            )))?;
-        }
-        let bytes = base64::engine::general_purpose::STANDARD
-            .decode(&authorization[basic.len()..])
-            .map_err(|_| Status::invalid_argument("authorization not 
parsable"))?;
-        let str = String::from_utf8(bytes)
-            .map_err(|_| Status::invalid_argument("authorization not 
parsable"))?;
-        let parts: Vec<_> = str.split(':').collect();
-        if parts.len() != 2 {
-            Err(Status::invalid_argument("Invalid authorization header"))?;
-        }
-        let user = parts[0];
-        let pass = parts[1];
-        if user != "admin" || pass != "password" {
-            Err(Status::unauthenticated("Invalid credentials!"))?
-        }
-
-        let token = self.create_ctx().await?;
-
-        let result = HandshakeResponse {
-            protocol_version: 0,
-            payload: token.as_bytes().to_vec().into(),
-        };
-        let result = Ok(result);
-        let output = futures::stream::iter(vec![result]);
-        let str = format!("Bearer {token}");
-        let mut resp: Response<Pin<Box<dyn Stream<Item = Result<_, _>> + Send>>> =
-            Response::new(Box::pin(output));
-        let md = MetadataValue::try_from(str)
-            .map_err(|_| Status::invalid_argument("authorization not 
parsable"))?;
-        resp.metadata_mut().insert("authorization", md);
-        Ok(resp)
-    }
-
-    async fn do_get_fallback(
-        &self,
-        request: Request<Ticket>,
-        message: arrow_flight::sql::Any,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        debug!("do_get_fallback type_url: {}", message.type_url);
-        let ctx = self.get_ctx(&request)?;
-        if !message.is::<protobuf::Action>() {
-            Err(Status::unimplemented(format!(
-                "do_get: The defined request is invalid: {}",
-                message.type_url
-            )))?
-        }
-
-        let action: protobuf::Action = message
-            .unpack()
-            .map_err(|e| Status::internal(format!("{e:?}")))?
-            .ok_or_else(|| Status::internal("Expected an Action but got 
None!"))?;
-        let fp = match &action.action_type {
-            Some(FetchPartition(fp)) => fp.clone(),
-            None => Err(Status::internal("Expected an ActionType but got 
None!"))?,
-        };
-
-        // Well-known job ID: respond with the data
-        match fp.job_id.as_str() {
-            "get_flight_info_table_types" => {
-                debug!("Responding with table types");
-                let rb = FlightSqlServiceImpl::table_types().map_err(|_| {
-                    Status::internal("Error getting table types".to_string())
-                })?;
-                let resp = Self::record_batch_to_resp(rb).await?;
-                return Ok(resp);
-            }
-            "get_flight_info_tables" => {
-                debug!("Responding with tables");
-                let rb = self
-                    .tables(ctx)
-                    .map_err(|_| Status::internal("Error getting 
tables".to_string()))?;
-                let resp = Self::record_batch_to_resp(rb).await?;
-                return Ok(resp);
-            }
-            _ => {}
-        }
-
-        // Proxy the flight
-        let addr = format!("http://{}:{}", fp.host, fp.port);
-        debug!("Scheduler proxying flight to {}", addr);
-        let connection =
-            create_grpc_client_connection(addr.clone())
-                .await
-                .map_err(|e| {
-                    Status::internal(format!(
-                    "Error connecting to Ballista scheduler or executor at 
{addr}: {e:?}"
-                ))
-                })?;
-        let mut flight_client = FlightServiceClient::new(connection);
-        let buf = action.encode_to_vec();
-        let request = Request::new(Ticket { ticket: buf.into() });
-
-        let stream = flight_client
-            .do_get(request)
-            .await
-            .map_err(|e| Status::internal(format!("{e:?}")))?
-            .into_inner();
-        Ok(Response::new(Box::pin(stream)))
-    }
-
-    /// Get a FlightDataStream containing the data related to the supported XDBC types.
-    async fn do_get_xdbc_type_info(
-        &self,
-        _query: CommandGetXdbcTypeInfo,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        debug!("do_get_xdbc_type_info");
-        Err(Status::unimplemented("Implement do_get_xdbc_type_info"))
-    }
-
-    async fn get_flight_info_statement(
-        &self,
-        query: CommandStatementQuery,
-        request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        debug!("get_flight_info_statement query:\n{}", query.query);
-
-        let ctx = self.get_ctx(&request)?;
-        let plan = Self::prepare_statement(&query.query, &ctx).await?;
-        let resp = self.execute_plan(ctx, &plan).await?;
-
-        debug!("Returning flight info...");
-        Ok(resp)
-    }
-
-    async fn get_flight_info_prepared_statement(
-        &self,
-        handle: CommandPreparedStatementQuery,
-        request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        debug!("get_flight_info_prepared_statement");
-        let ctx = self.get_ctx(&request)?;
-        let handle = Uuid::from_slice(handle.prepared_statement_handle.as_ref())
-            .map_err(|e| Status::internal(format!("Error decoding handle: {e}")))?;
-        let plan = self.get_plan(&handle)?;
-        let resp = self.execute_plan(ctx, &plan).await?;
-
-        debug!("Responding to query {}...", handle);
-        Ok(resp)
-    }
-
-    async fn get_flight_info_catalogs(
-        &self,
-        _query: CommandGetCatalogs,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        debug!("get_flight_info_catalogs");
-        Err(Status::unimplemented("Implement get_flight_info_catalogs"))
-    }
-    async fn get_flight_info_schemas(
-        &self,
-        _query: CommandGetDbSchemas,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        debug!("get_flight_info_schemas");
-        Err(Status::unimplemented("Implement get_flight_info_schemas"))
-    }
-
-    async fn get_flight_info_tables(
-        &self,
-        _query: CommandGetTables,
-        request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        debug!("get_flight_info_tables");
-        let ctx = self.get_ctx(&request)?;
-        let data = self
-            .tables(ctx)
-            .map_err(|e| Status::internal(format!("Error getting tables: 
{e}")))?;
-        let resp = self.batch_to_schema_resp(&data, "get_flight_info_tables")?;
-        Ok(resp)
-    }
-
-    async fn get_flight_info_table_types(
-        &self,
-        _query: CommandGetTableTypes,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        debug!("get_flight_info_table_types");
-        let data = FlightSqlServiceImpl::table_types()
-            .map_err(|e| Status::internal(format!("Error getting table types: 
{e}")))?;
-        let resp = self.batch_to_schema_resp(&data, 
"get_flight_info_table_types")?;
-        Ok(resp)
-    }
-
-    async fn get_flight_info_sql_info(
-        &self,
-        _query: CommandGetSqlInfo,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        debug!("get_flight_info_sql_info");
-        // TODO: implement for FlightSQL JDBC to work
-        Err(Status::unimplemented("Implement CommandGetSqlInfo"))
-    }
-    async fn get_flight_info_primary_keys(
-        &self,
-        _query: CommandGetPrimaryKeys,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        debug!("get_flight_info_primary_keys");
-        Err(Status::unimplemented(
-            "Implement get_flight_info_primary_keys",
-        ))
-    }
-    async fn get_flight_info_exported_keys(
-        &self,
-        _query: CommandGetExportedKeys,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        debug!("get_flight_info_exported_keys");
-        Err(Status::unimplemented(
-            "Implement get_flight_info_exported_keys",
-        ))
-    }
-    async fn get_flight_info_imported_keys(
-        &self,
-        _query: CommandGetImportedKeys,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        debug!("get_flight_info_imported_keys");
-        Err(Status::unimplemented(
-            "Implement get_flight_info_imported_keys",
-        ))
-    }
-    async fn get_flight_info_cross_reference(
-        &self,
-        _query: CommandGetCrossReference,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        debug!("get_flight_info_cross_reference");
-        Err(Status::unimplemented(
-            "Implement get_flight_info_cross_reference",
-        ))
-    }
-
-    /// Get a FlightInfo to extract information about the supported XDBC types.
-    async fn get_flight_info_xdbc_type_info(
-        &self,
-        _query: CommandGetXdbcTypeInfo,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        debug!("get_flight_info_xdbc_type_info");
-        Err(Status::unimplemented(
-            "Implement get_flight_info_xdbc_type_info",
-        ))
-    }
-
-    async fn do_get_statement(
-        &self,
-        _ticket: TicketStatementQuery,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        debug!("do_get_statement");
-        // let handle = Uuid::from_slice(&ticket.statement_handle)
-        //     .map_err(|e| Status::internal(format!("Error decoding ticket: 
{}", e)))?;
-        // let statements = self.statements.try_lock()
-        //     .map_err(|e| Status::internal(format!("Error decoding ticket: 
{}", e)))?;
-        // let plan = statements.get(&handle);
-        Err(Status::unimplemented("Implement do_get_statement"))
-    }
-
-    async fn do_get_prepared_statement(
-        &self,
-        _query: CommandPreparedStatementQuery,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        debug!("do_get_prepared_statement");
-        Err(Status::unimplemented("Implement do_get_prepared_statement"))
-    }
-    async fn do_get_catalogs(
-        &self,
-        _query: CommandGetCatalogs,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        debug!("do_get_catalogs");
-        Err(Status::unimplemented("Implement do_get_catalogs"))
-    }
-    async fn do_get_schemas(
-        &self,
-        _query: CommandGetDbSchemas,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        debug!("do_get_schemas");
-        Err(Status::unimplemented("Implement do_get_schemas"))
-    }
-    async fn do_get_tables(
-        &self,
-        _query: CommandGetTables,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        debug!("do_get_tables");
-        Err(Status::unimplemented("Implement do_get_tables"))
-    }
-    async fn do_get_table_types(
-        &self,
-        _query: CommandGetTableTypes,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        debug!("do_get_table_types");
-        Err(Status::unimplemented("Implement do_get_table_types"))
-    }
-    async fn do_get_sql_info(
-        &self,
-        _query: CommandGetSqlInfo,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        debug!("do_get_sql_info");
-        Err(Status::unimplemented("Implement do_get_sql_info"))
-    }
-    async fn do_get_primary_keys(
-        &self,
-        _query: CommandGetPrimaryKeys,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        debug!("do_get_primary_keys");
-        Err(Status::unimplemented("Implement do_get_primary_keys"))
-    }
-    async fn do_get_exported_keys(
-        &self,
-        _query: CommandGetExportedKeys,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        debug!("do_get_exported_keys");
-        Err(Status::unimplemented("Implement do_get_exported_keys"))
-    }
-    async fn do_get_imported_keys(
-        &self,
-        _query: CommandGetImportedKeys,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        debug!("do_get_imported_keys");
-        Err(Status::unimplemented("Implement do_get_imported_keys"))
-    }
-    async fn do_get_cross_reference(
-        &self,
-        _query: CommandGetCrossReference,
-        _request: Request<Ticket>,
-    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
-        debug!("do_get_cross_reference");
-        Err(Status::unimplemented("Implement do_get_cross_reference"))
-    }
-    // do_put
-    async fn do_put_statement_update(
-        &self,
-        _ticket: CommandStatementUpdate,
-        _request: Request<PeekableFlightDataStream>,
-    ) -> Result<i64, Status> {
-        debug!("do_put_statement_update");
-        Err(Status::unimplemented("Implement do_put_statement_update"))
-    }
-    async fn do_put_prepared_statement_query(
-        &self,
-        _query: CommandPreparedStatementQuery,
-        _request: Request<PeekableFlightDataStream>,
-    ) -> Result<DoPutPreparedStatementResult, Status> {
-        debug!("do_put_prepared_statement_query");
-        Err(Status::unimplemented(
-            "Implement do_put_prepared_statement_query",
-        ))
-    }
-    async fn do_put_prepared_statement_update(
-        &self,
-        handle: CommandPreparedStatementUpdate,
-        request: Request<PeekableFlightDataStream>,
-    ) -> Result<i64, Status> {
-        debug!("do_put_prepared_statement_update");
-        let ctx = self.get_ctx(&request)?;
-        let handle = Uuid::from_slice(handle.prepared_statement_handle.as_ref())
-            .map_err(|e| Status::internal(format!("Error decoding handle: {e}")))?;
-        let plan = self.get_plan(&handle)?;
-        let _ = self.execute_plan(ctx, &plan).await?;
-        debug!("Sending -1 rows affected");
-        Ok(-1)
-    }
-
-    async fn do_action_create_prepared_statement(
-        &self,
-        query: ActionCreatePreparedStatementRequest,
-        request: Request<Action>,
-    ) -> Result<ActionCreatePreparedStatementResult, Status> {
-        debug!("do_action_create_prepared_statement");
-        let ctx = self.get_ctx(&request)?;
-        let plan = Self::prepare_statement(&query.query, &ctx).await?;
-        let schema_bytes = self.df_schema_to_arrow(plan.schema())?;
-        let handle = self.cache_plan(plan)?;
-        debug!("Prepared statement {}:\n{}", handle, query.query);
-        let res = ActionCreatePreparedStatementResult {
-            prepared_statement_handle: handle.as_bytes().to_vec().into(),
-            dataset_schema: schema_bytes.into(),
-            parameter_schema: Vec::new().into(), // TODO: parameters
-        };
-        Ok(res)
-    }
-
-    async fn do_action_close_prepared_statement(
-        &self,
-        handle: ActionClosePreparedStatementRequest,
-        _request: Request<Action>,
-    ) -> Result<(), Status> {
-        debug!("do_action_close_prepared_statement");
-        let handle = Uuid::from_slice(handle.prepared_statement_handle.as_ref())
-            .inspect(|id| {
-                debug!("Closing {}", id);
-            })
-            .map_err(|e| Status::internal(format!("Failed to parse handle: {e:?}")))?;
-
-        self.remove_plan(handle)
-    }
-
-    /// Get a FlightInfo for executing a substrait plan.
-    async fn get_flight_info_substrait_plan(
-        &self,
-        _query: CommandStatementSubstraitPlan,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        debug!("get_flight_info_substrait_plan");
-        Err(Status::unimplemented(
-            "Implement get_flight_info_substrait_plan",
-        ))
-    }
-
-    /// Execute a substrait plan
-    async fn do_put_substrait_plan(
-        &self,
-        _query: CommandStatementSubstraitPlan,
-        _request: Request<PeekableFlightDataStream>,
-    ) -> Result<i64, Status> {
-        debug!("do_put_substrait_plan");
-        Err(Status::unimplemented("Implement do_put_substrait_plan"))
-    }
-
-    /// Create a prepared substrait plan.
-    async fn do_action_create_prepared_substrait_plan(
-        &self,
-        _query: ActionCreatePreparedSubstraitPlanRequest,
-        _request: Request<Action>,
-    ) -> Result<ActionCreatePreparedStatementResult, Status> {
-        debug!("do_action_create_prepared_substrait_plan");
-        Err(Status::unimplemented(
-            "Implement do_action_create_prepared_substrait_plan",
-        ))
-    }
-
-    /// Begin a transaction
-    async fn do_action_begin_transaction(
-        &self,
-        _query: ActionBeginTransactionRequest,
-        _request: Request<Action>,
-    ) -> Result<ActionBeginTransactionResult, Status> {
-        debug!("do_action_begin_transaction");
-        Err(Status::unimplemented(
-            "Implement do_action_begin_transaction",
-        ))
-    }
-
-    /// End a transaction
-    async fn do_action_end_transaction(
-        &self,
-        _query: ActionEndTransactionRequest,
-        _request: Request<Action>,
-    ) -> Result<(), Status> {
-        debug!("do_action_end_transaction");
-        Err(Status::unimplemented("Implement do_action_end_transaction"))
-    }
-
-    /// Begin a savepoint
-    async fn do_action_begin_savepoint(
-        &self,
-        _query: ActionBeginSavepointRequest,
-        _request: Request<Action>,
-    ) -> Result<ActionBeginSavepointResult, Status> {
-        debug!("do_action_begin_savepoint");
-        Err(Status::unimplemented("Implement do_action_begin_savepoint"))
-    }
-
-    /// End a savepoint
-    async fn do_action_end_savepoint(
-        &self,
-        _query: ActionEndSavepointRequest,
-        _request: Request<Action>,
-    ) -> Result<(), Status> {
-        debug!("do_action_end_savepoint");
-        Err(Status::unimplemented("Implement do_action_end_savepoint"))
-    }
-
-    /// Cancel a query
-    async fn do_action_cancel_query(
-        &self,
-        _query: ActionCancelQueryRequest,
-        _request: Request<Action>,
-    ) -> Result<ActionCancelQueryResult, Status> {
-        debug!("do_action_cancel_query");
-        Err(Status::unimplemented("Implement do_action_cancel_query"))
-    }
-
-    /// Register a new SqlInfo result, making it available when calling GetSqlInfo.
-    async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
-}
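
For reference, the core of the deleted service's authentication was the Basic-auth handshake in `do_handshake`, which exchanged hard-coded `admin`/`password` credentials for a bearer token. A minimal, self-contained sketch of that header parsing, distilled from the removed code (it uses the `base64` crate that this commit also drops as a dependency; `parse_basic_auth` is an illustrative name, not part of the codebase):

```rust
use base64::Engine;

/// Parse an HTTP Basic `authorization` header into (user, password).
/// The removed `do_handshake` split the decoded string on ':' and required
/// exactly two parts; this sketch splits on the first ':' instead.
fn parse_basic_auth(header: &str) -> Option<(String, String)> {
    let encoded = header.strip_prefix("Basic ")?;
    let bytes = base64::engine::general_purpose::STANDARD
        .decode(encoded)
        .ok()?;
    let decoded = String::from_utf8(bytes).ok()?;
    let (user, pass) = decoded.split_once(':')?;
    Some((user.to_string(), pass.to_string()))
}

fn main() {
    // "admin:password" were the hard-coded credentials in the removed service.
    assert_eq!(
        parse_basic_auth("Basic YWRtaW46cGFzc3dvcmQ="),
        Some(("admin".to_string(), "password".to_string()))
    );
}
```
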
diff --git a/ballista/scheduler/src/lib.rs b/ballista/scheduler/src/lib.rs
index d709b6ec..fb29d5ba 100644
--- a/ballista/scheduler/src/lib.rs
+++ b/ballista/scheduler/src/lib.rs
@@ -28,8 +28,6 @@ pub mod scheduler_server;
 pub mod standalone;
 pub mod state;
 
-#[cfg(feature = "flight-sql")]
-pub mod flight_sql;
 #[cfg(test)]
 pub mod test_utils;
 
diff --git a/ballista/scheduler/src/scheduler_process.rs b/ballista/scheduler/src/scheduler_process.rs
index c6cbc665..1bd6179e 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#[cfg(feature = "flight-sql")]
-use arrow_flight::flight_service_server::FlightServiceServer;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer;
 use ballista_core::serde::{
@@ -33,8 +31,7 @@ use std::{net::SocketAddr, sync::Arc};
 use crate::api::get_routes;
 use crate::cluster::BallistaCluster;
 use crate::config::SchedulerConfig;
-#[cfg(feature = "flight-sql")]
-use crate::flight_sql::FlightSqlServiceImpl;
+
 use crate::metrics::default_metrics_collector;
 #[cfg(feature = "keda-scaler")]
 use crate::scheduler_server::externalscaler::external_scaler_server::ExternalScalerServer;
@@ -91,17 +88,6 @@ pub async fn start_server(
     let tonic_builder =
         tonic_builder.add_service(ExternalScalerServer::new(scheduler_server.clone()));
 
-    #[cfg(feature = "flight-sql")]
-    let tonic_builder = tonic_builder.add_service(
-        FlightServiceServer::new(FlightSqlServiceImpl::new(scheduler_server.clone()))
-            .max_encoding_message_size(
-                config.grpc_server_max_encoding_message_size as usize,
-            )
-            .max_decoding_message_size(
-                config.grpc_server_max_decoding_message_size as usize,
-            ),
-    );
-
     let tonic = tonic_builder.into_service().into_axum_router();
     let tonic = tonic.fallback(|| async { (StatusCode::NOT_FOUND, "404 - Not Found") });
 
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 287b5fe0..5e909e15 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -107,4 +107,3 @@ round-robin-local
 | executor-slots-policy                        | Utf8   | bias        | Sets the executor slots policy for the scheduler, possible values: bias, round-robin, round-robin-local. For a cluster with single scheduler, round-robin-local is recommended. |
 | finished-job-data-clean-up-interval-seconds  | UInt64 | 300         | Sets the delayed interval for cleaning up finished job data, mainly the shuffle data, 0 means the cleaning up is disabled. |
 | finished-job-state-clean-up-interval-seconds | UInt64 | 3600        | Sets the delayed interval for cleaning up finished job state stored in the backend, 0 means the cleaning up is disabled. |
-| advertise-flight-sql-endpoint                | Utf8   | N/A         | Sets the route endpoint for proxying flight sql results via scheduler. |
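
The removed `advertise-flight-sql-endpoint` value was split as `IP:Port` inside the deleted `job_to_fetch_part`, which panicked via `expect` on an unparsable port. A standalone sketch of that parsing, returning an error instead of panicking (the function name and `u16` port type are illustrative, not from the commit):

```rust
/// Split an advertise endpoint of the form `IP:Port`, as the deleted
/// `job_to_fetch_part` did for the advertise-flight-sql-endpoint flag.
fn parse_advertise_endpoint(endpoint: &str) -> Result<(String, u16), String> {
    match endpoint.split(':').collect::<Vec<_>>().as_slice() {
        [host, port] => {
            let port = port
                .parse::<u16>()
                .map_err(|e| format!("Failed to parse port from advertise-endpoint: {e}"))?;
            Ok((host.to_string(), port))
        }
        _ => Err("advertise-endpoint flag has incorrect format. Expected IP:Port".to_string()),
    }
}

fn main() {
    assert_eq!(
        parse_advertise_endpoint("10.0.0.5:50050"),
        Ok(("10.0.0.5".to_string(), 50050))
    );
    assert!(parse_advertise_endpoint("not-an-endpoint").is_err());
}
```
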
diff --git a/docs/source/user-guide/flightsql.md b/docs/source/user-guide/flightsql.md
deleted file mode 100644
index b6bb71f4..00000000
--- a/docs/source/user-guide/flightsql.md
+++ /dev/null
@@ -1,103 +0,0 @@
-<!---
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-
-# Using FlightSQL to Connect to Ballista
-
-One of the easiest ways to start with Ballista is to plug it into your existing data infrastructure using support for Arrow Flight SQL JDBC.
-
-> This is an optional scheduler feature, enabled by building with the `flight-sql` feature flag.
-
-Getting started involves these main steps:
-
-1. [Install prerequisites](#prereq)
-2. Run the [Ballista docker container](#docker)
-3. Download the [Arrow Flight SQL JDBC Driver](#jdbc)
-4. [Install the driver](#tool) into your favorite JDBC tool
-5. Run a ["hello, world!"](#hello) query
-6. Register a table and run more complicated queries
-
-## <a name="prereq"/>Prerequisites
-
-### Ubuntu
-
-```shell
-sudo apt-get update
-sudo apt-get install -y docker.io
-```
-
-### MacOS
-
-```shell
-brew install docker
-```
-
-### Windows
-
-```shell
-choco install docker-desktop
-```
-
-## <a name="docker"/> Run Docker Container
-
-```shell
-docker run -p 50050:50050 --rm ghcr.io/apache/datafusion-ballista-standalone:0.10.0
-```
-
-## <a name="jdbc"/>Download the FlightSQL JDBC Driver
-
-Download the [FlightSQL JDBC Driver](https://repo1.maven.org/maven2/org/apache/arrow/flight-sql-jdbc-driver/10.0.0/flight-sql-jdbc-driver-10.0.0.jar)
-from Maven Central.
-
-## <a name="tool"/>Use the Driver in your Favorite Data Tool
-
-The important pieces of information:
-
-| Key              | Value                                              |
-| ---------------- | -------------------------------------------------- |
-| Driver file      | flight-sql-jdbc-driver-10.0.0.jar                  |
-| Class Name       | org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver |
-| Authentication   | User & Password                                    |
-| Username         | admin                                              |
-| Password         | password                                           |
-| Advanced Options | useEncryption=false                                |
-| URL              | jdbc:arrow-flight://127.0.0.1:50050                |
-
-## <a name="hello"/>Run a "Hello, World!" Query
-
-```sql
-select 'Hello from DataFusion Ballista!' as greeting;
-```
-
-## <a name="complex"/>Run a Complex Query
-
-In order to run queries against data, tables need to be "registered" with the current session (and re-registered upon each new connection).
-
-To register the built-in demo table, use the syntax below:
-
-```sql
-create external table taxi stored as parquet location '/data/yellow_tripdata_2022-01.parquet';
-```
-
-Once the table has been registered, all the normal SQL queries can be performed:
-
-```sql
-select * from taxi limit 10;
-```
-
-🎉 Happy querying! 🎉
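
Before this removal, the flow the deleted guide describes was also reachable from Rust without JDBC. A minimal connection sketch, assuming the `arrow-flight` crate with its `flight-sql-experimental` feature plus `tokio` and `tonic`, run against a scheduler still built with `flight-sql`; the `handshake`/`execute` method signatures follow arrow-flight's `FlightSqlServiceClient` as of recent releases and may differ across versions:

```rust
use arrow_flight::sql::client::FlightSqlServiceClient;
use tonic::transport::Endpoint;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the scheduler's gRPC port (50050 in the guide above).
    let channel = Endpoint::from_static("http://127.0.0.1:50050")
        .connect()
        .await?;
    let mut client = FlightSqlServiceClient::new(channel);

    // Basic-auth handshake; the removed service only accepted admin/password.
    client.handshake("admin", "password").await?;

    // Plan a query; each endpoint in the returned FlightInfo carries a
    // ticket for fetching one result partition from an executor.
    let info = client
        .execute(
            "select 'Hello from DataFusion Ballista!' as greeting".to_string(),
            None,
        )
        .await?;
    println!("query planned into {} endpoint(s)", info.endpoint.len());
    Ok(())
}
```
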

