This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 35782450 feat: configure max grpc message size and disable view types 
in ballista  (#1185)
35782450 is described below

commit 35782450e0d081e59cb0c13c3ddafa985d2425ea
Author: Marko Milenković <[email protected]>
AuthorDate: Wed Feb 19 22:00:34 2025 +0000

    feat: configure max grpc message size and disable view types in ballista  
(#1185)
    
    * use configuration to propagate grpc settings
    
    * set `datafusion.execution.parquet.schema_force_view_types = false`
    
    * return better status when flight request fails
    
    * fix issue with variable assignment
    
    * fix typo and file name
    
    * add test to verify switching off configuration
---
 ballista/client/tests/context_checks.rs            | 32 +++++++
 ballista/core/src/client.rs                        | 37 +++++---
 ballista/core/src/config.rs                        | 10 +++
 .../core/src/execution_plans/distributed_query.rs  | 17 ++--
 .../core/src/execution_plans/shuffle_reader.rs     | 60 ++++++++-----
 ballista/core/src/extension.rs                     | 79 +++++++++++++----
 ballista/executor/src/executor_process.rs          | 18 +++-
 ballista/executor/src/flight_service.rs            |  6 +-
 ballista/executor/src/standalone.rs                | 98 +++++++---------------
 ballista/scheduler/src/scheduler_process.rs        | 12 ++-
 ballista/scheduler/src/standalone.rs               |  8 +-
 11 files changed, 247 insertions(+), 130 deletions(-)

diff --git a/ballista/client/tests/context_checks.rs 
b/ballista/client/tests/context_checks.rs
index a46f9380..d1ecf2f6 100644
--- a/ballista/client/tests/context_checks.rs
+++ b/ballista/client/tests/context_checks.rs
@@ -365,4 +365,36 @@ mod supported {
 
         Ok(())
     }
+
+    // test checks that view types have been disabled in the configuration
+    //
+    // `datafusion.execution.parquet.schema_force_view_types` has been disabled
+    // temporarily as view types could break the shuffle reader/writer.
+    #[rstest]
+    #[case::standalone(standalone_context())]
+    #[case::remote(remote_context())]
+    #[tokio::test]
+    async fn should_disable_view_types(
+        #[future(awt)]
+        #[case]
+        ctx: SessionContext,
+    ) -> datafusion::error::Result<()> {
+        let result = ctx
+            .sql("select name, value from information_schema.df_settings where name like 'datafusion.execution.parquet.schema_force_view_types' order by name limit 1")
+            .await?
+            .collect()
+            .await?;
+        //
+        let expected = [
+            "+------------------------------------------------------+-------+",
+            "| name                                                 | value |",
+            "+------------------------------------------------------+-------+",
+            "| datafusion.execution.parquet.schema_force_view_types | false |",
+            "+------------------------------------------------------+-------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        Ok(())
+    }
 }
diff --git a/ballista/core/src/client.rs b/ballista/core/src/client.rs
index 2c99569b..3c76dc3d 100644
--- a/ballista/core/src/client.rs
+++ b/ballista/core/src/client.rs
@@ -61,7 +61,7 @@ const IO_RETRY_WAIT_TIME_MS: u64 = 3000;
 impl BallistaClient {
     /// Create a new BallistaClient to connect to the executor listening on 
the specified
     /// host and port
-    pub async fn try_new(host: &str, port: u16) -> Result<Self> {
+    pub async fn try_new(host: &str, port: u16, max_message_size: usize) -> 
Result<Self> {
         let addr = format!("http://{host}:{port}");
         debug!("BallistaClient connecting to {}", addr);
         let connection =
@@ -72,8 +72,11 @@ impl BallistaClient {
                     "Error connecting to Ballista scheduler or executor at 
{addr}: {e:?}"
                 ))
                 })?;
-        let flight_client = FlightServiceClient::new(connection);
-        debug!("BallistaClient connected OK");
+        let flight_client = FlightServiceClient::new(connection)
+            .max_decoding_message_size(max_message_size)
+            .max_encoding_message_size(max_message_size);
+
+        debug!("BallistaClient connected OK: {:?}", flight_client);
 
         Ok(Self { flight_client })
     }
@@ -99,13 +102,27 @@ impl BallistaClient {
             .await
             .map_err(|error| match error {
                 // map grpc connection error to partition fetch error.
-                BallistaError::GrpcActionError(msg) => 
BallistaError::FetchFailed(
-                    executor_id.to_owned(),
-                    partition_id.stage_id,
-                    partition_id.partition_id,
-                    msg,
-                ),
-                other => other,
+                BallistaError::GrpcActionError(msg) => {
+                    log::warn!(
+                        "grpc client failed to fetch partition: {:?} , 
message: {:?}",
+                        partition_id,
+                        msg
+                    );
+                    BallistaError::FetchFailed(
+                        executor_id.to_owned(),
+                        partition_id.stage_id,
+                        partition_id.partition_id,
+                        msg,
+                    )
+                }
+                error => {
+                    log::warn!(
+                        "grpc client failed to fetch partition: {:?} , error: 
{:?}",
+                        partition_id,
+                        error
+                    );
+                    error
+                }
             })
     }
 
diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs
index 62882144..a7757eeb 100644
--- a/ballista/core/src/config.rs
+++ b/ballista/core/src/config.rs
@@ -32,6 +32,8 @@ pub const BALLISTA_STANDALONE_PARALLELISM: &str = 
"ballista.standalone.paralleli
 /// max message size for gRPC clients
 pub const BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE: &str =
     "ballista.grpc_client_max_message_size";
+pub const BALLISTA_SHUFFLE_READER_MAX_REQUESTS: &str =
+    "ballista.shuffle.max_concurrent_read_requests";
 
 pub type ParseResult<T> = result::Result<T, String>;
 use std::sync::LazyLock;
@@ -48,6 +50,10 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, 
ConfigEntry>> = LazyLock::new(||
                          "Configuration for max message size in gRPC 
clients".to_string(),
                          DataType::UInt64,
                          Some((16 * 1024 * 1024).to_string())),
+        ConfigEntry::new(BALLISTA_SHUFFLE_READER_MAX_REQUESTS.to_string(),
+                         "Maximum concurrent requests shuffle reader can 
process".to_string(),
+                         DataType::UInt64,
+                         Some((64).to_string())),
     ];
     entries
         .into_iter()
