This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 559bcf29 feat: remove flight-sql (#1228)
559bcf29 is described below
commit 559bcf29ed0719d4fb133bba4d39e48c18f45891
Author: Marko Milenković <[email protected]>
AuthorDate: Fri Apr 18 17:08:31 2025 +0100
feat: remove flight-sql (#1228)
---
Cargo.lock | 1 -
ballista/scheduler/Cargo.toml | 2 -
ballista/scheduler/src/flight_sql.rs | 1014 ---------------------------
ballista/scheduler/src/lib.rs | 2 -
ballista/scheduler/src/scheduler_process.rs | 16 +-
docs/source/user-guide/configs.md | 1 -
docs/source/user-guide/flightsql.md | 103 ---
7 files changed, 1 insertion(+), 1138 deletions(-)
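For context before the diffs: the endpoint being removed authenticated JDBC clients with a plain HTTP Basic handshake (strip the `Basic ` prefix, base64-decode, split `user:password`), minted a session, and answered with a bearer token. A self-contained sketch of just that credential-parsing step, mirroring the removed `do_handshake` below and using the base64 0.22 dependency this commit also drops:

```rust
use base64::Engine;

// Mirrors the parsing in the removed do_handshake(): strip the "Basic "
// prefix, base64-decode, then split on ':' and require exactly a user
// and a password, as the deleted code did.
fn parse_basic_auth(header: &str) -> Result<(String, String), String> {
    let payload = header
        .strip_prefix("Basic ")
        .ok_or_else(|| format!("Auth type not implemented: {header}"))?;
    let bytes = base64::engine::general_purpose::STANDARD
        .decode(payload)
        .map_err(|_| "authorization not parsable".to_string())?;
    let creds =
        String::from_utf8(bytes).map_err(|_| "authorization not parsable".to_string())?;
    match creds.split(':').collect::<Vec<_>>().as_slice() {
        [user, pass] => Ok((user.to_string(), pass.to_string())),
        _ => Err("Invalid authorization header".to_string()),
    }
}

fn main() {
    // "admin:password", the demo credentials hard-coded in the removed service.
    let header = "Basic YWRtaW46cGFzc3dvcmQ=";
    assert_eq!(
        parse_basic_auth(header).unwrap(),
        ("admin".to_string(), "password".to_string())
    );
}
```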
diff --git a/Cargo.lock b/Cargo.lock
index 327ddb83..8c872ccf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1056,7 +1056,6 @@ dependencies = [
"async-trait",
"axum",
"ballista-core",
- "base64 0.22.1",
"clap 4.5.36",
"configure_me",
"configure_me_codegen",
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 88d605f5..c0078a67 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -37,7 +37,6 @@ required-features = ["build-binary"]
[features]
build-binary = ["configure_me", "clap", "tracing-subscriber",
"tracing-appender", "tracing", "ballista-core/build-binary"]
default = ["build-binary"]
-flight-sql = ["base64"]
graphviz-support = ["dep:graphviz-rust"]
keda-scaler = []
prometheus-metrics = ["prometheus", "once_cell"]
@@ -48,7 +47,6 @@ arrow-flight = { workspace = true }
async-trait = { workspace = true }
axum = "0.7.7"
ballista-core = { path = "../core", version = "46.0.0" }
-base64 = { version = "0.22", optional = true }
clap = { workspace = true, optional = true }
configure_me = { workspace = true, optional = true }
dashmap = { workspace = true }
diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs
deleted file mode 100644
index 8dfd91f6..00000000
--- a/ballista/scheduler/src/flight_sql.rs
+++ /dev/null
@@ -1,1014 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow_flight::flight_descriptor::DescriptorType;
-use arrow_flight::flight_service_server::FlightService;
-use arrow_flight::sql::server::{FlightSqlService, PeekableFlightDataStream};
-use arrow_flight::sql::{
- ActionBeginSavepointRequest, ActionBeginSavepointResult,
- ActionBeginTransactionRequest, ActionBeginTransactionResult,
- ActionCancelQueryRequest, ActionCancelQueryResult,
- ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
- ActionCreatePreparedStatementResult, ActionCreatePreparedSubstraitPlanRequest,
- ActionEndSavepointRequest, ActionEndTransactionRequest, CommandGetCatalogs,
- CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys,
- CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo,
- CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
- CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery,
- CommandStatementSubstraitPlan, CommandStatementUpdate, DoPutPreparedStatementResult,
- SqlInfo, TicketStatementQuery,
-};
-use arrow_flight::{
- Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
- HandshakeResponse, Ticket,
-};
-use base64::Engine;
-use futures::Stream;
-use log::{debug, error, warn};
-use std::convert::TryFrom;
-use std::pin::Pin;
-use std::str::FromStr;
-use std::string::ToString;
-use std::sync::Arc;
-use std::time::Duration;
-use tonic::{Request, Response, Status, Streaming};
-
-use crate::scheduler_server::SchedulerServer;
-use arrow_flight::flight_service_client::FlightServiceClient;
-use arrow_flight::sql::ProstMessageExt;
-use arrow_flight::utils::batches_to_flight_data;
-use arrow_flight::SchemaAsIpc;
-use ballista_core::serde::protobuf;
-use ballista_core::serde::protobuf::action::ActionType::FetchPartition;
-use ballista_core::serde::protobuf::job_status;
-use ballista_core::serde::protobuf::JobStatus;
-use ballista_core::serde::protobuf::SuccessfulJob;
-use ballista_core::utils::create_grpc_client_connection;
-use dashmap::DashMap;
-use datafusion::arrow;
-use datafusion::arrow::array::{ArrayRef, StringArray};
-use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion::arrow::error::ArrowError;
-use datafusion::arrow::ipc::writer::{
- DictionaryTracker, IpcDataGenerator, IpcWriteOptions,
-};
-use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::common::DFSchemaRef;
-use datafusion::logical_expr::LogicalPlan;
-use datafusion::prelude::SessionContext;
-use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
-use prost::bytes::Bytes;
-use prost::Message;
-use tokio::sync::mpsc::{channel, Receiver, Sender};
-use tokio::time::sleep;
-use tokio_stream::wrappers::ReceiverStream;
-use tonic::metadata::MetadataValue;
-use uuid::Uuid;
-
-pub struct FlightSqlServiceImpl {
- server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode>,
- statements: Arc<DashMap<Uuid, LogicalPlan>>,
- contexts: Arc<DashMap<Uuid, Arc<SessionContext>>>,
-}
-
-const TABLE_TYPES: [&str; 2] = ["TABLE", "VIEW"];
-
-impl FlightSqlServiceImpl {
- pub fn new(server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode>) -> Self {
- Self {
- server,
- statements: Default::default(),
- contexts: Default::default(),
- }
- }
-
- fn tables(&self, ctx: Arc<SessionContext>) -> Result<RecordBatch, ArrowError> {
- let schema = Arc::new(Schema::new(vec![
- Field::new("catalog_name", DataType::Utf8, true),
- Field::new("db_schema_name", DataType::Utf8, true),
- Field::new("table_name", DataType::Utf8, false),
- Field::new("table_type", DataType::Utf8, false),
- ]));
- let mut names: Vec<Option<String>> = vec![];
- for catalog_name in ctx.catalog_names() {
- let catalog = ctx
- .catalog(&catalog_name)
- .expect("catalog should have been found");
- for schema_name in catalog.schema_names() {
- let schema = catalog
- .schema(&schema_name)
- .expect("schema should have been found");
- for table_name in schema.table_names() {
- names.push(Some(table_name));
- }
- }
- }
- let types: Vec<_> = names.iter().map(|_| Some("TABLE".to_string())).collect();
- let cats: Vec<_> = names.iter().map(|_| None).collect();
- let schemas: Vec<_> = names.iter().map(|_| None).collect();
- let rb = RecordBatch::try_new(
- schema,
- [cats, schemas, names, types]
- .iter()
- .map(|i| Arc::new(StringArray::from(i.clone())) as ArrayRef)
- .collect::<Vec<_>>(),
- )?;
- Ok(rb)
- }
-
- fn table_types() -> Result<RecordBatch, ArrowError> {
- let schema = Arc::new(Schema::new(vec![Field::new(
- "table_type",
- DataType::Utf8,
- false,
- )]));
- RecordBatch::try_new(
- schema,
- [TABLE_TYPES]
- .iter()
- .map(|i| Arc::new(StringArray::from(i.to_vec())) as ArrayRef)
- .collect::<Vec<_>>(),
- )
- }
-
- async fn create_ctx(&self) -> Result<Uuid, Status> {
- let config = self.server.state.session_manager.produce_config();
- let ctx = self
- .server
- .state
- .session_manager
- .create_session(&config)
- .await
- .map_err(|e| {
- Status::internal(format!("Failed to create SessionContext:
{e:?}"))
- })?;
- let handle = Uuid::new_v4();
- self.contexts.insert(handle, ctx);
- Ok(handle)
- }
-
- fn get_ctx<T>(&self, req: &Request<T>) -> Result<Arc<SessionContext>, Status> {
- let auth = req
- .metadata()
- .get("authorization")
- .ok_or_else(|| Status::internal("No authorization header!"))?;
- let str = auth
- .to_str()
- .map_err(|e| Status::internal(format!("Error parsing header:
{e}")))?;
- let authorization = str.to_string();
- let bearer = "Bearer ";
- if !authorization.starts_with(bearer) {
- Err(Status::internal("Invalid auth header!"))?;
- }
- let auth = authorization[bearer.len()..].to_string();
-
- let handle = Uuid::from_str(auth.as_str())
- .map_err(|e| Status::internal(format!("Error locking contexts:
{e}")))?;
- if let Some(context) = self.contexts.get(&handle) {
- Ok(context.clone())
- } else {
- Err(Status::internal(format!(
- "Context handle not found: {handle}"
- )))?
- }
- }
-
- async fn prepare_statement(
- query: &str,
- ctx: &Arc<SessionContext>,
- ) -> Result<LogicalPlan, Status> {
- let plan = ctx
- .sql(query)
- .await
- .and_then(|df| df.into_optimized_plan())
- .map_err(|e| Status::internal(format!("Error building plan:
{e}")))?;
- Ok(plan)
- }
-
- async fn check_job(&self, job_id: &String) -> Result<Option<SuccessfulJob>, Status> {
- let status = self
- .server
- .state
- .task_manager
- .get_job_status(job_id)
- .await
- .map_err(|e| {
- let msg = format!("Error getting status for job {job_id}:
{e:?}");
- error!("{}", msg);
- Status::internal(msg)
- })?;
- let status: JobStatus = match status {
- Some(status) => status,
- None => {
- let msg = format!("Error getting status for job {job_id}!");
- error!("{}", msg);
- Err(Status::internal(msg))?
- }
- };
- let status: job_status::Status = match status.status {
- Some(status) => status,
- None => {
- let msg = format!("Error getting status for job {job_id}!");
- error!("{}", msg);
- Err(Status::internal(msg))?
- }
- };
- match status {
- job_status::Status::Queued(_) => Ok(None),
- job_status::Status::Running(_) => Ok(None),
- job_status::Status::Failed(e) => {
- warn!("Error executing plan: {:?}", e);
- Err(Status::internal(format!(
- "Error executing plan: {}",
- e.error
- )))?
- }
- job_status::Status::Successful(comp) => Ok(Some(comp)),
- }
- }
-
- async fn job_to_fetch_part(
- &self,
- completed: SuccessfulJob,
- num_rows: &mut i64,
- num_bytes: &mut i64,
- ) -> Result<Vec<FlightEndpoint>, Status> {
- let mut fieps: Vec<_> = vec![];
- for loc in completed.partition_location.iter() {
- let (exec_host, exec_port) = if let Some(ref md) = loc.executor_meta {
- (md.host.clone(), md.port)
- } else {
- Err(Status::internal(
- "Invalid partition location, missing executor metadata and
advertise_endpoint flag is undefined.".to_string(),
- ))?
- };
-
- let (host, port) = match &self
- .server
- .state
- .config
- .advertise_flight_sql_endpoint
- {
- Some(endpoint) => {
- let advertise_endpoint_vec: Vec<&str> = endpoint.split(':').collect();
- match advertise_endpoint_vec.as_slice() {
- [host_ip, port] => {
- (String::from(*host_ip), FromStr::from_str(port).expect("Failed to parse port from advertise-endpoint."))
- }
- _ => {
- Err(Status::internal("advertise-endpoint flag has
incorrect format. Expected IP:Port".to_string()))?
- }
- }
- }
- None => (exec_host.clone(), exec_port),
- };
-
- let fetch = if let Some(ref id) = loc.partition_id {
- let fetch = protobuf::FetchPartition {
- job_id: id.job_id.clone(),
- stage_id: id.stage_id,
- partition_id: id.partition_id,
- path: loc.path.clone(),
- // Use executor ip:port for routing to flight result
- host: exec_host.clone(),
- port: exec_port,
- };
- protobuf::Action {
- action_type: Some(FetchPartition(fetch)),
- settings: vec![],
- }
- } else {
- Err(Status::internal("Error getting partition
ID".to_string()))?
- };
- if let Some(ref stats) = loc.partition_stats {
- *num_rows += stats.num_rows;
- *num_bytes += stats.num_bytes;
- } else {
- Err(Status::internal("Error getting stats".to_string()))?
- }
- let authority = format!("{}:{}", &host, &port);
- let buf = fetch.as_any().encode_to_vec();
- let ticket = Ticket { ticket: buf.into() };
- let fiep = FlightEndpoint::new()
- .with_ticket(ticket)
- .with_location(format!("grpc+tcp://{authority}"));
- fieps.push(fiep);
- }
- Ok(fieps)
- }
-
- fn make_local_fieps(&self, job_id: &str) -> Result<Vec<FlightEndpoint>, Status> {
- let (host, port) = ("127.0.0.1".to_string(), 50050); // TODO: use
advertise host
- let fetch = protobuf::FetchPartition {
- job_id: job_id.to_string(),
- stage_id: 0,
- partition_id: 0,
- path: job_id.to_string(),
- host: host.clone(),
- port,
- };
- let fetch = protobuf::Action {
- action_type: Some(FetchPartition(fetch)),
- settings: vec![],
- };
- let authority = format!("{}:{}", &host, &port); // TODO: use advertise host
- let buf = fetch.as_any().encode_to_vec();
- let ticket = Ticket { ticket: buf.into() };
- let fiep = FlightEndpoint::new()
- .with_ticket(ticket)
- .with_location(format!("grpc+tcp://{authority}"));
- let fieps = vec![fiep];
- Ok(fieps)
- }
-
- fn cache_plan(&self, plan: LogicalPlan) -> Result<Uuid, Status> {
- let handle = Uuid::new_v4();
- self.statements.insert(handle, plan);
- Ok(handle)
- }
-
- fn get_plan(&self, handle: &Uuid) -> Result<LogicalPlan, Status> {
- if let Some(plan) = self.statements.get(handle) {
- Ok(plan.clone())
- } else {
- Err(Status::internal(format!(
- "Statement handle not found: {handle}"
- )))?
- }
- }
-
- fn remove_plan(&self, handle: Uuid) -> Result<(), Status> {
- self.statements.remove(&handle);
- Ok(())
- }
-
- fn df_schema_to_arrow(&self, schema: &DFSchemaRef) -> Result<Vec<u8>, Status> {
- let arrow_schema: Schema = (&**schema).into();
- let schema_bytes = self.schema_to_arrow(Arc::new(arrow_schema))?;
- Ok(schema_bytes)
- }
-
- fn schema_to_arrow(&self, arrow_schema: SchemaRef) -> Result<Vec<u8>, Status> {
- let options = IpcWriteOptions::default();
- let pair = SchemaAsIpc::new(&arrow_schema, &options);
- let data_gen = IpcDataGenerator::default();
- let mut dictionary_tracker = DictionaryTracker::new_with_preserve_dict_id(
- false,
- pair.1.preserve_dict_id(),
- );
- let encoded_data = data_gen.schema_to_bytes_with_dictionary_tracker(
- pair.0,
- &mut dictionary_tracker,
- pair.1,
- );
- let mut schema_bytes = vec![];
- arrow::ipc::writer::write_message(&mut schema_bytes, encoded_data, pair.1)
- .map_err(|e| Status::internal(format!("Error encoding schema: {e}")))?;
- Ok(schema_bytes)
- }
-
- async fn enqueue_job(
- &self,
- ctx: Arc<SessionContext>,
- plan: &LogicalPlan,
- ) -> Result<String, Status> {
- let job_id = self.server.state.task_manager.generate_job_id();
- let job_name = format!("Flight SQL job {job_id}");
- self.server
- .submit_job(&job_id, &job_name, ctx, plan)
- .await
- .map_err(|e| {
- let msg = format!("Failed to send JobQueued event for
{job_id}: {e:?}");
- error!("{}", msg);
- Status::internal(msg)
- })?;
- Ok(job_id)
- }
-
- fn create_resp(
- schema_bytes: Vec<u8>,
- fieps: Vec<FlightEndpoint>,
- num_rows: i64,
- num_bytes: i64,
- ) -> Response<FlightInfo> {
- let flight_desc = FlightDescriptor {
- r#type: DescriptorType::Cmd.into(),
- cmd: Vec::new().into(),
- path: vec![],
- };
- let info = FlightInfo {
- schema: schema_bytes.into(),
- flight_descriptor: Some(flight_desc),
- endpoint: fieps,
- total_records: num_rows,
- total_bytes: num_bytes,
- ordered: false,
- app_metadata: Bytes::new(),
- };
- Response::new(info)
- }
-
- async fn execute_plan(
- &self,
- ctx: Arc<SessionContext>,
- plan: &LogicalPlan,
- ) -> Result<Response<FlightInfo>, Status> {
- let job_id = self.enqueue_job(ctx, plan).await?;
-
- // poll for job completion
- let mut num_rows = 0;
- let mut num_bytes = 0;
- let fieps = loop {
- sleep(Duration::from_millis(100)).await;
- let completed = if let Some(comp) = self.check_job(&job_id).await? {
- comp
- } else {
- continue;
- };
- let fieps = self
- .job_to_fetch_part(completed, &mut num_rows, &mut num_bytes)
- .await?;
- break fieps;
- };
-
- // Generate response
- let schema_bytes = self.df_schema_to_arrow(plan.schema())?;
- let resp = Self::create_resp(schema_bytes, fieps, num_rows, num_bytes);
- Ok(resp)
- }
-
- async fn record_batch_to_resp(
- rb: RecordBatch,
- ) -> Result<
- Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send>>>,
- Status,
- > {
- type FlightResult = Result<FlightData, Status>;
- let (tx, rx): (Sender<FlightResult>, Receiver<FlightResult>) = channel(2);
- let schema = rb.schema();
- let flights = batches_to_flight_data(&schema, vec![rb])
- .map_err(|_| Status::internal("Error encoding
batches".to_string()))?;
- for flight in flights {
- tx.send(Ok(flight))
- .await
- .map_err(|_| Status::internal("Error sending
flight".to_string()))?;
- }
- let resp = Response::new(Box::pin(ReceiverStream::new(rx))
- as Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>);
- Ok(resp)
- }
-
- fn batch_to_schema_resp(
- &self,
- data: &RecordBatch,
- name: &str,
- ) -> Result<Response<FlightInfo>, Status> {
- let num_bytes = data.get_array_memory_size() as i64;
- let schema = data.schema();
- let num_rows = data.num_rows() as i64;
-
- let fieps = self.make_local_fieps(name)?;
- let schema_bytes = self.schema_to_arrow(schema)?;
- let resp = Self::create_resp(schema_bytes, fieps, num_rows, num_bytes);
- Ok(resp)
- }
-}
-
-#[tonic::async_trait]
-impl FlightSqlService for FlightSqlServiceImpl {
- type FlightService = FlightSqlServiceImpl;
-
- async fn do_handshake(
- &self,
- request: Request<Streaming<HandshakeRequest>>,
- ) -> Result<
- Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
- Status,
- > {
- debug!("do_handshake");
- for md in request.metadata().iter() {
- debug!("{:?}", md);
- }
-
- let basic = "Basic ";
- let authorization = request
- .metadata()
- .get("authorization")
- .ok_or_else(|| Status::invalid_argument("authorization field not
present"))?
- .to_str()
- .map_err(|_| Status::invalid_argument("authorization not
parsable"))?;
- if !authorization.starts_with(basic) {
- Err(Status::invalid_argument(format!(
- "Auth type not implemented: {authorization}"
- )))?;
- }
- let bytes = base64::engine::general_purpose::STANDARD
- .decode(&authorization[basic.len()..])
- .map_err(|_| Status::invalid_argument("authorization not
parsable"))?;
- let str = String::from_utf8(bytes)
- .map_err(|_| Status::invalid_argument("authorization not
parsable"))?;
- let parts: Vec<_> = str.split(':').collect();
- if parts.len() != 2 {
- Err(Status::invalid_argument("Invalid authorization header"))?;
- }
- let user = parts[0];
- let pass = parts[1];
- if user != "admin" || pass != "password" {
- Err(Status::unauthenticated("Invalid credentials!"))?
- }
-
- let token = self.create_ctx().await?;
-
- let result = HandshakeResponse {
- protocol_version: 0,
- payload: token.as_bytes().to_vec().into(),
- };
- let result = Ok(result);
- let output = futures::stream::iter(vec![result]);
- let str = format!("Bearer {token}");
- let mut resp: Response<Pin<Box<dyn Stream<Item = Result<_, _>> + Send>>> =
- Response::new(Box::pin(output));
- let md = MetadataValue::try_from(str)
- .map_err(|_| Status::invalid_argument("authorization not
parsable"))?;
- resp.metadata_mut().insert("authorization", md);
- Ok(resp)
- }
-
- async fn do_get_fallback(
- &self,
- request: Request<Ticket>,
- message: arrow_flight::sql::Any,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- debug!("do_get_fallback type_url: {}", message.type_url);
- let ctx = self.get_ctx(&request)?;
- if !message.is::<protobuf::Action>() {
- Err(Status::unimplemented(format!(
- "do_get: The defined request is invalid: {}",
- message.type_url
- )))?
- }
-
- let action: protobuf::Action = message
- .unpack()
- .map_err(|e| Status::internal(format!("{e:?}")))?
- .ok_or_else(|| Status::internal("Expected an Action but got
None!"))?;
- let fp = match &action.action_type {
- Some(FetchPartition(fp)) => fp.clone(),
- None => Err(Status::internal("Expected an ActionType but got
None!"))?,
- };
-
- // Well-known job ID: respond with the data
- match fp.job_id.as_str() {
- "get_flight_info_table_types" => {
- debug!("Responding with table types");
- let rb = FlightSqlServiceImpl::table_types().map_err(|_| {
- Status::internal("Error getting table types".to_string())
- })?;
- let resp = Self::record_batch_to_resp(rb).await?;
- return Ok(resp);
- }
- "get_flight_info_tables" => {
- debug!("Responding with tables");
- let rb = self
- .tables(ctx)
- .map_err(|_| Status::internal("Error getting
tables".to_string()))?;
- let resp = Self::record_batch_to_resp(rb).await?;
- return Ok(resp);
- }
- _ => {}
- }
-
- // Proxy the flight
- let addr = format!("http://{}:{}", fp.host, fp.port);
- debug!("Scheduler proxying flight for to {}", addr);
- let connection =
- create_grpc_client_connection(addr.clone())
- .await
- .map_err(|e| {
- Status::internal(format!(
- "Error connecting to Ballista scheduler or executor at
{addr}: {e:?}"
- ))
- })?;
- let mut flight_client = FlightServiceClient::new(connection);
- let buf = action.encode_to_vec();
- let request = Request::new(Ticket { ticket: buf.into() });
-
- let stream = flight_client
- .do_get(request)
- .await
- .map_err(|e| Status::internal(format!("{e:?}")))?
- .into_inner();
- Ok(Response::new(Box::pin(stream)))
- }
-
- /// Get a FlightDataStream containing the data related to the supported XDBC types.
- async fn do_get_xdbc_type_info(
- &self,
- _query: CommandGetXdbcTypeInfo,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- debug!("do_get_xdbc_type_info");
- Err(Status::unimplemented("Implement do_get_xdbc_type_info"))
- }
-
- async fn get_flight_info_statement(
- &self,
- query: CommandStatementQuery,
- request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- debug!("get_flight_info_statement query:\n{}", query.query);
-
- let ctx = self.get_ctx(&request)?;
- let plan = Self::prepare_statement(&query.query, &ctx).await?;
- let resp = self.execute_plan(ctx, &plan).await?;
-
- debug!("Returning flight info...");
- Ok(resp)
- }
-
- async fn get_flight_info_prepared_statement(
- &self,
- handle: CommandPreparedStatementQuery,
- request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- debug!("get_flight_info_prepared_statement");
- let ctx = self.get_ctx(&request)?;
- let handle = Uuid::from_slice(handle.prepared_statement_handle.as_ref())
- .map_err(|e| Status::internal(format!("Error decoding handle: {e}")))?;
- let plan = self.get_plan(&handle)?;
- let resp = self.execute_plan(ctx, &plan).await?;
-
- debug!("Responding to query {}...", handle);
- Ok(resp)
- }
-
- async fn get_flight_info_catalogs(
- &self,
- _query: CommandGetCatalogs,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- debug!("get_flight_info_catalogs");
- Err(Status::unimplemented("Implement get_flight_info_catalogs"))
- }
- async fn get_flight_info_schemas(
- &self,
- _query: CommandGetDbSchemas,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- debug!("get_flight_info_schemas");
- Err(Status::unimplemented("Implement get_flight_info_schemas"))
- }
-
- async fn get_flight_info_tables(
- &self,
- _query: CommandGetTables,
- request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- debug!("get_flight_info_tables");
- let ctx = self.get_ctx(&request)?;
- let data = self
- .tables(ctx)
- .map_err(|e| Status::internal(format!("Error getting tables:
{e}")))?;
- let resp = self.batch_to_schema_resp(&data, "get_flight_info_tables")?;
- Ok(resp)
- }
-
- async fn get_flight_info_table_types(
- &self,
- _query: CommandGetTableTypes,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- debug!("get_flight_info_table_types");
- let data = FlightSqlServiceImpl::table_types()
- .map_err(|e| Status::internal(format!("Error getting table types:
{e}")))?;
- let resp = self.batch_to_schema_resp(&data,
"get_flight_info_table_types")?;
- Ok(resp)
- }
-
- async fn get_flight_info_sql_info(
- &self,
- _query: CommandGetSqlInfo,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- debug!("get_flight_info_sql_info");
- // TODO: implement for FlightSQL JDBC to work
- Err(Status::unimplemented("Implement CommandGetSqlInfo"))
- }
- async fn get_flight_info_primary_keys(
- &self,
- _query: CommandGetPrimaryKeys,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- debug!("get_flight_info_primary_keys");
- Err(Status::unimplemented(
- "Implement get_flight_info_primary_keys",
- ))
- }
- async fn get_flight_info_exported_keys(
- &self,
- _query: CommandGetExportedKeys,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- debug!("get_flight_info_exported_keys");
- Err(Status::unimplemented(
- "Implement get_flight_info_exported_keys",
- ))
- }
- async fn get_flight_info_imported_keys(
- &self,
- _query: CommandGetImportedKeys,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- debug!("get_flight_info_imported_keys");
- Err(Status::unimplemented(
- "Implement get_flight_info_imported_keys",
- ))
- }
- async fn get_flight_info_cross_reference(
- &self,
- _query: CommandGetCrossReference,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- debug!("get_flight_info_cross_reference");
- Err(Status::unimplemented(
- "Implement get_flight_info_cross_reference",
- ))
- }
-
- /// Get a FlightInfo to extract information about the supported XDBC types.
- async fn get_flight_info_xdbc_type_info(
- &self,
- _query: CommandGetXdbcTypeInfo,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- debug!("get_flight_info_xdbc_type_info");
- Err(Status::unimplemented(
- "Implement get_flight_info_xdbc_type_info",
- ))
- }
-
- async fn do_get_statement(
- &self,
- _ticket: TicketStatementQuery,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- debug!("do_get_statement");
- // let handle = Uuid::from_slice(&ticket.statement_handle)
- // .map_err(|e| Status::internal(format!("Error decoding ticket:
{}", e)))?;
- // let statements = self.statements.try_lock()
- // .map_err(|e| Status::internal(format!("Error decoding ticket:
{}", e)))?;
- // let plan = statements.get(&handle);
- Err(Status::unimplemented("Implement do_get_statement"))
- }
-
- async fn do_get_prepared_statement(
- &self,
- _query: CommandPreparedStatementQuery,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- debug!("do_get_prepared_statement");
- Err(Status::unimplemented("Implement do_get_prepared_statement"))
- }
- async fn do_get_catalogs(
- &self,
- _query: CommandGetCatalogs,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- debug!("do_get_catalogs");
- Err(Status::unimplemented("Implement do_get_catalogs"))
- }
- async fn do_get_schemas(
- &self,
- _query: CommandGetDbSchemas,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- debug!("do_get_schemas");
- Err(Status::unimplemented("Implement do_get_schemas"))
- }
- async fn do_get_tables(
- &self,
- _query: CommandGetTables,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- debug!("do_get_tables");
- Err(Status::unimplemented("Implement do_get_tables"))
- }
- async fn do_get_table_types(
- &self,
- _query: CommandGetTableTypes,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- debug!("do_get_table_types");
- Err(Status::unimplemented("Implement do_get_table_types"))
- }
- async fn do_get_sql_info(
- &self,
- _query: CommandGetSqlInfo,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- debug!("do_get_sql_info");
- Err(Status::unimplemented("Implement do_get_sql_info"))
- }
- async fn do_get_primary_keys(
- &self,
- _query: CommandGetPrimaryKeys,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- debug!("do_get_primary_keys");
- Err(Status::unimplemented("Implement do_get_primary_keys"))
- }
- async fn do_get_exported_keys(
- &self,
- _query: CommandGetExportedKeys,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- debug!("do_get_exported_keys");
- Err(Status::unimplemented("Implement do_get_exported_keys"))
- }
- async fn do_get_imported_keys(
- &self,
- _query: CommandGetImportedKeys,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- debug!("do_get_imported_keys");
- Err(Status::unimplemented("Implement do_get_imported_keys"))
- }
- async fn do_get_cross_reference(
- &self,
- _query: CommandGetCrossReference,
- _request: Request<Ticket>,
- ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- debug!("do_get_cross_reference");
- Err(Status::unimplemented("Implement do_get_cross_reference"))
- }
- // do_put
- async fn do_put_statement_update(
- &self,
- _ticket: CommandStatementUpdate,
- _request: Request<PeekableFlightDataStream>,
- ) -> Result<i64, Status> {
- debug!("do_put_statement_update");
- Err(Status::unimplemented("Implement do_put_statement_update"))
- }
- async fn do_put_prepared_statement_query(
- &self,
- _query: CommandPreparedStatementQuery,
- _request: Request<PeekableFlightDataStream>,
- ) -> Result<DoPutPreparedStatementResult, Status> {
- debug!("do_put_prepared_statement_query");
- Err(Status::unimplemented(
- "Implement do_put_prepared_statement_query",
- ))
- }
- async fn do_put_prepared_statement_update(
- &self,
- handle: CommandPreparedStatementUpdate,
- request: Request<PeekableFlightDataStream>,
- ) -> Result<i64, Status> {
- debug!("do_put_prepared_statement_update");
- let ctx = self.get_ctx(&request)?;
- let handle = Uuid::from_slice(handle.prepared_statement_handle.as_ref())
- .map_err(|e| Status::internal(format!("Error decoding handle: {e}")))?;
- let plan = self.get_plan(&handle)?;
- let _ = self.execute_plan(ctx, &plan).await?;
- debug!("Sending -1 rows affected");
- Ok(-1)
- }
-
- async fn do_action_create_prepared_statement(
- &self,
- query: ActionCreatePreparedStatementRequest,
- request: Request<Action>,
- ) -> Result<ActionCreatePreparedStatementResult, Status> {
- debug!("do_action_create_prepared_statement");
- let ctx = self.get_ctx(&request)?;
- let plan = Self::prepare_statement(&query.query, &ctx).await?;
- let schema_bytes = self.df_schema_to_arrow(plan.schema())?;
- let handle = self.cache_plan(plan)?;
- debug!("Prepared statement {}:\n{}", handle, query.query);
- let res = ActionCreatePreparedStatementResult {
- prepared_statement_handle: handle.as_bytes().to_vec().into(),
- dataset_schema: schema_bytes.into(),
- parameter_schema: Vec::new().into(), // TODO: parameters
- };
- Ok(res)
- }
-
- async fn do_action_close_prepared_statement(
- &self,
- handle: ActionClosePreparedStatementRequest,
- _request: Request<Action>,
- ) -> Result<(), Status> {
- debug!("do_action_close_prepared_statement");
- let handle = Uuid::from_slice(handle.prepared_statement_handle.as_ref())
- .inspect(|id| {
- debug!("Closing {}", id);
- })
- .map_err(|e| Status::internal(format!("Failed to parse handle:
{e:?}")))?;
-
- self.remove_plan(handle)
- }
-
- /// Get a FlightInfo for executing a substrait plan.
- async fn get_flight_info_substrait_plan(
- &self,
- _query: CommandStatementSubstraitPlan,
- _request: Request<FlightDescriptor>,
- ) -> Result<Response<FlightInfo>, Status> {
- debug!("get_flight_info_substrait_plan");
- Err(Status::unimplemented(
- "Implement get_flight_info_substrait_plan",
- ))
- }
-
- /// Execute a substrait plan
- async fn do_put_substrait_plan(
- &self,
- _query: CommandStatementSubstraitPlan,
- _request: Request<PeekableFlightDataStream>,
- ) -> Result<i64, Status> {
- debug!("do_put_substrait_plan");
- Err(Status::unimplemented("Implement do_put_substrait_plan"))
- }
-
- /// Create a prepared substrait plan.
- async fn do_action_create_prepared_substrait_plan(
- &self,
- _query: ActionCreatePreparedSubstraitPlanRequest,
- _request: Request<Action>,
- ) -> Result<ActionCreatePreparedStatementResult, Status> {
- debug!("do_action_create_prepared_substrait_plan");
- Err(Status::unimplemented(
- "Implement do_action_create_prepared_substrait_plan",
- ))
- }
-
- /// Begin a transaction
- async fn do_action_begin_transaction(
- &self,
- _query: ActionBeginTransactionRequest,
- _request: Request<Action>,
- ) -> Result<ActionBeginTransactionResult, Status> {
- debug!("do_action_begin_transaction");
- Err(Status::unimplemented(
- "Implement do_action_begin_transaction",
- ))
- }
-
- /// End a transaction
- async fn do_action_end_transaction(
- &self,
- _query: ActionEndTransactionRequest,
- _request: Request<Action>,
- ) -> Result<(), Status> {
- debug!("do_action_end_transaction");
- Err(Status::unimplemented("Implement do_action_end_transaction"))
- }
-
- /// Begin a savepoint
- async fn do_action_begin_savepoint(
- &self,
- _query: ActionBeginSavepointRequest,
- _request: Request<Action>,
- ) -> Result<ActionBeginSavepointResult, Status> {
- debug!("do_action_begin_savepoint");
- Err(Status::unimplemented("Implement do_action_begin_savepoint"))
- }
-
- /// End a savepoint
- async fn do_action_end_savepoint(
- &self,
- _query: ActionEndSavepointRequest,
- _request: Request<Action>,
- ) -> Result<(), Status> {
- debug!("do_action_end_savepoint");
- Err(Status::unimplemented("Implement do_action_end_savepoint"))
- }
-
- /// Cancel a query
- async fn do_action_cancel_query(
- &self,
- _query: ActionCancelQueryRequest,
- _request: Request<Action>,
- ) -> Result<ActionCancelQueryResult, Status> {
- debug!("do_action_cancel_query");
- Err(Status::unimplemented("Implement do_action_cancel_query"))
- }
-
- /// Register a new SqlInfo result, making it available when calling GetSqlInfo.
- async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
-}
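One behavioral detail buried in the roughly 1,000 deleted lines above: query execution was synchronous from the client's point of view. `execute_plan` enqueued a Ballista job, then polled `check_job` every 100 ms until the job succeeded before assembling the `FlightInfo` response. A self-contained sketch of that polling shape, with a hypothetical `check_job` stand-in (the real one queried the scheduler's task manager):

```rust
use std::time::Duration;
use tokio::time::sleep;

// Hypothetical stand-in for the removed check_job(): Ok(None) while the
// job is still queued or running, Ok(Some(result)) once it succeeds.
async fn check_job(job_id: &str) -> Result<Option<String>, String> {
    Ok(Some(format!("results for {job_id}")))
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let job_id = "job-1";
    // The removed execute_plan looped exactly like this: sleep, poll,
    // repeat until the job reports success (failures became gRPC Status).
    let completed = loop {
        sleep(Duration::from_millis(100)).await;
        if let Some(result) = check_job(job_id).await? {
            break result;
        }
    };
    println!("{completed}");
    Ok(())
}
```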
diff --git a/ballista/scheduler/src/lib.rs b/ballista/scheduler/src/lib.rs
index d709b6ec..fb29d5ba 100644
--- a/ballista/scheduler/src/lib.rs
+++ b/ballista/scheduler/src/lib.rs
@@ -28,8 +28,6 @@ pub mod scheduler_server;
pub mod standalone;
pub mod state;
-#[cfg(feature = "flight-sql")]
-pub mod flight_sql;
#[cfg(test)]
pub mod test_utils;
diff --git a/ballista/scheduler/src/scheduler_process.rs b/ballista/scheduler/src/scheduler_process.rs
index c6cbc665..1bd6179e 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-#[cfg(feature = "flight-sql")]
-use arrow_flight::flight_service_server::FlightServiceServer;
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer;
use ballista_core::serde::{
@@ -33,8 +31,7 @@ use std::{net::SocketAddr, sync::Arc};
use crate::api::get_routes;
use crate::cluster::BallistaCluster;
use crate::config::SchedulerConfig;
-#[cfg(feature = "flight-sql")]
-use crate::flight_sql::FlightSqlServiceImpl;
+
use crate::metrics::default_metrics_collector;
#[cfg(feature = "keda-scaler")]
use crate::scheduler_server::externalscaler::external_scaler_server::ExternalScalerServer;
@@ -91,17 +88,6 @@ pub async fn start_server(
let tonic_builder =
tonic_builder.add_service(ExternalScalerServer::new(scheduler_server.clone()));
- #[cfg(feature = "flight-sql")]
- let tonic_builder = tonic_builder.add_service(
- FlightServiceServer::new(FlightSqlServiceImpl::new(scheduler_server.clone()))
- .max_encoding_message_size(
- config.grpc_server_max_encoding_message_size as usize,
- )
- .max_decoding_message_size(
- config.grpc_server_max_decoding_message_size as usize,
- ),
- );
-
let tonic = tonic_builder.into_service().into_axum_router();
let tonic = tonic.fallback(|| async { (StatusCode::NOT_FOUND, "404 - Not Found") });
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 287b5fe0..5e909e15 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -107,4 +107,3 @@ round-robin-local
| executor-slots-policy | Utf8 | bias | Sets the executor slots policy for the scheduler, possible values: bias, round-robin, round-robin-local. For a cluster with single scheduler, round-robin-local is recommended. |
| finished-job-data-clean-up-interval-seconds | UInt64 | 300 | Sets the delayed interval for cleaning up finished job data, mainly the shuffle data, 0 means the cleaning up is disabled. |
| finished-job-state-clean-up-interval-seconds | UInt64 | 3600 | Sets the delayed interval for cleaning up finished job state stored in the backend, 0 means the cleaning up is disabled. |
-| advertise-flight-sql-endpoint | Utf8 | N/A | Sets the route endpoint for proxying flight sql results via scheduler. |
diff --git a/docs/source/user-guide/flightsql.md b/docs/source/user-guide/flightsql.md
deleted file mode 100644
index b6bb71f4..00000000
--- a/docs/source/user-guide/flightsql.md
+++ /dev/null
@@ -1,103 +0,0 @@
-<!---
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
-# Using FlightSQL to Connect to Ballista
-
-One of the easiest ways to start with Ballista is to plug it into your existing data infrastructure using support for Arrow Flight SQL JDBC.
-
-> This is an optional scheduler feature, enabled with the `flight-sql` feature flag.
-
-Getting started involves these main steps:
-
-1. [Installing prerequisites](#prereq)
-2. Run the [Ballista docker container](#docker)
-3. Download the [Arrow Flight SQL JDBC Driver](#jdbc)
-4. [Install the driver](#tool) into your favorite JDBC tool
-5. Run a ["hello, world!"](#hello) query
-6. Register a table and run more complicated queries
-
-## <a name="prereq"/>Prerequisites
-
-### Ubuntu
-
-```shell
-sudo apt-get update
-sudo apt-get install -y docker.io
-```
-
-### MacOS
-
-```shell
-brew install docker
-```
-
-### Windows
-
-```shell
-choco install docker-desktop
-```
-
-## <a name="docker"/> Run Docker Container
-
-```shell
-docker run -p 50050:50050 --rm ghcr.io/apache/datafusion-ballista-standalone:0.10.0
-```
-
-## <a name="jdbc"/>Download the FlightSQL JDBC Driver
-
-Download the [FlightSQL JDBC Driver](https://repo1.maven.org/maven2/org/apache/arrow/flight-sql-jdbc-driver/10.0.0/flight-sql-jdbc-driver-10.0.0.jar)
-from Maven Central.
-
-## <a name="tool"/>Use the Driver in your Favorite Data Tool
-
-The important pieces of information:
-
-| Key | Value |
-| ---------------- | -------------------------------------------------- |
-| Driver file | flight-sql-jdbc-driver-10.0.0-SNAPSHOT.jar |
-| Class Name | org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver |
-| Authentication | User & Password |
-| Username | admin |
-| Password | password |
-| Advanced Options | useEncryption=false |
-| URL | jdbc:arrow-flight://127.0.0.1:50050 |
-
-## <a name="hello"/>Run a "Hello, World!" Query
-
-```sql
-select 'Hello from DataFusion Ballista!' as greeting;
-```
-
-## <a name="complex"/>Run a Complex Query
-
-In order to run queries against data, tables need to be "registered" with the current session (and re-registered upon each new connection).
-
-To register the built-in demo table, use the syntax below:
-
-```sql
-create external table taxi stored as parquet location '/data/yellow_tripdata_2022-01.parquet';
-```
-
-Once the table has been registered, all the normal SQL queries can be performed:
-
-```sql
-select * from taxi limit 10;
-```
-
-🎉 Happy querying! 🎉
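The guide removed above only showed JDBC access. For the record, the same endpoint was also reachable from Rust; a minimal sketch, assuming arrow-flight's `FlightSqlServiceClient` (behind its `flight-sql-experimental` feature) and the demo credentials from the deleted page:

```rust
use arrow_flight::sql::client::FlightSqlServiceClient;
use futures::TryStreamExt;
use tonic::transport::Channel;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Same host/port the removed guide used in its JDBC URL.
    let channel = Channel::from_static("http://127.0.0.1:50050")
        .connect()
        .await?;
    let mut client = FlightSqlServiceClient::new(channel);

    // Basic-auth handshake; the removed do_handshake answered with a
    // bearer token that the client attaches to subsequent calls.
    client.handshake("admin", "password").await?;

    // Plan the query; the scheduler replied with one endpoint per partition.
    let info = client
        .execute(
            "select 'Hello from DataFusion Ballista!' as greeting".to_string(),
            None,
        )
        .await?;

    // Fetch the first endpoint's results as Arrow record batches.
    let ticket = info.endpoint[0]
        .ticket
        .clone()
        .expect("endpoint should carry a ticket");
    let batches: Vec<_> = client.do_get(ticket).await?.try_collect().await?;
    println!("fetched {} record batch(es)", batches.len());
    Ok(())
}
```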