This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ecfb1b  [Ballista] Make external hostname in executor optional (#232)
5ecfb1b is described below

commit 5ecfb1b2a2750376c3721ea1754ef194469a161c
Author: Ximo Guanter <[email protected]>
AuthorDate: Sun May 2 19:35:20 2021 +0200

    [Ballista] Make external hostname in executor optional (#232)
---
 ballista/rust/core/proto/ballista.proto            |  12 +-
 ballista/rust/core/src/client.rs                   |   1 -
 .../core/src/execution_plans/unresolved_shuffle.rs |   1 -
 ballista/rust/executor/executor_config_spec.toml   |   3 +-
 ballista/rust/executor/src/execution_loop.rs       |   5 +-
 ballista/rust/executor/src/flight_service.rs       |  14 +-
 ballista/rust/executor/src/lib.rs                  |  31 ----
 ballista/rust/executor/src/main.rs                 |  50 ++++--
 ballista/rust/scheduler/src/lib.rs                 |  59 ++++---
 ballista/rust/scheduler/src/main.rs                |  13 +-
 ballista/rust/scheduler/src/planner.rs             | 177 ++-------------------
 benchmarks/docker-compose.yaml                     |  19 +--
 12 files changed, 115 insertions(+), 270 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 5733921..b6bc5d0 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -514,6 +514,16 @@ message ExecutorMetadata {
   uint32 port = 3;
 }
 
+message ExecutorRegistration {
+  string id = 1;
+  // "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 
(see https://github.com/danburkert/prost/issues/430)
+  // this syntax is ugly but is binary compatible with the "optional" keyword 
(see 
https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3)
+  oneof optional_host {
+    string host = 2;
+  }
+  uint32 port = 3;
+}
+
 message GetExecutorMetadataParams {}
 
 message GetExecutorMetadataResult {
@@ -542,7 +552,7 @@ message TaskStatus {
 }
 
 message PollWorkParams {
-  ExecutorMetadata metadata = 1;
+  ExecutorRegistration metadata = 1;
   bool can_accept_task = 2;
   // All tasks must be reported until they reach the failed or completed state
   repeated TaskStatus task_status = 3;
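
For context, prost compiles a single-variant `oneof` like `optional_host` above into an `Option` of a generated enum, which is how the rest of this diff constructs and reads the field. A minimal sketch of the generated shape and its use follows; the definitions are simplified stand-ins for the real generated code in ballista_core::serde::protobuf, not the generated code itself.

    // Simplified stand-in for the prost-generated types (illustrative only).
    pub mod executor_registration {
        #[derive(Clone, Debug)]
        pub enum OptionalHost {
            Host(String), // the single oneof variant (tag 2)
        }
    }

    #[derive(Clone, Debug)]
    pub struct ExecutorRegistration {
        pub id: String,
        pub optional_host: Option<executor_registration::OptionalHost>,
        pub port: u32,
    }

    fn main() {
        // Register without an external host; the scheduler will fall back to
        // the connecting IP address (see the scheduler changes further down).
        let reg = ExecutorRegistration {
            id: "executor-1".to_string(),
            optional_host: None,
            port: 50051,
        };

        // On the wire this encoding is binary compatible with proto3's
        // `optional` keyword, and reading it back is an ordinary match.
        let host = match reg.optional_host {
            Some(executor_registration::OptionalHost::Host(h)) => h,
            None => "<use caller ip>".to_string(),
        };
        println!("id={} host={} port={}", reg.id, host, reg.port);
    }
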
diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs
index f64f95f..1d0fedc 100644
--- a/ballista/rust/core/src/client.rs
+++ b/ballista/rust/core/src/client.rs
@@ -58,7 +58,6 @@ pub struct BallistaClient {
 impl BallistaClient {
     /// Create a new BallistaClient to connect to the executor listening on the specified
     /// host and port
-
     pub async fn try_new(host: &str, port: u16) -> Result<Self> {
         let addr = format!("http://{}:{}", host, port);
         debug!("BallistaClient connecting to {}", addr);
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index a62a251..7d147d5 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -18,7 +18,6 @@
 use std::sync::Arc;
 use std::{any::Any, pin::Pin};
 
-use crate::client::BallistaClient;
 use crate::memory_stream::MemoryStream;
 use crate::serde::scheduler::PartitionLocation;
 
diff --git a/ballista/rust/executor/executor_config_spec.toml b/ballista/rust/executor/executor_config_spec.toml
index 2a7c96b..8d817fe 100644
--- a/ballista/rust/executor/executor_config_spec.toml
+++ b/ballista/rust/executor/executor_config_spec.toml
@@ -49,8 +49,7 @@ doc = "Local IP address to bind to."
 [[param]]
 name = "external_host"
 type = "String"
-default = "std::string::String::from(\"localhost\")"
-doc = "Host name or IP address to register with scheduler so that other 
executors can connect to this executor."
+doc = "Host name or IP address to register with scheduler so that other 
executors can connect to this executor. If none is provided, the scheduler will 
use the connecting IP address to communicate with the executor."
 
 [[param]]
 abbr = "p"
diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index cf641dd..5574a14 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -24,7 +24,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use log::{debug, error, info, warn};
 use tonic::transport::Channel;
 
-use ballista_core::serde::scheduler::ExecutorMeta;
+use ballista_core::serde::protobuf::ExecutorRegistration;
 use ballista_core::{
     client::BallistaClient,
     serde::protobuf::{
@@ -37,10 +37,9 @@ use protobuf::CompletedTask;
 pub async fn poll_loop(
     mut scheduler: SchedulerGrpcClient<Channel>,
     executor_client: BallistaClient,
-    executor_meta: ExecutorMeta,
+    executor_meta: ExecutorRegistration,
     concurrent_tasks: usize,
 ) {
-    let executor_meta: protobuf::ExecutorMetadata = executor_meta.into();
     let available_tasks_slots = Arc::new(AtomicUsize::new(concurrent_tasks));
     let (task_status_sender, mut task_status_receiver) =
         std::sync::mpsc::channel::<TaskStatus>();
diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs
index 8fff3db..115e1ab 100644
--- a/ballista/rust/executor/src/flight_service.rs
+++ b/ballista/rust/executor/src/flight_service.rs
@@ -23,7 +23,6 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Instant;
 
-use crate::BallistaExecutor;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::decode_protobuf;
 use ballista_core::serde::scheduler::{Action as BallistaAction, PartitionStats};
@@ -59,12 +58,12 @@ type FlightDataReceiver = Receiver<Result<FlightData, Status>>;
 /// Service implementing the Apache Arrow Flight Protocol
 #[derive(Clone)]
 pub struct BallistaFlightService {
-    executor: Arc<BallistaExecutor>,
+    work_dir: String,
 }
 
 impl BallistaFlightService {
-    pub fn new(executor: Arc<BallistaExecutor>) -> Self {
-        Self { executor }
+    pub fn new(work_dir: String) -> Self {
+        Self { work_dir }
     }
 }
 
@@ -103,11 +102,10 @@ impl FlightService for BallistaFlightService {
                 );
 
                 let mut tasks: Vec<JoinHandle<Result<_, BallistaError>>> = vec![];
-                for part in partition.partition_id.clone() {
-                    let work_dir = self.executor.config.work_dir.clone();
+                for &part in &partition.partition_id {
+                    let mut path = PathBuf::from(&self.work_dir);
                     let partition = partition.clone();
                     tasks.push(tokio::spawn(async move {
-                        let mut path = PathBuf::from(&work_dir);
                         path.push(partition.job_id);
                         path.push(&format!("{}", partition.stage_id));
                         path.push(&format!("{}", part));
@@ -208,7 +206,7 @@ impl FlightService for BallistaFlightService {
                 // fetch a partition that was previously executed by this executor
                 info!("FetchPartition {:?}", partition_id);
 
-                let mut path = PathBuf::from(&self.executor.config.work_dir);
+                let mut path = PathBuf::from(&self.work_dir);
                 path.push(&partition_id.job_id);
                 path.push(&format!("{}", partition_id.stage_id));
                 path.push(&format!("{}", partition_id.partition_id));
diff --git a/ballista/rust/executor/src/lib.rs b/ballista/rust/executor/src/lib.rs
index 3d7bbac..08646eb 100644
--- a/ballista/rust/executor/src/lib.rs
+++ b/ballista/rust/executor/src/lib.rs
@@ -19,34 +19,3 @@
 
 pub mod collect;
 pub mod flight_service;
-
-#[derive(Debug, Clone)]
-pub struct ExecutorConfig {
-    pub(crate) host: String,
-    pub(crate) port: u16,
-    /// Directory for temporary files, such as IPC files
-    pub(crate) work_dir: String,
-    pub(crate) concurrent_tasks: usize,
-}
-
-impl ExecutorConfig {
-    pub fn new(host: &str, port: u16, work_dir: &str, concurrent_tasks: usize) -> Self {
-        Self {
-            host: host.to_owned(),
-            port,
-            work_dir: work_dir.to_owned(),
-            concurrent_tasks,
-        }
-    }
-}
-
-#[allow(dead_code)]
-pub struct BallistaExecutor {
-    pub(crate) config: ExecutorConfig,
-}
-
-impl BallistaExecutor {
-    pub fn new(config: ExecutorConfig) -> Self {
-        Self { config }
-    }
-}
diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs
index 9c8d466..ad7c001 100644
--- a/ballista/rust/executor/src/main.rs
+++ b/ballista/rust/executor/src/main.rs
@@ -17,7 +17,10 @@
 
 //! Ballista Rust executor binary.
 
-use std::sync::Arc;
+use std::{
+    net::{IpAddr, Ipv4Addr},
+    sync::Arc,
+};
 
 use anyhow::{Context, Result};
 use arrow_flight::flight_service_server::FlightServiceServer;
@@ -28,15 +31,17 @@ use tonic::transport::Server;
 use uuid::Uuid;
 
 use ballista_core::{
-    client::BallistaClient, serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient,
+    client::BallistaClient,
+    serde::protobuf::{
+        executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
+        ExecutorRegistration,
+    },
 };
 use ballista_core::{
     print_version, serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer,
-    serde::scheduler::ExecutorMeta, BALLISTA_VERSION,
-};
-use ballista_executor::{
-    flight_service::BallistaFlightService, BallistaExecutor, ExecutorConfig,
+    BALLISTA_VERSION,
 };
+use ballista_executor::flight_service::BallistaFlightService;
 use ballista_scheduler::{state::StandaloneClient, SchedulerServer};
 use config::prelude::*;
 
@@ -80,7 +85,7 @@ async fn main() -> Result<()> {
         .with_context(|| format!("Could not parse address: {}", addr))?;
 
     let scheduler_host = if opt.local {
-        external_host.to_owned()
+        "localhost".to_string()
     } else {
         opt.scheduler_host
     };
@@ -94,14 +99,16 @@ async fn main() -> Result<()> {
             .into_string()
             .unwrap(),
     );
-    let config =
-        ExecutorConfig::new(&external_host, port, &work_dir, opt.concurrent_tasks);
-    info!("Running with config: {:?}", config);
+    info!("Running with config:");
+    info!("work_dir: {}", work_dir);
+    info!("concurrent_tasks: {}", opt.concurrent_tasks);
 
-    let executor_meta = ExecutorMeta {
+    let executor_meta = ExecutorRegistration {
         id: Uuid::new_v4().to_string(), // assign this executor a unique ID
-        host: external_host.clone(),
-        port,
+        optional_host: external_host
+            .clone()
+            .map(executor_registration::OptionalHost::Host),
+        port: port as u32,
     };
 
     if opt.local {
@@ -117,8 +124,9 @@ async fn main() -> Result<()> {
         let server = SchedulerGrpcServer::new(SchedulerServer::new(
             Arc::new(client),
             "ballista".to_string(),
+            IpAddr::V4(Ipv4Addr::LOCALHOST),
         ));
-        let addr = format!("{}:{}", bind_host, scheduler_port);
+        let addr = format!("localhost:{}", scheduler_port);
         let addr = addr
             .parse()
             .with_context(|| format!("Could not parse {}", addr))?;
@@ -158,8 +166,7 @@ async fn main() -> Result<()> {
     let scheduler = SchedulerGrpcClient::connect(scheduler_url)
         .await
         .context("Could not connect to scheduler")?;
-    let executor = Arc::new(BallistaExecutor::new(config));
-    let service = BallistaFlightService::new(executor);
+    let service = BallistaFlightService::new(work_dir);
 
     let server = FlightServiceServer::new(service);
     info!(
@@ -167,7 +174,16 @@ async fn main() -> Result<()> {
         BALLISTA_VERSION, addr
     );
     let server_future = tokio::spawn(Server::builder().add_service(server).serve(addr));
-    let client = BallistaClient::try_new(&external_host, port).await?;
+    let client_host = external_host.as_deref().unwrap_or_else(|| {
+        if bind_host == "0.0.0.0" {
+            // If the executor is bound to "0.0.0.0" (i.e. listening on all IP addresses of all network interfaces),
+            // use "localhost" to connect to itself through the BallistaClient
+            "localhost"
+        } else {
+            &bind_host
+        }
+    });
+    let client = BallistaClient::try_new(client_host, port).await?;
     tokio::spawn(execution_loop::poll_loop(
         scheduler,
         client,
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index a675153..3dc8df2 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -24,16 +24,16 @@ pub mod state;
 #[cfg(test)]
 pub mod test_utils;
 
-use std::fmt;
 use std::{convert::TryInto, sync::Arc};
+use std::{fmt, net::IpAddr};
 
 use ballista_core::serde::protobuf::{
-    execute_query_params::Query, job_status, scheduler_grpc_server::SchedulerGrpc,
-    ExecuteQueryParams, ExecuteQueryResult, FailedJob, FilePartitionMetadata, FileType,
-    GetExecutorMetadataParams, GetExecutorMetadataResult, GetFileMetadataParams,
-    GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, JobStatus,
-    PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob, TaskDefinition,
-    TaskStatus,
+    execute_query_params::Query, executor_registration::OptionalHost, job_status,
+    scheduler_grpc_server::SchedulerGrpc, ExecuteQueryParams, ExecuteQueryResult,
+    FailedJob, FilePartitionMetadata, FileType, GetExecutorMetadataParams,
+    GetExecutorMetadataResult, GetFileMetadataParams, GetFileMetadataResult,
+    GetJobStatusParams, GetJobStatusResult, JobStatus, PartitionId, PollWorkParams,
+    PollWorkResult, QueuedJob, RunningJob, TaskDefinition, TaskStatus,
 };
 use ballista_core::serde::scheduler::ExecutorMeta;
 
@@ -71,13 +71,18 @@ use std::time::{Instant, SystemTime, UNIX_EPOCH};
 
 #[derive(Clone)]
 pub struct SchedulerServer {
+    caller_ip: IpAddr,
     state: Arc<SchedulerState>,
     start_time: u128,
     version: String,
 }
 
 impl SchedulerServer {
-    pub fn new(config: Arc<dyn ConfigBackendClient>, namespace: String) -> Self {
+    pub fn new(
+        config: Arc<dyn ConfigBackendClient>,
+        namespace: String,
+        caller_ip: IpAddr,
+    ) -> Self {
         const VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
         let state = Arc::new(SchedulerState::new(config, namespace));
         let state_clone = state.clone();
@@ -86,6 +91,7 @@ impl SchedulerServer {
         tokio::spawn(async move { state_clone.synchronize_job_status_loop().await });
 
         Self {
+            caller_ip,
             state,
             start_time: SystemTime::now()
                 .duration_since(UNIX_EPOCH)
@@ -131,7 +137,16 @@ impl SchedulerGrpc for SchedulerServer {
         } = request.into_inner()
         {
             debug!("Received poll_work request for {:?}", metadata);
-            let metadata: ExecutorMeta = metadata.into();
+            let metadata: ExecutorMeta = ExecutorMeta {
+                id: metadata.id,
+                host: metadata
+                    .optional_host
+                    .map(|h| match h {
+                        OptionalHost::Host(host) => host,
+                    })
+                    .unwrap_or_else(|| self.caller_ip.to_string()),
+                port: metadata.port as u16,
+            };
             let mut lock = self.state.lock().await.map_err(|e| {
                 let msg = format!("Could not lock the state: {}", e);
                 error!("{}", msg);
@@ -359,12 +374,7 @@ impl SchedulerGrpc for SchedulerServer {
                         job_id_spawn, e
                     );
                 }
-                let mut planner = fail_job!(DistributedPlanner::try_new(executors)
-                    .map_err(|e| {
-                        let msg = format!("Could not create distributed planner: {}", e);
-                        error!("{}", msg);
-                        tonic::Status::internal(msg)
-                    }));
+                let mut planner = DistributedPlanner::new();
                 let stages = fail_job!(planner
                     .plan_query_stages(&job_id_spawn, plan)
                     .map_err(|e| {
@@ -433,12 +443,17 @@ impl SchedulerGrpc for SchedulerServer {
 
 #[cfg(all(test, feature = "sled"))]
 mod test {
-    use std::sync::Arc;
+    use std::{
+        net::{IpAddr, Ipv4Addr},
+        sync::Arc,
+    };
 
     use tonic::Request;
 
     use ballista_core::error::BallistaError;
-    use ballista_core::serde::protobuf::{ExecutorMetadata, PollWorkParams};
+    use ballista_core::serde::protobuf::{
+        executor_registration::OptionalHost, ExecutorRegistration, PollWorkParams,
+    };
 
     use super::{
         state::{SchedulerState, StandaloneClient},
@@ -449,11 +464,15 @@ mod test {
     async fn test_poll_work() -> Result<(), BallistaError> {
         let state = Arc::new(StandaloneClient::try_new_temporary()?);
         let namespace = "default";
-        let scheduler = SchedulerServer::new(state.clone(), namespace.to_owned());
+        let scheduler = SchedulerServer::new(
+            state.clone(),
+            namespace.to_owned(),
+            IpAddr::V4(Ipv4Addr::LOCALHOST),
+        );
         let state = SchedulerState::new(state, namespace.to_string());
-        let exec_meta = ExecutorMetadata {
+        let exec_meta = ExecutorRegistration {
             id: "abc".to_owned(),
-            host: "".to_owned(),
+            optional_host: Some(OptionalHost::Host("".to_owned())),
             port: 0,
         };
         let request: Request<PollWorkParams> = Request::new(PollWorkParams {
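
The conversion in poll_work above is the core of the change: a registration that omits the host resolves to the IP address the request came from. A standalone sketch of that fallback, using simplified stand-ins for the types in this file:

    use std::net::{IpAddr, Ipv4Addr};

    // Simplified stand-ins for the generated/serde types (illustrative only).
    enum OptionalHost {
        Host(String),
    }
    struct ExecutorRegistration {
        id: String,
        optional_host: Option<OptionalHost>,
        port: u32,
    }
    struct ExecutorMeta {
        id: String,
        host: String,
        port: u16,
    }

    // If the executor did not advertise an external host, fall back to the
    // IP address of the connection that delivered the PollWorkParams.
    fn to_meta(metadata: ExecutorRegistration, caller_ip: IpAddr) -> ExecutorMeta {
        ExecutorMeta {
            id: metadata.id,
            host: metadata
                .optional_host
                .map(|h| match h {
                    OptionalHost::Host(host) => host,
                })
                .unwrap_or_else(|| caller_ip.to_string()),
            port: metadata.port as u16,
        }
    }

    fn main() {
        let meta = to_meta(
            ExecutorRegistration {
                id: "abc".to_string(),
                optional_host: None,
                port: 50051,
            },
            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 7)),
        );
        assert_eq!(meta.host, "10.0.0.7");
        assert_eq!(meta.port, 50051);
    }
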
diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs
index 205023a..713103f 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -19,7 +19,7 @@
 
 use anyhow::{Context, Result};
 use futures::future::{self, Either, TryFutureExt};
-use hyper::{service::make_service_fn, Server};
+use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
 use std::convert::Infallible;
 use std::{net::SocketAddr, sync::Arc};
 use tonic::transport::Server as TonicServer;
@@ -62,17 +62,20 @@ async fn start_server(
         BALLISTA_VERSION, addr
     );
 
-    let scheduler_server =
-        SchedulerServer::new(config_backend.clone(), namespace.clone());
     Ok(Server::bind(&addr)
-        .serve(make_service_fn(move |_| {
+        .serve(make_service_fn(move |request: &AddrStream| {
+            let scheduler_server = SchedulerServer::new(
+                config_backend.clone(),
+                namespace.clone(),
+                request.remote_addr().ip(),
+            );
             let scheduler_grpc_server =
                 SchedulerGrpcServer::new(scheduler_server.clone());
 
             let mut tonic = TonicServer::builder()
                 .add_service(scheduler_grpc_server)
                 .into_service();
-            let mut warp = warp::service(get_routes(scheduler_server.clone()));
+            let mut warp = warp::service(get_routes(scheduler_server));
 
             future::ok::<_, Infallible>(tower::service_fn(
                 move |req: hyper::Request<hyper::Body>| {
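
The reason SchedulerServer is now constructed inside make_service_fn is that hyper invokes that closure once per accepted connection, which is the only point where the peer address is available. A minimal standalone sketch of the pattern, assuming the hyper 0.14 API used here; the echo handler is illustrative, not part of this commit:

    use std::convert::Infallible;
    use std::net::SocketAddr;

    use hyper::server::conn::AddrStream;
    use hyper::service::{make_service_fn, service_fn};
    use hyper::{Body, Request, Response, Server};

    #[tokio::main]
    async fn main() -> Result<(), hyper::Error> {
        let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

        // make_service_fn runs once per connection, so the remote peer
        // address can be captured here and moved into the per-request
        // service -- the same way the scheduler learns each executor's IP.
        let make_svc = make_service_fn(|conn: &AddrStream| {
            let caller_ip = conn.remote_addr().ip();
            async move {
                Ok::<_, Infallible>(service_fn(move |_req: Request<Body>| async move {
                    Ok::<_, Infallible>(Response::new(Body::from(format!(
                        "connected from {}",
                        caller_ip
                    ))))
                }))
            }
        });

        Server::bind(&addr).serve(make_svc).await
    }
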
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index e791fa8..20dd0d3 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -19,17 +19,11 @@
 //!
 //! This code is EXPERIMENTAL and still under development
 
-use std::pin::Pin;
+use std::collections::HashMap;
 use std::sync::Arc;
-use std::time::Instant;
-use std::{collections::HashMap, future::Future};
 
-use ballista_core::client::BallistaClient;
 use ballista_core::datasource::DfTableAdapter;
 use ballista_core::error::{BallistaError, Result};
-use ballista_core::serde::scheduler::ExecutorMeta;
-use ballista_core::serde::scheduler::PartitionId;
-use ballista_core::utils::format_plan;
 use ballista_core::{
     execution_plans::{QueryStageExec, ShuffleReaderExec, UnresolvedShuffleExec},
     serde::scheduler::PartitionLocation,
@@ -42,59 +36,27 @@ use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec
 use datafusion::physical_plan::hash_join::HashJoinExec;
 use datafusion::physical_plan::merge::MergeExec;
 use datafusion::physical_plan::ExecutionPlan;
-use log::{debug, info};
-use tokio::task::JoinHandle;
+use log::info;
 
-type SendableExecutionPlan =
-    Pin<Box<dyn Future<Output = Result<Arc<dyn ExecutionPlan>>> + Send>>;
 type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<QueryStageExec>>);
 
 pub struct DistributedPlanner {
-    executors: Vec<ExecutorMeta>,
     next_stage_id: usize,
 }
 
 impl DistributedPlanner {
-    pub fn try_new(executors: Vec<ExecutorMeta>) -> Result<Self> {
-        if executors.is_empty() {
-            Err(BallistaError::General(
-                "DistributedPlanner requires at least one executor".to_owned(),
-            ))
-        } else {
-            Ok(Self {
-                executors,
-                next_stage_id: 0,
-            })
-        }
+    pub fn new() -> Self {
+        Self { next_stage_id: 0 }
     }
 }
 
-impl DistributedPlanner {
-    /// Execute a distributed query against a cluster, leaving the final results on the
-    /// executors. The [ExecutionPlan] returned by this method is guaranteed to be a
-    /// [ShuffleReaderExec] that can be used to fetch the final results from the executors
-    /// in parallel.
-    pub async fn execute_distributed_query(
-        &mut self,
-        job_id: String,
-        execution_plan: Arc<dyn ExecutionPlan>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let now = Instant::now();
-        let execution_plans = self.plan_query_stages(&job_id, execution_plan)?;
-
-        info!(
-            "DistributedPlanner created {} execution plans in {} seconds:",
-            execution_plans.len(),
-            now.elapsed().as_secs()
-        );
-
-        for plan in &execution_plans {
-            info!("{}", format_plan(plan.as_ref(), 0)?);
-        }
-
-        execute(execution_plans, self.executors.clone()).await
+impl Default for DistributedPlanner {
+    fn default() -> Self {
+        Self::new()
     }
+}
 
+impl DistributedPlanner {
     /// Returns a vector of ExecutionPlans, where the root node is a [QueryStageExec].
     /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
     /// A [QueryStageExec] is created whenever the partitioning changes.
@@ -221,38 +183,6 @@ impl DistributedPlanner {
     }
 }
 
-fn execute(
-    stages: Vec<Arc<QueryStageExec>>,
-    executors: Vec<ExecutorMeta>,
-) -> SendableExecutionPlan {
-    Box::pin(async move {
-        let mut partition_locations: HashMap<usize, Vec<PartitionLocation>> =
-            HashMap::new();
-        let mut result_partition_locations = vec![];
-        for stage in &stages {
-            debug!("execute() {}", &format!("{:?}", stage)[0..60]);
-            let stage = remove_unresolved_shuffles(stage.as_ref(), &partition_locations)?;
-            let stage = stage.as_any().downcast_ref::<QueryStageExec>().unwrap();
-            result_partition_locations = execute_query_stage(
-                &stage.job_id.clone(),
-                stage.stage_id,
-                stage.children()[0].clone(),
-                executors.clone(),
-            )
-            .await?;
-            partition_locations
-                .insert(stage.stage_id, result_partition_locations.clone());
-        }
-
-        let shuffle_reader: Arc<dyn ExecutionPlan> =
-            Arc::new(ShuffleReaderExec::try_new(
-                result_partition_locations,
-                stages.last().unwrap().schema(),
-            )?);
-        Ok(shuffle_reader)
-    })
-}
-
 pub fn remove_unresolved_shuffles(
     stage: &dyn ExecutionPlan,
     partition_locations: &HashMap<usize, Vec<PartitionLocation>>,
@@ -298,88 +228,6 @@ fn create_query_stage(
     Ok(Arc::new(QueryStageExec::try_new(job_id, stage_id, plan)?))
 }
 
-/// Execute a query stage by sending each partition to an executor
-async fn execute_query_stage(
-    job_id: &str,
-    stage_id: usize,
-    plan: Arc<dyn ExecutionPlan>,
-    executors: Vec<ExecutorMeta>,
-) -> Result<Vec<PartitionLocation>> {
-    info!(
-        "execute_query_stage() stage_id={}\n{}",
-        stage_id,
-        format_plan(plan.as_ref(), 0)?
-    );
-
-    let partition_count = plan.output_partitioning().partition_count();
-
-    let num_chunks = partition_count / executors.len();
-    let num_chunks = num_chunks.max(1);
-    let partition_chunks: Vec<Vec<usize>> = (0..partition_count)
-        .collect::<Vec<usize>>()
-        .chunks(num_chunks)
-        .map(|r| r.to_vec())
-        .collect();
-
-    info!(
-        "Executing query stage with {} chunks of partition ranges",
-        partition_chunks.len()
-    );
-
-    let mut executions: Vec<JoinHandle<Result<Vec<PartitionLocation>>>> =
-        Vec::with_capacity(partition_count);
-    for i in 0..partition_chunks.len() {
-        let plan = plan.clone();
-        let executor_meta = executors[i % executors.len()].clone();
-        let partition_ids = partition_chunks[i].to_vec();
-        let job_id = job_id.to_owned();
-        executions.push(tokio::spawn(async move {
-            let mut client =
-                BallistaClient::try_new(&executor_meta.host, executor_meta.port).await?;
-            let stats = client
-                .execute_partition(job_id.clone(), stage_id, partition_ids.clone(), plan)
-                .await?;
-
-            Ok(partition_ids
-                .iter()
-                .map(|part| PartitionLocation {
-                    partition_id: PartitionId::new(&job_id, stage_id, *part),
-                    executor_meta: executor_meta.clone(),
-                    partition_stats: *stats[*part].statistics(),
-                })
-                .collect())
-        }));
-    }
-
-    // wait for all partitions to complete
-    let results = futures::future::join_all(executions).await;
-
-    // check for errors
-    let mut meta = Vec::with_capacity(partition_count);
-    for result in results {
-        match result {
-            Ok(partition_result) => {
-                let final_result = partition_result?;
-                debug!("Query stage partition result: {:?}", final_result);
-                meta.extend(final_result);
-            }
-            Err(e) => {
-                return Err(BallistaError::General(format!(
-                    "Query stage {} failed: {:?}",
-                    stage_id, e
-                )))
-            }
-        }
-    }
-
-    debug!(
-        "execute_query_stage() stage_id={} produced {:?}",
-        stage_id, meta
-    );
-
-    Ok(meta)
-}
-
 #[cfg(test)]
 mod test {
     use crate::planner::DistributedPlanner;
@@ -387,7 +235,6 @@ mod test {
     use ballista_core::error::BallistaError;
     use ballista_core::execution_plans::UnresolvedShuffleExec;
     use ballista_core::serde::protobuf;
-    use ballista_core::serde::scheduler::ExecutorMeta;
     use ballista_core::utils::format_plan;
     use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
     use datafusion::physical_plan::merge::MergeExec;
@@ -420,11 +267,7 @@ mod test {
         let plan = ctx.optimize(&plan)?;
         let plan = ctx.create_physical_plan(&plan)?;
 
-        let mut planner = DistributedPlanner::try_new(vec![ExecutorMeta {
-            id: "".to_string(),
-            host: "".to_string(),
-            port: 0,
-        }])?;
+        let mut planner = DistributedPlanner::new();
         let job_uuid = Uuid::new_v4();
         let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
         for stage in &stages {
diff --git a/benchmarks/docker-compose.yaml b/benchmarks/docker-compose.yaml
index 6015dba..bbb3107 100644
--- a/benchmarks/docker-compose.yaml
+++ b/benchmarks/docker-compose.yaml
@@ -14,7 +14,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-version: '2.0'
+version: '2.2'
 services:
   etcd:
     image: quay.io/coreos/etcd:v3.4.9
@@ -28,18 +28,10 @@ services:
       - ./data:/data
     depends_on:
       - etcd
-  ballista-executor-1:
+  ballista-executor:
     image: ballistacompute/ballista-rust:0.5.0-SNAPSHOT
-    command: "/executor --bind-host 0.0.0.0 --port 50051 --external-host 
ballista-executor-1 --scheduler-host ballista-scheduler"
-    environment:
-      - RUST_LOG=info
-    volumes:
-      - ./data:/data
-    depends_on:
-      - ballista-scheduler
-  ballista-executor-2:
-    image: ballistacompute/ballista-rust:0.5.0-SNAPSHOT
-    command: "/executor --bind-host 0.0.0.0 --port 50052 --external-host 
ballista-executor-2 --scheduler-host ballista-scheduler"
+    command: "/executor --bind-host 0.0.0.0 --port 50051 --scheduler-host 
ballista-scheduler"
+    scale: 2
     environment:
       - RUST_LOG=info
     volumes:
@@ -57,6 +49,5 @@ services:
       - ../..:/ballista
     depends_on:
       - ballista-scheduler
-      - ballista-executor-1
-      - ballista-executor-2
+      - ballista-executor
 