@@ -165,6 +171,10 @@ impl BallistaConfig {
         self.get_usize_setting(BALLISTA_STANDALONE_PARALLELISM)
     }
 
+    pub fn shuffle_reader_maximum_concurrent_requests(&self) -> usize {
+        self.get_usize_setting(BALLISTA_SHUFFLE_READER_MAX_REQUESTS)
+    }
+
     fn get_usize_setting(&self, key: &str) -> usize {
         if let Some(v) = self.settings.get(key) {
             // infallible because we validate all configs in the constructor
diff --git a/ballista/core/src/execution_plans/distributed_query.rs 
b/ballista/core/src/execution_plans/distributed_query.rs
index e9596ef1..59503814 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -310,12 +310,16 @@ async fn execute_query(
                 break Err(DataFusionError::Execution(msg));
             }
             Some(job_status::Status::Successful(successful)) => {
-                let streams = 
successful.partition_location.into_iter().map(|p| {
-                    let f = fetch_partition(p)
-                        .map_err(|e| ArrowError::ExternalError(Box::new(e)));
+                let streams =
+                    successful
+                        .partition_location
+                        .into_iter()
+                        .map(move |partition| {
+                            let f = fetch_partition(partition, 
max_message_size)
+                                .map_err(|e| 
ArrowError::ExternalError(Box::new(e)));
 
-                    futures::stream::once(f).try_flatten()
-                });
+                            futures::stream::once(f).try_flatten()
+                        });
 
                 break Ok(futures::stream::iter(streams).flatten());
             }
@@ -325,6 +329,7 @@ async fn execute_query(
 
 async fn fetch_partition(
     location: PartitionLocation,
+    max_message_size: usize,
 ) -> Result<SendableRecordBatchStream> {
     let metadata = location.executor_meta.ok_or_else(|| {
         DataFusionError::Internal("Received empty executor 
metadata".to_owned())
@@ -334,7 +339,7 @@ async fn fetch_partition(
     })?;
     let host = metadata.host.as_str();
     let port = metadata.port as u16;
-    let mut ballista_client = BallistaClient::try_new(host, port)
+    let mut ballista_client = BallistaClient::try_new(host, port, 
max_message_size)
         .await
         .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
     ballista_client
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs 
b/ballista/core/src/execution_plans/shuffle_reader.rs
index 7a20f121..f8259364 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -29,6 +29,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use crate::client::BallistaClient;
+use crate::extension::SessionConfigExt;
 use crate::serde::scheduler::{PartitionLocation, PartitionStats};
 
 use datafusion::arrow::datatypes::SchemaRef;
@@ -146,8 +147,18 @@ impl ExecutionPlan for ShuffleReaderExec {
         let task_id = context.task_id().unwrap_or_else(|| 
partition.to_string());
         info!("ShuffleReaderExec::execute({})", task_id);
 
-        // TODO make the maximum size configurable, or make it depends on 
global memory control
-        let max_request_num = 50usize;
+        let config = context.session_config();
+
+        let max_request_num =
+            config.ballista_shuffle_reader_maximum_concurrent_requests();
+        let max_message_size = config.ballista_grpc_client_max_message_size();
+
+        log::debug!(
+            "ShuffleReaderExec::execute({}) max_request_num: {}, 
max_message_size: {}",
+            task_id,
+            max_request_num,
+            max_message_size
+        );
         let mut partition_locations = HashMap::new();
         for p in &self.partition[partition] {
             partition_locations
@@ -166,7 +177,7 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition_locations.shuffle(&mut thread_rng());
 
         let response_receiver =
-            send_fetch_partitions(partition_locations, max_request_num);
+            send_fetch_partitions(partition_locations, max_request_num, 
max_message_size);
 
         let result = RecordBatchStreamAdapter::new(
             Arc::new(self.schema.as_ref().clone()),
@@ -284,6 +295,7 @@ impl Stream for AbortableReceiverStream {
 fn send_fetch_partitions(
     partition_locations: Vec<PartitionLocation>,
     max_request_num: usize,
+    max_message_size: usize,
 ) -> AbortableReceiverStream {
     let (response_sender, response_receiver) = mpsc::channel(max_request_num);
     let semaphore = Arc::new(Semaphore::new(max_request_num));
@@ -302,7 +314,9 @@ fn send_fetch_partitions(
     let response_sender_c = response_sender.clone();
     spawned_tasks.push(SpawnedTask::spawn(async move {
         for p in local_locations {
-            let r = PartitionReaderEnum::Local.fetch_partition(&p).await;
+            let r = PartitionReaderEnum::Local
+                .fetch_partition(&p, max_message_size)
+                .await;
             if let Err(e) = response_sender_c.send(r).await {
                 error!("Fail to send response event to the channel due to {}", 
e);
             }
@@ -315,7 +329,9 @@ fn send_fetch_partitions(
         spawned_tasks.push(SpawnedTask::spawn(async move {
             // Block if exceeds max request number.
             let permit = semaphore.acquire_owned().await.unwrap();
-            let r = 
PartitionReaderEnum::FlightRemote.fetch_partition(&p).await;
+            let r = PartitionReaderEnum::FlightRemote
+                .fetch_partition(&p, max_message_size)
+                .await;
             // Block if the channel buffer is full.
             if let Err(e) = response_sender.send(r).await {
                 error!("Fail to send response event to the channel due to {}", 
e);
@@ -339,6 +355,7 @@ trait PartitionReader: Send + Sync + Clone {
     async fn fetch_partition(
         &self,
         location: &PartitionLocation,
+        max_message_size: usize,
     ) -> result::Result<SendableRecordBatchStream, BallistaError>;
 }
 
@@ -356,9 +373,12 @@ impl PartitionReader for PartitionReaderEnum {
     async fn fetch_partition(
         &self,
         location: &PartitionLocation,
+        max_message_size: usize,
     ) -> result::Result<SendableRecordBatchStream, BallistaError> {
         match self {
-            PartitionReaderEnum::FlightRemote => 
fetch_partition_remote(location).await,
+            PartitionReaderEnum::FlightRemote => {
+                fetch_partition_remote(location, max_message_size).await
+            }
             PartitionReaderEnum::Local => 
fetch_partition_local(location).await,
             PartitionReaderEnum::ObjectStoreRemote => {
                 fetch_partition_object_store(location).await
@@ -369,6 +389,7 @@ impl PartitionReader for PartitionReaderEnum {
 
 async fn fetch_partition_remote(
     location: &PartitionLocation,
+    max_message_size: usize,
 ) -> result::Result<SendableRecordBatchStream, BallistaError> {
     let metadata = &location.executor_meta;
     let partition_id = &location.partition_id;
@@ -376,19 +397,18 @@ async fn fetch_partition_remote(
     // And we should also avoid to keep alive too many connections for long 
time.
     let host = metadata.host.as_str();
     let port = metadata.port;
-    let mut ballista_client =
-        BallistaClient::try_new(host, port)
-            .await
-            .map_err(|error| match error {
-                // map grpc connection error to partition fetch error.
-                BallistaError::GrpcConnectionError(msg) => 
BallistaError::FetchFailed(
-                    metadata.id.clone(),
-                    partition_id.stage_id,
-                    partition_id.partition_id,
-                    msg,
-                ),
-                other => other,
-            })?;
+    let mut ballista_client = BallistaClient::try_new(host, port, 
max_message_size)
+        .await
+        .map_err(|error| match error {
+            // map grpc connection error to partition fetch error.
+            BallistaError::GrpcConnectionError(msg) => 
BallistaError::FetchFailed(
+                metadata.id.clone(),
+                partition_id.stage_id,
+                partition_id.partition_id,
+                msg,
+            ),
+            other => other,
+        })?;
 
     ballista_client
         .fetch_partition(&metadata.id, partition_id, &location.path, host, 
port)
@@ -644,7 +664,7 @@ mod tests {
         );
 
         let response_receiver =
-            send_fetch_partitions(partition_locations, max_request_num);
+            send_fetch_partitions(partition_locations, max_request_num, 4 * 
1024 * 1024);
 
         let stream = RecordBatchStreamAdapter::new(
             Arc::new(schema),
diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs
index 182113b8..4ff1af98 100644
--- a/ballista/core/src/extension.rs
+++ b/ballista/core/src/extension.rs
@@ -17,7 +17,7 @@
 
 use crate::config::{
     BallistaConfig, BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, BALLISTA_JOB_NAME,
-    BALLISTA_STANDALONE_PARALLELISM,
+    BALLISTA_SHUFFLE_READER_MAX_REQUESTS, BALLISTA_STANDALONE_PARALLELISM,
 };
 use crate::planner::BallistaQueryPlanner;
 use crate::serde::protobuf::KeyValuePair;
@@ -103,6 +103,15 @@ pub trait SessionConfigExt {
 
     /// Sets ballista job name
     fn with_ballista_job_name(self, job_name: &str) -> Self;
+
+    /// get maximum in flight requests for shuffle reader
+    fn ballista_shuffle_reader_maximum_concurrent_requests(&self) -> usize;
+
+    /// Sets maximum in flight requests for shuffle reader
+    fn with_ballista_shuffle_reader_maximum_concurrent_requests(
+        self,
+        max_requests: usize,
+    ) -> Self;
 }
 
 /// [SessionConfigHelperExt] is set of [SessionConfig] extension methods
@@ -114,6 +123,9 @@ pub trait SessionConfigHelperExt {
     fn update_from_key_value_pair(self, key_value_pairs: &[KeyValuePair]) -> 
Self;
     /// updates mut [SessionConfig] from proto
     fn update_from_key_value_pair_mut(&mut self, key_value_pairs: 
&[KeyValuePair]);
+    /// changes some of default datafusion configuration
+    /// in order to make it suitable for ballista
+    fn ballista_restricted_configuration(self) -> Self;
 }
 
 impl SessionStateExt for SessionState {
@@ -121,16 +133,11 @@ impl SessionStateExt for SessionState {
         scheduler_url: String,
         session_id: String,
     ) -> datafusion::error::Result<SessionState> {
-        let config = BallistaConfig::default();
-
-        let planner =
-            BallistaQueryPlanner::<LogicalPlanNode>::new(scheduler_url, 
config.clone());
-
-        let session_config = SessionConfig::new()
-            .with_information_schema(true)
-            .with_option_extension(config.clone())
-            // Ballista disables this option
-            .with_round_robin_repartition(false);
+        let session_config = SessionConfig::new_with_ballista();
+        let planner = BallistaQueryPlanner::<LogicalPlanNode>::new(
+            scheduler_url,
+            BallistaConfig::default(),
+        );
 
         let runtime_env = RuntimeEnvBuilder::new().build()?;
         let session_state = SessionStateBuilder::new()
@@ -164,8 +171,7 @@ impl SessionStateExt for SessionState {
             .config()
             .clone()
             .with_option_extension(new_config.clone())
-            // Ballista disables this option
-            .with_round_robin_repartition(false);
+            .ballista_restricted_configuration();
 
         let builder = SessionStateBuilder::new_from_existing(self)
             .with_config(session_config)
@@ -191,8 +197,9 @@ impl SessionConfigExt for SessionConfig {
     fn new_with_ballista() -> SessionConfig {
         SessionConfig::new()
             .with_option_extension(BallistaConfig::default())
+            .with_information_schema(true)
             .with_target_partitions(16)
-            .with_round_robin_repartition(false)
+            .ballista_restricted_configuration()
     }
     fn with_ballista_logical_extension_codec(
         self,
@@ -279,6 +286,28 @@ impl SessionConfigExt for SessionConfig {
                 .set_usize(BALLISTA_STANDALONE_PARALLELISM, parallelism)
         }
     }
+
+    fn ballista_shuffle_reader_maximum_concurrent_requests(&self) -> usize {
+        self.options()
+            .extensions
+            .get::<BallistaConfig>()
+            .map(|c| c.shuffle_reader_maximum_concurrent_requests())
+            .unwrap_or_else(|| {
+                
BallistaConfig::default().shuffle_reader_maximum_concurrent_requests()
+            })
+    }
+
+    fn with_ballista_shuffle_reader_maximum_concurrent_requests(
+        self,
+        max_requests: usize,
+    ) -> Self {
+        if self.options().extensions.get::<BallistaConfig>().is_some() {
+            self.set_usize(BALLISTA_SHUFFLE_READER_MAX_REQUESTS, max_requests)
+        } else {
+            self.with_option_extension(BallistaConfig::default())
+                .set_usize(BALLISTA_SHUFFLE_READER_MAX_REQUESTS, max_requests)
+        }
+    }
 }
 
 impl SessionConfigHelperExt for SessionConfig {
@@ -332,6 +361,28 @@ impl SessionConfigHelperExt for SessionConfig {
             }
         }
     }
+
+    fn ballista_restricted_configuration(self) -> Self {
+        self
+            // round robin repartition does not work well with ballista.
+            // this setting will also be enforced by the scheduler,
+            // thus the user will not be able to override it.
+            .with_round_robin_repartition(false)
+            // There is an issue with Utf8View(s) where Arrow IPC will generate
+            // frames which would be too big to send using Arrow Flight.
+            //
+            // This configuration option will be disabled temporarily.
+            //
+            // This configuration is not enforced by the scheduler, thus
+            // the user could override this setting using the `SET` operation.
+            //
+            // TODO: enable this option once we get to the root of the problem
+            //       between `IpcWriter` and `ViewTypes`
+            .set_bool(
+                "datafusion.execution.parquet.schema_force_view_types",
+                false,
+            )
+    }
 }
 
 /// Wrapper for [SessionConfig] extension
diff --git a/ballista/executor/src/executor_process.rs 
b/ballista/executor/src/executor_process.rs
index b8edc596..b3aa629a 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -365,7 +365,13 @@ pub async fn start_executor_process(
     service_handlers.push(match override_flight {
         None => {
             info!("Starting built-in arrow flight service");
-            flight_server_task(address, shutdown).await
+            flight_server_task(
+                address,
+                shutdown,
+                opt.grpc_max_encoding_message_size as usize,
+                opt.grpc_max_decoding_message_size as usize,
+            )
+            .await
         }
         Some(flight_provider) => {
             info!("Starting custom, user provided, arrow flight service");
@@ -471,12 +477,18 @@ pub async fn start_executor_process(
 async fn flight_server_task(
     address: SocketAddr,
     mut grpc_shutdown: Shutdown,
+    max_encoding_message_size: usize,
+    max_decoding_message_size: usize,
 ) -> JoinHandle<Result<(), BallistaError>> {
     tokio::spawn(async move {
-        info!("Built-in arrow flight server listening on: {:?}", address);
+        info!("Built-in arrow flight server listening on: {:?} max_encoding_size: {} max_decoding_size: {}", address, max_encoding_message_size, max_decoding_message_size);
 
         let server_future = create_grpc_server()
-            
.add_service(FlightServiceServer::new(BallistaFlightService::new()))
+            .add_service(
+                FlightServiceServer::new(BallistaFlightService::new())
+                    .max_decoding_message_size(max_decoding_message_size)
+                    .max_encoding_message_size(max_encoding_message_size),
+            )
             .serve_with_shutdown(address, grpc_shutdown.recv());
 
         server_future.await.map_err(|e| {
diff --git a/ballista/executor/src/flight_service.rs 
b/ballista/executor/src/flight_service.rs
index 939b5a8f..26ec814e 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -234,9 +234,9 @@ where
                 if let SendError(Err(err)) = err {
                     err
                 } else {
-                    FlightError::Tonic(Status::internal(
-                        "Can't send a batch, something went wrong",
-                    ))
+                    FlightError::Tonic(Status::internal(format!(
+                        "Can't send a batch, something went wrong: {err:?}"
+                    )))
                 }
             })?
     }
diff --git a/ballista/executor/src/standalone.rs 
b/ballista/executor/src/standalone.rs
index b16e4a3a..38c46d02 100644
--- a/ballista/executor/src/standalone.rs
+++ b/ballista/executor/src/standalone.rs
@@ -31,11 +31,7 @@ use ballista_core::{
     BALLISTA_VERSION,
 };
 use ballista_core::{ConfigProducer, RuntimeProducer};
-use datafusion::execution::runtime_env::RuntimeEnvBuilder;
-use datafusion::execution::SessionState;
-use datafusion::prelude::SessionConfig;
-use datafusion_proto::logical_plan::AsLogicalPlan;
-use datafusion_proto::physical_plan::AsExecutionPlan;
+use datafusion::execution::{SessionState, SessionStateBuilder};
 use log::info;
 use std::sync::Arc;
 use tempfile::TempDir;
@@ -55,6 +51,7 @@ pub async fn new_standalone_executor_from_state(
 ) -> Result<()> {
     let logical = session_state.config().ballista_logical_extension_codec();
     let physical = session_state.config().ballista_physical_extension_codec();
+
     let codec: BallistaCodec<
         datafusion_proto::protobuf::LogicalPlanNode,
         datafusion_proto::protobuf::PhysicalPlanNode,
@@ -63,7 +60,9 @@ pub async fn new_standalone_executor_from_state(
     let config = session_state
         .config()
         .clone()
-        .with_option_extension(BallistaConfig::default());
+        .with_option_extension(BallistaConfig::default()) // TODO: do we need this statement?
+        ;
+
     let runtime = session_state.runtime_env().clone();
 
     let config_producer: ConfigProducer = Arc::new(move || config.clone());
@@ -90,16 +89,16 @@ pub async fn new_standalone_executor_from_builder(
 ) -> Result<()> {
     // Let the OS assign a random, free port
     let listener = TcpListener::bind("localhost:0").await?;
-    let addr = listener.local_addr()?;
+    let address = listener.local_addr()?;
     info!(
         "Ballista v{} Rust Executor listening on {:?}",
-        BALLISTA_VERSION, addr
+        BALLISTA_VERSION, address
     );
 
     let executor_meta = ExecutorRegistration {
         id: Uuid::new_v4().to_string(), // assign this executor a unique ID
         host: Some("localhost".to_string()),
-        port: addr.port() as u32,
+        port: address.port() as u32,
         // TODO Make it configurable
         grpc_port: 50020,
         specification: Some(
@@ -110,6 +109,9 @@ pub async fn new_standalone_executor_from_builder(
         ),
     };
 
+    let config = config_producer();
+    let max_message_size = config.ballista_grpc_client_max_message_size();
+
     let work_dir = TempDir::new()?
         .into_path()
         .into_os_string()
@@ -130,7 +132,10 @@ pub async fn new_standalone_executor_from_builder(
     ));
 
     let service = BallistaFlightService::new();
-    let server = FlightServiceServer::new(service);
+    let server = FlightServiceServer::new(service)
+        .max_decoding_message_size(max_message_size)
+        .max_encoding_message_size(max_message_size);
+
     tokio::spawn(
         create_grpc_server()
             .add_service(server)
@@ -145,69 +150,22 @@ pub async fn new_standalone_executor_from_builder(
 
 /// Creates standalone executor with most values
 /// set as default.
-pub async fn new_standalone_executor<
-    T: 'static + AsLogicalPlan,
-    U: 'static + AsExecutionPlan,
->(
+pub async fn new_standalone_executor(
     scheduler: SchedulerGrpcClient<Channel>,
     concurrent_tasks: usize,
-    codec: BallistaCodec<T, U>,
+    codec: BallistaCodec,
 ) -> Result<()> {
-    // Let the OS assign a random, free port
-    let listener = TcpListener::bind("localhost:0").await?;
-    let addr = listener.local_addr()?;
-    info!(
-        "Ballista v{} Rust Executor listening on {:?}",
-        BALLISTA_VERSION, addr
-    );
-
-    let executor_meta = ExecutorRegistration {
-        id: Uuid::new_v4().to_string(), // assign this executor a unique ID
-        host: Some("localhost".to_string()),
-        port: addr.port() as u32,
-        // TODO Make it configurable
-        grpc_port: 50020,
-        specification: Some(
-            ExecutorSpecification {
-                task_slots: concurrent_tasks as u32,
-            }
-            .into(),
-        ),
-    };
-    let work_dir = TempDir::new()?
-        .into_path()
-        .into_os_string()
-        .into_string()
-        .unwrap();
-    info!("work_dir: {}", work_dir);
-
-    let config_producer = Arc::new(default_config_producer);
-    let wd = work_dir.clone();
-    let runtime_producer: RuntimeProducer = Arc::new(move |_: &SessionConfig| {
-        let runtime_env = RuntimeEnvBuilder::new()
-            .with_temp_file_path(wd.clone())
-            .build()?;
-        Ok(Arc::new(runtime_env))
-    });
+    let session_state = 
SessionStateBuilder::new().with_default_features().build();
+    let runtime = session_state.runtime_env().clone();
+    let runtime_producer: RuntimeProducer = Arc::new(move |_| 
Ok(runtime.clone()));
 
-    let executor = Arc::new(Executor::new_basic(
-        executor_meta,
-        &work_dir,
-        runtime_producer,
-        config_producer,
+    new_standalone_executor_from_builder(
+        scheduler,
         concurrent_tasks,
-    ));
-
-    let service = BallistaFlightService::new();
-    let server = FlightServiceServer::new(service);
-    tokio::spawn(
-        create_grpc_server()
-            .add_service(server)
-            
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
-                listener,
-            )),
-    );
-
-    tokio::spawn(execution_loop::poll_loop(scheduler, executor, codec));
-    Ok(())
+        Arc::new(default_config_producer),
+        runtime_producer,
+        codec,
+        (&session_state).into(),
+    )
+    .await
 }
diff --git a/ballista/scheduler/src/scheduler_process.rs 
b/ballista/scheduler/src/scheduler_process.rs
index bf6d484f..f22fe658 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -91,9 +91,15 @@ pub async fn start_server(
         
tonic_builder.add_service(ExternalScalerServer::new(scheduler_server.clone()));
 
     #[cfg(feature = "flight-sql")]
-    let tonic_builder = tonic_builder.add_service(FlightServiceServer::new(
-        FlightSqlServiceImpl::new(scheduler_server.clone()),
-    ));
+    let tonic_builder = tonic_builder.add_service(
+        
FlightServiceServer::new(FlightSqlServiceImpl::new(scheduler_server.clone()))
+            .max_encoding_message_size(
+                config.grpc_server_max_encoding_message_size as usize,
+            )
+            .max_decoding_message_size(
+                config.grpc_server_max_decoding_message_size as usize,
+            ),
+    );
 
     let tonic = tonic_builder.into_service().into_axum_router();
 
diff --git a/ballista/scheduler/src/standalone.rs 
b/ballista/scheduler/src/standalone.rs
index e9c48345..f7d12152 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -74,8 +74,11 @@ pub async fn new_standalone_scheduler_with_builder(
     config_producer: ConfigProducer,
     codec: BallistaCodec,
 ) -> Result<SocketAddr> {
+    let config = config_producer();
+
     let cluster =
         BallistaCluster::new_memory("localhost:50050", session_builder, 
config_producer);
+
     let metrics_collector = default_metrics_collector()?;
 
     let mut scheduler_server: SchedulerServer<LogicalPlanNode, 
PhysicalPlanNode> =
@@ -88,7 +91,10 @@ pub async fn new_standalone_scheduler_with_builder(
         );
 
     scheduler_server.init().await?;
-    let server = SchedulerGrpcServer::new(scheduler_server.clone());
+    let server = SchedulerGrpcServer::new(scheduler_server.clone())
+        
.max_decoding_message_size(config.ballista_grpc_client_max_message_size())
+        
.max_encoding_message_size(config.ballista_grpc_client_max_message_size());
+
     // Let the OS assign a random, free port
     let listener = TcpListener::bind("localhost:0").await?;
     let addr = listener.local_addr()?;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to