This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 371a6970 Remove output_partitioning for task definition (#776)
371a6970 is described below
commit 371a6970487d9db7c5038a11d9a4cdf799668607
Author: yahoNanJing <[email protected]>
AuthorDate: Thu May 18 10:05:28 2023 +0800
Remove output_partitioning for task definition (#776)
Co-authored-by: yangzhong <[email protected]>
---
ballista/core/proto/ballista.proto | 4 ----
ballista/core/src/serde/generated/ballista.rs | 10 ----------
ballista/core/src/serde/scheduler/from_proto.rs | 3 ---
ballista/core/src/serde/scheduler/mod.rs | 2 --
ballista/core/src/serde/scheduler/to_proto.rs | 1 -
ballista/executor/src/execution_loop.rs | 8 --------
ballista/executor/src/executor.rs | 10 +---------
ballista/executor/src/executor_server.rs | 12 +-----------
ballista/scheduler/src/state/task_manager.rs | 8 --------
ballista/scheduler/src/test_utils.rs | 8 +-------
10 files changed, 3 insertions(+), 63 deletions(-)
diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto
index 722baaa9..dea64227 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -445,8 +445,6 @@ message TaskDefinition {
uint32 stage_attempt_num = 5;
uint32 partition_id = 6;
bytes plan = 7;
- // Output partition for shuffle writer
- datafusion.PhysicalHashRepartition output_partitioning = 8;
string session_id = 9;
uint64 launch_time = 10;
repeated KeyValuePair props = 11;
@@ -459,8 +457,6 @@ message MultiTaskDefinition {
uint32 stage_id = 3;
uint32 stage_attempt_num = 4;
bytes plan = 5;
- // Output partition for shuffle writer
- datafusion.PhysicalHashRepartition output_partitioning = 6;
string session_id = 7;
uint64 launch_time = 8;
repeated KeyValuePair props = 9;
diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs
index 439bbf84..9000fc7e 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -764,11 +764,6 @@ pub struct TaskDefinition {
pub partition_id: u32,
#[prost(bytes = "vec", tag = "7")]
pub plan: ::prost::alloc::vec::Vec<u8>,
- /// Output partition for shuffle writer
- #[prost(message, optional, tag = "8")]
- pub output_partitioning: ::core::option::Option<
- ::datafusion_proto::protobuf::PhysicalHashRepartition,
- >,
#[prost(string, tag = "9")]
pub session_id: ::prost::alloc::string::String,
#[prost(uint64, tag = "10")]
@@ -790,11 +785,6 @@ pub struct MultiTaskDefinition {
pub stage_attempt_num: u32,
#[prost(bytes = "vec", tag = "5")]
pub plan: ::prost::alloc::vec::Vec<u8>,
- /// Output partition for shuffle writer
- #[prost(message, optional, tag = "6")]
- pub output_partitioning: ::core::option::Option<
- ::datafusion_proto::protobuf::PhysicalHashRepartition,
- >,
#[prost(string, tag = "7")]
pub session_id: ::prost::alloc::string::String,
#[prost(uint64, tag = "8")]
diff --git a/ballista/core/src/serde/scheduler/from_proto.rs b/ballista/core/src/serde/scheduler/from_proto.rs
index ec39ba3a..17875e2b 100644
--- a/ballista/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/core/src/serde/scheduler/from_proto.rs
@@ -287,7 +287,6 @@ impl TryInto<(TaskDefinition, Vec<u8>)> for protobuf::TaskDefinition {
stage_attempt_num: self.stage_attempt_num as usize,
partition_id: self.partition_id as usize,
plan: vec![],
- output_partitioning: self.output_partitioning,
session_id: self.session_id,
launch_time: self.launch_time,
props,
@@ -307,7 +306,6 @@ impl TryInto<(Vec<TaskDefinition>, Vec<u8>)> for protobuf::MultiTaskDefinition {
}
let plan = self.plan;
- let output_partitioning = self.output_partitioning;
let session_id = self.session_id;
let job_id = self.job_id;
let stage_id = self.stage_id as usize;
@@ -326,7 +324,6 @@ impl TryInto<(Vec<TaskDefinition>, Vec<u8>)> for protobuf::MultiTaskDefinition {
stage_attempt_num,
partition_id: task_id.partition_id as usize,
plan: vec![],
- output_partitioning: output_partitioning.clone(),
session_id: session_id.clone(),
launch_time,
props: props.clone(),
diff --git a/ballista/core/src/serde/scheduler/mod.rs b/ballista/core/src/serde/scheduler/mod.rs
index 83a6d173..6e9440a3 100644
--- a/ballista/core/src/serde/scheduler/mod.rs
+++ b/ballista/core/src/serde/scheduler/mod.rs
@@ -23,7 +23,6 @@ use datafusion::arrow::array::{
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::Partitioning;
-use datafusion_proto::protobuf as datafusion_protobuf;
use serde::Serialize;
use crate::error::BallistaError;
@@ -281,7 +280,6 @@ pub struct TaskDefinition {
pub stage_attempt_num: usize,
pub partition_id: usize,
pub plan: Vec<u8>,
- pub output_partitioning: Option<datafusion_protobuf::PhysicalHashRepartition>,
pub session_id: String,
pub launch_time: u64,
pub props: HashMap<String, String>,
diff --git a/ballista/core/src/serde/scheduler/to_proto.rs b/ballista/core/src/serde/scheduler/to_proto.rs
index c94be6a2..ccb5ec42 100644
--- a/ballista/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/core/src/serde/scheduler/to_proto.rs
@@ -263,7 +263,6 @@ impl Into<protobuf::TaskDefinition> for TaskDefinition {
stage_attempt_num: self.stage_attempt_num as u32,
partition_id: self.partition_id as u32,
plan: self.plan,
- output_partitioning: self.output_partitioning,
session_id: self.session_id,
launch_time: self.launch_time,
props,
diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index aaaa90a3..47593ebd 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -33,7 +33,6 @@ use ballista_core::serde::scheduler::{ExecutorSpecification, PartitionId};
use ballista_core::serde::BallistaCodec;
use datafusion::execution::context::TaskContext;
use datafusion_proto::logical_plan::AsLogicalPlan;
-use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning;
use datafusion_proto::physical_plan::AsExecutionPlan;
use futures::FutureExt;
use log::{debug, error, info, warn};
@@ -209,12 +208,6 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
)
})?;
- let shuffle_output_partitioning = parse_protobuf_hash_partitioning(
- task.output_partitioning.as_ref(),
- task_context.as_ref(),
- plan.schema().as_ref(),
- )?;
-
let query_stage_exec = executor.execution_engine.create_query_stage_exec(
job_id.clone(),
stage_id as usize,
@@ -234,7 +227,6 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
part.clone(),
query_stage_exec.clone(),
task_context,
- shuffle_output_partitioning,
))
.catch_unwind()
.await
diff --git a/ballista/executor/src/executor.rs b/ballista/executor/src/executor.rs
index 4e60f6ab..90d960d0 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -30,7 +30,6 @@ use datafusion::execution::context::TaskContext;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::udaf::AggregateUDF;
use datafusion::physical_plan::udf::ScalarUDF;
-use datafusion::physical_plan::Partitioning;
use futures::future::AbortHandle;
use std::collections::HashMap;
use std::future::Future;
@@ -122,7 +121,6 @@ impl Executor {
partition: PartitionId,
query_stage_exec: Arc<dyn QueryStageExecutor>,
task_ctx: Arc<TaskContext>,
- _shuffle_output_partitioning: Option<Partitioning>,
) -> Result<Vec<protobuf::ShuffleWritePartition>, BallistaError> {
let (task, abort_handle) = futures::future::abortable(
query_stage_exec.execute_query_stage(partition.partition_id, task_ctx),
@@ -318,13 +316,7 @@ mod test {
partition_id: 0,
};
let task_result = executor_clone
- .execute_query_stage(
- 1,
- part,
- Arc::new(query_stage_exec),
- ctx.task_ctx(),
- None,
- )
+ .execute_query_stage(1, part, Arc::new(query_stage_exec), ctx.task_ctx())
.await;
sender.send(task_result).expect("sending result");
});
diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs
index ff8e24eb..25ffbe26 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -47,10 +47,7 @@ use ballista_core::serde::BallistaCodec;
use ballista_core::utils::{create_grpc_client_connection, create_grpc_server};
use dashmap::DashMap;
use datafusion::execution::context::TaskContext;
-use datafusion_proto::{
- logical_plan::AsLogicalPlan,
- physical_plan::{from_proto::parse_protobuf_hash_partitioning, AsExecutionPlan},
-};
+use datafusion_proto::{logical_plan::AsLogicalPlan, physical_plan::AsExecutionPlan};
use tokio::sync::mpsc::error::TryRecvError;
use tokio::task::JoinHandle;
@@ -386,12 +383,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
runtime.clone(),
));
- let shuffle_output_partitioning = parse_protobuf_hash_partitioning(
- task.output_partitioning.as_ref(),
- task_context.as_ref(),
- query_stage_exec.schema().as_ref(),
- )?;
-
let task_id = task.task_id;
let job_id = task.job_id;
let stage_id = task.stage_id;
@@ -413,7 +404,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
part.clone(),
query_stage_exec.clone(),
task_context,
- shuffle_output_partitioning,
)
.await;
info!("Done with task {}", task_identity);
diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs
index 0d3293f7..6bf245ca 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -29,7 +29,6 @@ use crate::cluster::JobState;
use ballista_core::serde::protobuf::{
self, JobStatus, MultiTaskDefinition, TaskDefinition, TaskId, TaskStatus,
};
-use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
use ballista_core::serde::scheduler::ExecutorMetadata;
use ballista_core::serde::BallistaCodec;
use dashmap::DashMap;
@@ -534,9 +533,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
plan_buf
};
- let output_partitioning =
- hash_partitioning_to_proto(task.output_partitioning.as_ref())?;
-
let task_definition = TaskDefinition {
task_id: task.task_id as u32,
task_attempt_num: task.task_attempt as u32,
@@ -545,7 +541,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
stage_attempt_num: task.stage_attempt_num as u32,
partition_id: task.partition.partition_id as u32,
plan,
- output_partitioning,
session_id: task.session_id,
launch_time: SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -617,8 +612,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
plan_buf
};
- let output_partitioning =
- hash_partitioning_to_proto(task.output_partitioning.as_ref())?;
let task_ids = tasks
.iter()
@@ -635,7 +628,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
stage_id: stage_id as u32,
stage_attempt_num: stage_attempt_num as u32,
plan,
- output_partitioning,
session_id,
launch_time: SystemTime::now()
.duration_since(UNIX_EPOCH)
diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs
index aceef06c..217c96c0 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -270,13 +270,7 @@ pub fn default_task_runner() -> impl TaskRunner {
TaskRunnerFn::new(|executor_id: String, task: MultiTaskDefinition| {
let mut statuses = vec![];
- let partitions =
- if let Some(output_partitioning) = task.output_partitioning.as_ref() {
- output_partitioning.partition_count as usize
- } else {
- 1
- };
-
+ let partitions = 1;
let partitions: Vec<ShuffleWritePartition> = (0..partitions)
.map(|i| ShuffleWritePartition {
partition_id: i as u64,