This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 2e67faa4 Add FlightSQL support (#93)
2e67faa4 is described below
commit 2e67faa40a56ab369eb81ec1f7008f79953776e1
Author: Brent Gardner <[email protected]>
AuthorDate: Wed Jul 27 02:01:50 2022 -0600
Add FlightSQL support (#93)
---
ballista/rust/scheduler/Cargo.toml | 4 +-
ballista/rust/scheduler/src/flight_sql.rs | 572 ++++++++++++++++++++++++++++++
ballista/rust/scheduler/src/lib.rs | 1 +
ballista/rust/scheduler/src/main.rs | 7 +
4 files changed, 583 insertions(+), 1 deletion(-)
diff --git a/ballista/rust/scheduler/Cargo.toml
b/ballista/rust/scheduler/Cargo.toml
index 8ff40930..e0555a46 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -36,6 +36,7 @@ sled = ["sled_package", "tokio-stream"]
[dependencies]
anyhow = "1"
+arrow-flight = { version = "18.0.0", features = ["flight-sql-experimental"] }
async-recursion = "1.0.0"
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
@@ -45,6 +46,7 @@ datafusion = { git =
"https://github.com/apache/arrow-datafusion", rev = "6a5de4
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
env_logger = "0.9"
etcd-client = { version = "0.9", optional = true }
+flatbuffers = { version = "2.1.2" }
futures = "0.3"
http = "0.2"
http-body = "0.4"
@@ -61,11 +63,11 @@ tokio = { version = "1.0", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"], optional = true }
tonic = "0.7"
tower = { version = "0.4" }
+uuid = { version = "1.0", features = ["v4"] }
warp = "0.3"
[dev-dependencies]
ballista-core = { path = "../core", version = "0.7.0" }
-uuid = { version = "1.0", features = ["v4"] }
[build-dependencies]
configure_me_codegen = "0.4.1"
diff --git a/ballista/rust/scheduler/src/flight_sql.rs
b/ballista/rust/scheduler/src/flight_sql.rs
new file mode 100644
index 00000000..37704f58
--- /dev/null
+++ b/ballista/rust/scheduler/src/flight_sql.rs
@@ -0,0 +1,572 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_flight::flight_descriptor::DescriptorType;
+use arrow_flight::flight_service_server::FlightService;
+use arrow_flight::sql::server::FlightSqlService;
+use arrow_flight::sql::{
+ ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+ ActionCreatePreparedStatementResult, CommandGetCatalogs,
CommandGetCrossReference,
+ CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+ CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes,
CommandGetTables,
+ CommandPreparedStatementQuery, CommandPreparedStatementUpdate,
CommandStatementQuery,
+ CommandStatementUpdate, SqlInfo, TicketStatementQuery,
+};
+use arrow_flight::{
+ FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, Location, Ticket,
+};
+use log::{debug, error, warn};
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use tonic::{Response, Status, Streaming};
+
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use crate::scheduler_server::SchedulerServer;
+use arrow_flight::SchemaAsIpc;
+use ballista_core::config::BallistaConfig;
+use ballista_core::serde::protobuf;
+use ballista_core::serde::protobuf::job_status;
+use ballista_core::serde::protobuf::CompletedJob;
+use ballista_core::serde::protobuf::JobStatus;
+use ballista_core::serde::protobuf::PhysicalPlanNode;
+use datafusion::arrow;
+use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::ipc::writer::{IpcDataGenerator, IpcWriteOptions};
+use datafusion::common::DFSchemaRef;
+use datafusion::logical_expr::LogicalPlan;
+use datafusion::prelude::SessionContext;
+use datafusion_proto::protobuf::LogicalPlanNode;
+use prost::Message;
+use tokio::time::sleep;
+use uuid::Uuid;
+
/// FlightSQL front-end for the Ballista scheduler.
///
/// Translates Arrow FlightSQL requests into Ballista job submissions and
/// keeps an in-memory cache of prepared statements (logical plans) keyed by
/// the UUID handle returned to the client.
pub struct FlightSqlServiceImpl {
    // Scheduler used to create sessions, enqueue jobs, and poll job status.
    server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode>,
    // Prepared-statement cache: client-visible handle -> planned query.
    statements: Arc<Mutex<HashMap<Uuid, LogicalPlan>>>,
}
+
+impl FlightSqlServiceImpl {
+ pub fn new(server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode>) ->
Self {
+ Self {
+ server,
+ statements: Arc::new(Mutex::new(HashMap::new())),
+ }
+ }
+
+ async fn create_ctx(&self) -> Result<Arc<SessionContext>, Status> {
+ let config_builder = BallistaConfig::builder();
+ let config = config_builder
+ .build()
+ .map_err(|e| Status::internal(format!("Error building config: {}",
e)))?;
+ let ctx = self
+ .server
+ .state
+ .session_manager
+ .create_session(&config)
+ .await
+ .map_err(|e| {
+ Status::internal(format!("Failed to create SessionContext:
{:?}", e))
+ })?;
+ Ok(ctx)
+ }
+
+ async fn prepare_statement(
+ query: &str,
+ ctx: &Arc<SessionContext>,
+ ) -> Result<LogicalPlan, Status> {
+ let plan = ctx
+ .sql(query)
+ .await
+ .and_then(|df| df.to_logical_plan())
+ .map_err(|e| Status::internal(format!("Error building plan: {}",
e)))?;
+ Ok(plan)
+ }
+
+ async fn check_job(&self, job_id: &String) -> Result<Option<CompletedJob>,
Status> {
+ let status = self
+ .server
+ .state
+ .task_manager
+ .get_job_status(job_id)
+ .await
+ .map_err(|e| {
+ let msg = format!("Error getting status for job {}: {:?}",
job_id, e);
+ error!("{}", msg);
+ Status::internal(msg)
+ })?;
+ let status: JobStatus = match status {
+ Some(status) => status,
+ None => {
+ let msg = format!("Error getting status for job {}!", job_id);
+ error!("{}", msg);
+ Err(Status::internal(msg))?
+ }
+ };
+ let status: job_status::Status = match status.status {
+ Some(status) => status,
+ None => {
+ let msg = format!("Error getting status for job {}!", job_id);
+ error!("{}", msg);
+ Err(Status::internal(msg))?
+ }
+ };
+ match status {
+ job_status::Status::Queued(_) => Ok(None),
+ job_status::Status::Running(_) => Ok(None),
+ job_status::Status::Failed(e) => {
+ warn!("Error executing plan: {:?}", e);
+ Err(Status::internal(format!(
+ "Error executing plan: {}",
+ e.error
+ )))?
+ }
+ job_status::Status::Completed(comp) => Ok(Some(comp)),
+ }
+ }
+
+ async fn job_to_fetch_part(
+ &self,
+ completed: CompletedJob,
+ num_rows: &mut i64,
+ num_bytes: &mut i64,
+ ) -> Result<Vec<FlightEndpoint>, Status> {
+ let mut fieps: Vec<_> = vec![];
+ for loc in completed.partition_location.iter() {
+ let fetch = if let Some(ref id) = loc.partition_id {
+ let fetch = protobuf::FetchPartition {
+ job_id: id.job_id.clone(),
+ stage_id: id.stage_id,
+ partition_id: id.partition_id,
+ path: loc.path.clone(),
+ };
+ protobuf::Action {
+ action_type:
Some(protobuf::action::ActionType::FetchPartition(
+ fetch,
+ )),
+ settings: vec![],
+ }
+ } else {
+ Err(Status::internal("Error getting partition
ID".to_string()))?
+ };
+ let authority = if let Some(ref md) = loc.executor_meta {
+ format!("{}:{}", md.host, md.port)
+ } else {
+ Err(Status::internal(
+ "Invalid partition location, missing executor
metadata".to_string(),
+ ))?
+ };
+ if let Some(ref stats) = loc.partition_stats {
+ *num_rows += stats.num_rows;
+ *num_bytes += stats.num_bytes;
+ } else {
+ Err(Status::internal("Error getting stats".to_string()))?
+ }
+ let loc = Location {
+ uri: format!("grpc+tcp://{}", authority),
+ };
+ let buf = fetch.encode_to_vec();
+ let ticket = Ticket { ticket: buf };
+ let fiep = FlightEndpoint {
+ ticket: Some(ticket),
+ location: vec![loc],
+ };
+ fieps.push(fiep);
+ }
+ Ok(fieps)
+ }
+
+ fn cache_plan(&self, plan: LogicalPlan) -> Result<Uuid, Status> {
+ let handle = Uuid::new_v4();
+ let mut statements = self
+ .statements
+ .try_lock()
+ .map_err(|e| Status::internal(format!("Error locking statements:
{}", e)))?;
+ statements.insert(handle, plan);
+ Ok(handle)
+ }
+
+ fn get_plan(&self, handle: &Uuid) -> Result<LogicalPlan, Status> {
+ let statements = self
+ .statements
+ .try_lock()
+ .map_err(|e| Status::internal(format!("Error locking statements:
{}", e)))?;
+ let plan = if let Some(plan) = statements.get(handle) {
+ plan
+ } else {
+ Err(Status::internal(format!(
+ "Statement handle not found: {}",
+ handle
+ )))?
+ };
+ Ok(plan.clone())
+ }
+
+ fn remove_plan(&self, handle: Uuid) -> Result<(), Status> {
+ let mut statements = self
+ .statements
+ .try_lock()
+ .map_err(|e| Status::internal(format!("Error locking statements:
{}", e)))?;
+ statements.remove(&handle);
+ Ok(())
+ }
+
+ fn df_schema_to_arrow(&self, schema: &DFSchemaRef) -> Result<Vec<u8>,
Status> {
+ let arrow_schema: Schema = (&**schema).into();
+ let options = IpcWriteOptions::default();
+ let pair = SchemaAsIpc::new(&arrow_schema, &options);
+ let data_gen = IpcDataGenerator::default();
+ let encoded_data = data_gen.schema_to_bytes(pair.0, pair.1);
+ let mut schema_bytes = vec![];
+ arrow::ipc::writer::write_message(&mut schema_bytes, encoded_data,
pair.1)
+ .map_err(|e| Status::internal(format!("Error encoding schema: {}",
e)))?;
+ Ok(schema_bytes)
+ }
+
+ async fn enqueue_job(
+ &self,
+ ctx: Arc<SessionContext>,
+ plan: &LogicalPlan,
+ ) -> Result<String, Status> {
+ let job_id = self.server.state.task_manager.generate_job_id();
+ self.server
+ .state
+ .task_manager
+ .queue_job(&job_id)
+ .await
+ .map_err(|e| {
+ let msg = format!("Failed to queue job {}: {:?}", job_id, e);
+ error!("{}", msg);
+
+ Status::internal(msg)
+ })?;
+ let query_stage_event_sender = self
+ .server
+ .query_stage_event_loop
+ .get_sender()
+ .map_err(|e| {
+ Status::internal(format!(
+ "Could not get query stage event sender due to: {}",
+ e
+ ))
+ })?;
+ query_stage_event_sender
+ .post_event(QueryStageSchedulerEvent::JobQueued {
+ job_id: job_id.clone(),
+ session_id: ctx.session_id().clone(),
+ session_ctx: ctx,
+ plan: Box::new(plan.clone()),
+ })
+ .await
+ .map_err(|e| {
+ let msg =
+ format!("Failed to send JobQueued event for {}: {:?}",
job_id, e);
+ error!("{}", msg);
+ Status::internal(msg)
+ })?;
+ Ok(job_id)
+ }
+
+ fn create_resp(
+ schema_bytes: Vec<u8>,
+ fieps: Vec<FlightEndpoint>,
+ num_rows: i64,
+ num_bytes: i64,
+ ) -> Response<FlightInfo> {
+ let flight_desc = FlightDescriptor {
+ r#type: DescriptorType::Cmd.into(),
+ cmd: vec![],
+ path: vec![],
+ };
+ let info = FlightInfo {
+ schema: schema_bytes,
+ flight_descriptor: Some(flight_desc),
+ endpoint: fieps,
+ total_records: num_rows,
+ total_bytes: num_bytes,
+ };
+ Response::new(info)
+ }
+
+ async fn execute_plan(
+ &self,
+ ctx: Arc<SessionContext>,
+ plan: &LogicalPlan,
+ ) -> Result<Response<FlightInfo>, Status> {
+ let job_id = self.enqueue_job(ctx, plan).await?;
+
+ // poll for job completion
+ let mut num_rows = 0;
+ let mut num_bytes = 0;
+ let fieps = loop {
+ sleep(Duration::from_millis(100)).await;
+ let completed = if let Some(comp) = self.check_job(&job_id).await?
{
+ comp
+ } else {
+ continue;
+ };
+ let fieps = self
+ .job_to_fetch_part(completed, &mut num_rows, &mut num_bytes)
+ .await?;
+ break fieps;
+ };
+
+ // Generate response
+ let schema_bytes = self.df_schema_to_arrow(plan.schema())?;
+ let resp = Self::create_resp(schema_bytes, fieps, num_rows, num_bytes);
+ Ok(resp)
+ }
+}
+
/// FlightSQL protocol endpoints. Ad-hoc queries and prepared statements are
/// executed as Ballista jobs; the metadata endpoints (catalogs, schemas,
/// tables, keys, sql_info) are not yet implemented and return `unimplemented`.
#[tonic::async_trait]
impl FlightSqlService for FlightSqlServiceImpl {
    type FlightService = FlightSqlServiceImpl;

    /// Plan and execute an ad-hoc SQL query; the returned FlightInfo's
    /// endpoints point at the executors serving the result partitions.
    async fn get_flight_info_statement(
        &self,
        query: CommandStatementQuery,
        _request: FlightDescriptor,
    ) -> Result<Response<FlightInfo>, Status> {
        debug!("Got query:\n{}", query.query);

        let ctx = self.create_ctx().await?;
        let plan = Self::prepare_statement(&query.query, &ctx).await?;
        let resp = self.execute_plan(ctx, &plan).await?;

        debug!("Responding to query...");
        Ok(resp)
    }

    /// Execute a previously prepared statement identified by its UUID handle.
    /// NOTE(review): runs in a fresh session, not the one the statement was
    /// prepared in -- confirm that is intended.
    async fn get_flight_info_prepared_statement(
        &self,
        handle: CommandPreparedStatementQuery,
        _request: FlightDescriptor,
    ) -> Result<Response<FlightInfo>, Status> {
        let ctx = self.create_ctx().await?;
        // The client-supplied handle is the raw bytes of the UUID issued by
        // do_action_create_prepared_statement.
        let handle = Uuid::from_slice(handle.prepared_statement_handle.as_slice())
            .map_err(|e| Status::internal(format!("Error decoding handle: {}", e)))?;
        let plan = self.get_plan(&handle)?;
        let resp = self.execute_plan(ctx, &plan).await?;

        debug!("Responding to query...");
        Ok(resp)
    }

    async fn get_flight_info_catalogs(
        &self,
        _query: CommandGetCatalogs,
        _request: FlightDescriptor,
    ) -> Result<Response<FlightInfo>, Status> {
        Err(Status::unimplemented("Implement get_flight_info_catalogs"))
    }
    async fn get_flight_info_schemas(
        &self,
        _query: CommandGetDbSchemas,
        _request: FlightDescriptor,
    ) -> Result<Response<FlightInfo>, Status> {
        Err(Status::unimplemented("Implement get_flight_info_schemas"))
    }
    async fn get_flight_info_tables(
        &self,
        _query: CommandGetTables,
        _request: FlightDescriptor,
    ) -> Result<Response<FlightInfo>, Status> {
        Err(Status::unimplemented("Implement get_flight_info_tables"))
    }
    async fn get_flight_info_table_types(
        &self,
        _query: CommandGetTableTypes,
        _request: FlightDescriptor,
    ) -> Result<Response<FlightInfo>, Status> {
        Err(Status::unimplemented(
            "Implement get_flight_info_table_types",
        ))
    }
    async fn get_flight_info_sql_info(
        &self,
        _query: CommandGetSqlInfo,
        _request: FlightDescriptor,
    ) -> Result<Response<FlightInfo>, Status> {
        // TODO: implement for FlightSQL JDBC to work
        Err(Status::unimplemented("Implement CommandGetSqlInfo"))
    }
    async fn get_flight_info_primary_keys(
        &self,
        _query: CommandGetPrimaryKeys,
        _request: FlightDescriptor,
    ) -> Result<Response<FlightInfo>, Status> {
        Err(Status::unimplemented(
            "Implement get_flight_info_primary_keys",
        ))
    }
    async fn get_flight_info_exported_keys(
        &self,
        _query: CommandGetExportedKeys,
        _request: FlightDescriptor,
    ) -> Result<Response<FlightInfo>, Status> {
        Err(Status::unimplemented(
            "Implement get_flight_info_exported_keys",
        ))
    }
    async fn get_flight_info_imported_keys(
        &self,
        _query: CommandGetImportedKeys,
        _request: FlightDescriptor,
    ) -> Result<Response<FlightInfo>, Status> {
        Err(Status::unimplemented(
            "Implement get_flight_info_imported_keys",
        ))
    }
    async fn get_flight_info_cross_reference(
        &self,
        _query: CommandGetCrossReference,
        _request: FlightDescriptor,
    ) -> Result<Response<FlightInfo>, Status> {
        Err(Status::unimplemented(
            "Implement get_flight_info_cross_reference",
        ))
    }

    // do_get: data is served by the executors (via the tickets embedded in
    // FlightInfo endpoints), so these scheduler-side handlers are stubs.
    async fn do_get_statement(
        &self,
        _ticket: TicketStatementQuery,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        // let handle = Uuid::from_slice(&ticket.statement_handle)
        //     .map_err(|e| Status::internal(format!("Error decoding ticket: {}", e)))?;
        // let statements = self.statements.try_lock()
        //     .map_err(|e| Status::internal(format!("Error decoding ticket: {}", e)))?;
        // let plan = statements.get(&handle);
        Err(Status::unimplemented("Implement do_get_statement"))
    }

    async fn do_get_prepared_statement(
        &self,
        _query: CommandPreparedStatementQuery,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        Err(Status::unimplemented("Implement do_get_prepared_statement"))
    }
    async fn do_get_catalogs(
        &self,
        _query: CommandGetCatalogs,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        Err(Status::unimplemented("Implement do_get_catalogs"))
    }
    async fn do_get_schemas(
        &self,
        _query: CommandGetDbSchemas,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        Err(Status::unimplemented("Implement do_get_schemas"))
    }
    async fn do_get_tables(
        &self,
        _query: CommandGetTables,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        Err(Status::unimplemented("Implement do_get_tables"))
    }
    async fn do_get_table_types(
        &self,
        _query: CommandGetTableTypes,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        Err(Status::unimplemented("Implement do_get_table_types"))
    }
    async fn do_get_sql_info(
        &self,
        _query: CommandGetSqlInfo,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        Err(Status::unimplemented("Implement do_get_sql_info"))
    }
    async fn do_get_primary_keys(
        &self,
        _query: CommandGetPrimaryKeys,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        Err(Status::unimplemented("Implement do_get_primary_keys"))
    }
    async fn do_get_exported_keys(
        &self,
        _query: CommandGetExportedKeys,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        Err(Status::unimplemented("Implement do_get_exported_keys"))
    }
    async fn do_get_imported_keys(
        &self,
        _query: CommandGetImportedKeys,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        Err(Status::unimplemented("Implement do_get_imported_keys"))
    }
    async fn do_get_cross_reference(
        &self,
        _query: CommandGetCrossReference,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        Err(Status::unimplemented("Implement do_get_cross_reference"))
    }
    // do_put
    async fn do_put_statement_update(
        &self,
        _ticket: CommandStatementUpdate,
    ) -> Result<i64, Status> {
        Err(Status::unimplemented("Implement do_put_statement_update"))
    }
    async fn do_put_prepared_statement_query(
        &self,
        _query: CommandPreparedStatementQuery,
        _request: Streaming<FlightData>,
    ) -> Result<Response<<Self as FlightService>::DoPutStream>, Status> {
        Err(Status::unimplemented(
            "Implement do_put_prepared_statement_query",
        ))
    }
    async fn do_put_prepared_statement_update(
        &self,
        _query: CommandPreparedStatementUpdate,
        _request: Streaming<FlightData>,
    ) -> Result<i64, Status> {
        Err(Status::unimplemented(
            "Implement do_put_prepared_statement_update",
        ))
    }

    /// Plan the query once, cache the plan, and hand the client an opaque
    /// UUID handle plus the result-set schema (parameters unsupported).
    async fn do_action_create_prepared_statement(
        &self,
        query: ActionCreatePreparedStatementRequest,
    ) -> Result<ActionCreatePreparedStatementResult, Status> {
        let ctx = self.create_ctx().await?;
        let plan = Self::prepare_statement(&query.query, &ctx).await?;
        let schema_bytes = self.df_schema_to_arrow(plan.schema())?;
        let handle = self.cache_plan(plan)?;
        let res = ActionCreatePreparedStatementResult {
            prepared_statement_handle: handle.as_bytes().to_vec(),
            dataset_schema: schema_bytes,
            parameter_schema: vec![], // TODO: parameters
        };
        Ok(res)
    }

    /// Evict a cached prepared statement. The trait gives this action no
    /// error channel, so decode/removal failures are silently ignored.
    async fn do_action_close_prepared_statement(
        &self,
        handle: ActionClosePreparedStatementRequest,
    ) {
        let handle = Uuid::from_slice(handle.prepared_statement_handle.as_slice());
        let handle = if let Ok(handle) = handle {
            handle
        } else {
            return;
        };
        let _ = self.remove_plan(handle);
    }

    // No server-side SqlInfo registry yet.
    async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
}
diff --git a/ballista/rust/scheduler/src/lib.rs
b/ballista/rust/scheduler/src/lib.rs
index ea39ef02..2c2fa410 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -24,5 +24,6 @@ pub mod scheduler_server;
pub mod standalone;
pub mod state;
+pub mod flight_sql;
#[cfg(test)]
pub mod test_utils;
diff --git a/ballista/rust/scheduler/src/main.rs
b/ballista/rust/scheduler/src/main.rs
index 2dd34c92..a8335314 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -18,6 +18,7 @@
//! Ballista Rust scheduler binary.
use anyhow::{Context, Result};
+use arrow_flight::flight_service_server::FlightServiceServer;
use
ballista_scheduler::scheduler_server::externalscaler::external_scaler_server::ExternalScalerServer;
use futures::future::{self, Either, TryFutureExt};
use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
@@ -59,6 +60,7 @@ mod config {
));
}
+use ballista_scheduler::flight_sql::FlightSqlServiceImpl;
use config::prelude::*;
use datafusion::execution::context::default_session_builder;
@@ -100,10 +102,15 @@ async fn start_server(
let scheduler_grpc_server =
SchedulerGrpcServer::new(scheduler_server.clone());
+ let flight_sql_server =
FlightServiceServer::new(FlightSqlServiceImpl::new(
+ scheduler_server.clone(),
+ ));
+
let keda_scaler =
ExternalScalerServer::new(scheduler_server.clone());
let mut tonic = TonicServer::builder()
.add_service(scheduler_grpc_server)
+ .add_service(flight_sql_server)
.add_service(keda_scaler)
.into_service();
let mut warp = warp::service(get_routes(scheduler_server.clone()));