This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 371a6970 Remove output_partitioning for task definition (#776)
371a6970 is described below

commit 371a6970487d9db7c5038a11d9a4cdf799668607
Author: yahoNanJing <[email protected]>
AuthorDate: Thu May 18 10:05:28 2023 +0800

    Remove output_partitioning for task definition (#776)
    
    Co-authored-by: yangzhong <[email protected]>
---
 ballista/core/proto/ballista.proto              |  4 ----
 ballista/core/src/serde/generated/ballista.rs   | 10 ----------
 ballista/core/src/serde/scheduler/from_proto.rs |  3 ---
 ballista/core/src/serde/scheduler/mod.rs        |  2 --
 ballista/core/src/serde/scheduler/to_proto.rs   |  1 -
 ballista/executor/src/execution_loop.rs         |  8 --------
 ballista/executor/src/executor.rs               | 10 +---------
 ballista/executor/src/executor_server.rs        | 12 +-----------
 ballista/scheduler/src/state/task_manager.rs    |  8 --------
 ballista/scheduler/src/test_utils.rs            |  8 +-------
 10 files changed, 3 insertions(+), 63 deletions(-)

diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto
index 722baaa9..dea64227 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -445,8 +445,6 @@ message TaskDefinition {
   uint32 stage_attempt_num = 5;
   uint32 partition_id = 6;
   bytes plan = 7;
-  // Output partition for shuffle writer
-  datafusion.PhysicalHashRepartition output_partitioning = 8;
   string session_id = 9;
   uint64 launch_time = 10;
   repeated KeyValuePair props = 11;
@@ -459,8 +457,6 @@ message MultiTaskDefinition {
   uint32 stage_id = 3;
   uint32 stage_attempt_num = 4;
   bytes plan = 5;
-  // Output partition for shuffle writer
-  datafusion.PhysicalHashRepartition output_partitioning = 6;
   string session_id = 7;
   uint64 launch_time = 8;
   repeated KeyValuePair props = 9;
diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs
index 439bbf84..9000fc7e 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -764,11 +764,6 @@ pub struct TaskDefinition {
     pub partition_id: u32,
     #[prost(bytes = "vec", tag = "7")]
     pub plan: ::prost::alloc::vec::Vec<u8>,
-    /// Output partition for shuffle writer
-    #[prost(message, optional, tag = "8")]
-    pub output_partitioning: ::core::option::Option<
-        ::datafusion_proto::protobuf::PhysicalHashRepartition,
-    >,
     #[prost(string, tag = "9")]
     pub session_id: ::prost::alloc::string::String,
     #[prost(uint64, tag = "10")]
@@ -790,11 +785,6 @@ pub struct MultiTaskDefinition {
     pub stage_attempt_num: u32,
     #[prost(bytes = "vec", tag = "5")]
     pub plan: ::prost::alloc::vec::Vec<u8>,
-    /// Output partition for shuffle writer
-    #[prost(message, optional, tag = "6")]
-    pub output_partitioning: ::core::option::Option<
-        ::datafusion_proto::protobuf::PhysicalHashRepartition,
-    >,
     #[prost(string, tag = "7")]
     pub session_id: ::prost::alloc::string::String,
     #[prost(uint64, tag = "8")]
diff --git a/ballista/core/src/serde/scheduler/from_proto.rs b/ballista/core/src/serde/scheduler/from_proto.rs
index ec39ba3a..17875e2b 100644
--- a/ballista/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/core/src/serde/scheduler/from_proto.rs
@@ -287,7 +287,6 @@ impl TryInto<(TaskDefinition, Vec<u8>)> for protobuf::TaskDefinition {
                 stage_attempt_num: self.stage_attempt_num as usize,
                 partition_id: self.partition_id as usize,
                 plan: vec![],
-                output_partitioning: self.output_partitioning,
                 session_id: self.session_id,
                 launch_time: self.launch_time,
                 props,
@@ -307,7 +306,6 @@ impl TryInto<(Vec<TaskDefinition>, Vec<u8>)> for protobuf::MultiTaskDefinition {
         }
 
         let plan = self.plan;
-        let output_partitioning = self.output_partitioning;
         let session_id = self.session_id;
         let job_id = self.job_id;
         let stage_id = self.stage_id as usize;
@@ -326,7 +324,6 @@ impl TryInto<(Vec<TaskDefinition>, Vec<u8>)> for protobuf::MultiTaskDefinition {
                     stage_attempt_num,
                     partition_id: task_id.partition_id as usize,
                     plan: vec![],
-                    output_partitioning: output_partitioning.clone(),
                     session_id: session_id.clone(),
                     launch_time,
                     props: props.clone(),
diff --git a/ballista/core/src/serde/scheduler/mod.rs b/ballista/core/src/serde/scheduler/mod.rs
index 83a6d173..6e9440a3 100644
--- a/ballista/core/src/serde/scheduler/mod.rs
+++ b/ballista/core/src/serde/scheduler/mod.rs
@@ -23,7 +23,6 @@ use datafusion::arrow::array::{
 use datafusion::arrow::datatypes::{DataType, Field};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_plan::Partitioning;
-use datafusion_proto::protobuf as datafusion_protobuf;
 use serde::Serialize;
 
 use crate::error::BallistaError;
@@ -281,7 +280,6 @@ pub struct TaskDefinition {
     pub stage_attempt_num: usize,
     pub partition_id: usize,
     pub plan: Vec<u8>,
-    pub output_partitioning: Option<datafusion_protobuf::PhysicalHashRepartition>,
     pub session_id: String,
     pub launch_time: u64,
     pub props: HashMap<String, String>,
diff --git a/ballista/core/src/serde/scheduler/to_proto.rs b/ballista/core/src/serde/scheduler/to_proto.rs
index c94be6a2..ccb5ec42 100644
--- a/ballista/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/core/src/serde/scheduler/to_proto.rs
@@ -263,7 +263,6 @@ impl Into<protobuf::TaskDefinition> for TaskDefinition {
             stage_attempt_num: self.stage_attempt_num as u32,
             partition_id: self.partition_id as u32,
             plan: self.plan,
-            output_partitioning: self.output_partitioning,
             session_id: self.session_id,
             launch_time: self.launch_time,
             props,
diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index aaaa90a3..47593ebd 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -33,7 +33,6 @@ use ballista_core::serde::scheduler::{ExecutorSpecification, PartitionId};
 use ballista_core::serde::BallistaCodec;
 use datafusion::execution::context::TaskContext;
 use datafusion_proto::logical_plan::AsLogicalPlan;
-use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning;
 use datafusion_proto::physical_plan::AsExecutionPlan;
 use futures::FutureExt;
 use log::{debug, error, info, warn};
@@ -209,12 +208,6 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
             )
         })?;
 
-    let shuffle_output_partitioning = parse_protobuf_hash_partitioning(
-        task.output_partitioning.as_ref(),
-        task_context.as_ref(),
-        plan.schema().as_ref(),
-    )?;
-
     let query_stage_exec = executor.execution_engine.create_query_stage_exec(
         job_id.clone(),
         stage_id as usize,
@@ -234,7 +227,6 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
             part.clone(),
             query_stage_exec.clone(),
             task_context,
-            shuffle_output_partitioning,
         ))
         .catch_unwind()
         .await
diff --git a/ballista/executor/src/executor.rs b/ballista/executor/src/executor.rs
index 4e60f6ab..90d960d0 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -30,7 +30,6 @@ use datafusion::execution::context::TaskContext;
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::physical_plan::udaf::AggregateUDF;
 use datafusion::physical_plan::udf::ScalarUDF;
-use datafusion::physical_plan::Partitioning;
 use futures::future::AbortHandle;
 use std::collections::HashMap;
 use std::future::Future;
@@ -122,7 +121,6 @@ impl Executor {
         partition: PartitionId,
         query_stage_exec: Arc<dyn QueryStageExecutor>,
         task_ctx: Arc<TaskContext>,
-        _shuffle_output_partitioning: Option<Partitioning>,
     ) -> Result<Vec<protobuf::ShuffleWritePartition>, BallistaError> {
         let (task, abort_handle) = futures::future::abortable(
             query_stage_exec.execute_query_stage(partition.partition_id, task_ctx),
@@ -318,13 +316,7 @@ mod test {
                 partition_id: 0,
             };
             let task_result = executor_clone
-                .execute_query_stage(
-                    1,
-                    part,
-                    Arc::new(query_stage_exec),
-                    ctx.task_ctx(),
-                    None,
-                )
+                .execute_query_stage(1, part, Arc::new(query_stage_exec), ctx.task_ctx())
                 .await;
             sender.send(task_result).expect("sending result");
         });
diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs
index ff8e24eb..25ffbe26 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -47,10 +47,7 @@ use ballista_core::serde::BallistaCodec;
 use ballista_core::utils::{create_grpc_client_connection, create_grpc_server};
 use dashmap::DashMap;
 use datafusion::execution::context::TaskContext;
-use datafusion_proto::{
-    logical_plan::AsLogicalPlan,
-    physical_plan::{from_proto::parse_protobuf_hash_partitioning, AsExecutionPlan},
-};
+use datafusion_proto::{logical_plan::AsLogicalPlan, physical_plan::AsExecutionPlan};
 use tokio::sync::mpsc::error::TryRecvError;
 use tokio::task::JoinHandle;
 
@@ -386,12 +383,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
             runtime.clone(),
         ));
 
-        let shuffle_output_partitioning = parse_protobuf_hash_partitioning(
-            task.output_partitioning.as_ref(),
-            task_context.as_ref(),
-            query_stage_exec.schema().as_ref(),
-        )?;
-
         let task_id = task.task_id;
         let job_id = task.job_id;
         let stage_id = task.stage_id;
@@ -413,7 +404,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
                 part.clone(),
                 query_stage_exec.clone(),
                 task_context,
-                shuffle_output_partitioning,
             )
             .await;
         info!("Done with task {}", task_identity);
diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs
index 0d3293f7..6bf245ca 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -29,7 +29,6 @@ use crate::cluster::JobState;
 use ballista_core::serde::protobuf::{
     self, JobStatus, MultiTaskDefinition, TaskDefinition, TaskId, TaskStatus,
 };
-use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
 use ballista_core::serde::scheduler::ExecutorMetadata;
 use ballista_core::serde::BallistaCodec;
 use dashmap::DashMap;
@@ -534,9 +533,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
                 plan_buf
             };
 
-            let output_partitioning =
-                hash_partitioning_to_proto(task.output_partitioning.as_ref())?;
-
             let task_definition = TaskDefinition {
                 task_id: task.task_id as u32,
                 task_attempt_num: task.task_attempt as u32,
@@ -545,7 +541,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
                 stage_attempt_num: task.stage_attempt_num as u32,
                 partition_id: task.partition.partition_id as u32,
                 plan,
-                output_partitioning,
                 session_id: task.session_id,
                 launch_time: SystemTime::now()
                     .duration_since(UNIX_EPOCH)
@@ -617,8 +612,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
 
                     plan_buf
                 };
-                let output_partitioning =
-                    hash_partitioning_to_proto(task.output_partitioning.as_ref())?;
 
                 let task_ids = tasks
                     .iter()
@@ -635,7 +628,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
                     stage_id: stage_id as u32,
                     stage_attempt_num: stage_attempt_num as u32,
                     plan,
-                    output_partitioning,
                     session_id,
                     launch_time: SystemTime::now()
                         .duration_since(UNIX_EPOCH)
diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs
index aceef06c..217c96c0 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -270,13 +270,7 @@ pub fn default_task_runner() -> impl TaskRunner {
     TaskRunnerFn::new(|executor_id: String, task: MultiTaskDefinition| {
         let mut statuses = vec![];
 
-        let partitions =
-            if let Some(output_partitioning) = task.output_partitioning.as_ref() {
-                output_partitioning.partition_count as usize
-            } else {
-                1
-            };
-
+        let partitions = 1;
         let partitions: Vec<ShuffleWritePartition> = (0..partitions)
             .map(|i| ShuffleWritePartition {
                 partition_id: i as u64,

Reply via email to