This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new a7c527f3 Update datafusion requirement from 14.0.0 to 15.0.0 (#552)
a7c527f3 is described below

commit a7c527f36a3f70a3a4a53de631535b67e708b53c
Author: yahoNanJing <[email protected]>
AuthorDate: Fri Dec 9 10:13:14 2022 +0800

    Update datafusion requirement from 14.0.0 to 15.0.0 (#552)
    
    * Update datafusion requirement from 14.0.0 to 15.0.0
    
    * Fix unit tests
    
    * Fix Python
    
    * Fix Python
    
    * Fix Python
    
    Co-authored-by: yangzhong <[email protected]>
---
 ballista-cli/Cargo.toml                            |   4 +-
 ballista/client/Cargo.toml                         |   6 +-
 ballista/client/src/context.rs                     |  19 +-
 ballista/core/Cargo.toml                           |   8 +-
 ballista/core/proto/ballista.proto                 |   1 +
 ballista/core/proto/datafusion.proto               |  25 +-
 ballista/core/src/serde/generated/ballista.rs      | 991 +++++++++++----------
 ballista/core/src/serde/mod.rs                     |   4 +-
 .../core/src/serde/physical_plan/from_proto.rs     |   2 +
 ballista/core/src/serde/physical_plan/mod.rs       |   8 +-
 ballista/core/src/serde/physical_plan/to_proto.rs  |   6 +-
 ballista/executor/Cargo.toml                       |   8 +-
 ballista/scheduler/Cargo.toml                      |   6 +-
 ballista/scheduler/src/display.rs                  |   2 +-
 ballista/scheduler/src/scheduler_server/grpc.rs    |   6 +-
 ballista/scheduler/src/state/execution_graph.rs    |  10 +-
 .../src/state/execution_graph/execution_stage.rs   |   6 +-
 .../scheduler/src/state/execution_graph_dot.rs     | 129 ++-
 ballista/scheduler/src/test_utils.rs               |   2 +-
 benchmarks/Cargo.toml                              |   4 +-
 benchmarks/src/bin/tpch.rs                         |   4 +-
 examples/Cargo.toml                                |   2 +-
 python/Cargo.toml                                  |   2 +-
 python/src/context.rs                              |  16 +-
 python/src/dataset.rs                              |   4 +-
 python/src/datatype.rs                             |  39 +
 python/src/lib.rs                                  |   2 +
 python/src/udaf.rs                                 |   4 +
 28 files changed, 781 insertions(+), 539 deletions(-)

diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index ea3f05e4..3f1c500f 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -33,8 +33,8 @@ ballista = { path = "../ballista/client", version = "0.10.0", 
features = [
     "standalone",
 ] }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "14.0.0"
-datafusion-cli = "14.0.0"
+datafusion = "15.0.0"
+datafusion-cli = "15.0.0"
 dirs = "4.0.0"
 env_logger = "0.9"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index a429ca2d..cc870c87 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -31,12 +31,12 @@ rust-version = "1.63"
 ballista-core = { path = "../core", version = "0.10.0" }
 ballista-executor = { path = "../executor", version = "0.10.0", optional = 
true }
 ballista-scheduler = { path = "../scheduler", version = "0.10.0", optional = 
true }
-datafusion = "14.0.0"
-datafusion-proto = "14.0.0"
+datafusion = "15.0.0"
+datafusion-proto = "15.0.0"
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
-sqlparser = "0.26"
+sqlparser = "0.27"
 tempfile = "3"
 tokio = "1.0"
 
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index 9c1b9e67..51a6c3bc 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -17,6 +17,7 @@
 
 //! Distributed execution context.
 
+use datafusion::arrow::datatypes::SchemaRef;
 use log::info;
 use parking_lot::Mutex;
 use sqlparser::ast::Statement;
@@ -375,6 +376,16 @@ impl BallistaContext {
                 ..
             }) => {
                 let table_exists = ctx.table_exist(name.as_str())?;
+                let schema: SchemaRef = 
Arc::new(schema.as_ref().to_owned().into());
+                let table_partition_cols = table_partition_cols
+                    .iter()
+                    .map(|col| {
+                        schema
+                            .field_with_name(col)
+                            .map(|f| (f.name().to_owned(), 
f.data_type().to_owned()))
+                            .map_err(DataFusionError::ArrowError)
+                    })
+                    .collect::<Result<Vec<_>>>()?;
 
                 match (if_not_exists, table_exists) {
                     (_, false) => match file_type.to_lowercase().as_str() {
@@ -383,9 +394,8 @@ impl BallistaContext {
                                 .has_header(*has_header)
                                 .delimiter(*delimiter as u8)
                                 
.table_partition_cols(table_partition_cols.to_vec());
-                            let csv_schema = schema.as_ref().to_owned().into();
                             if !schema.fields().is_empty() {
-                                options = options.schema(&csv_schema);
+                                options = options.schema(&schema);
                             }
                             self.register_csv(name, location, options).await?;
                             Ok(Arc::new(DataFrame::new(ctx.state.clone(), 
&plan)))
@@ -395,7 +405,7 @@ impl BallistaContext {
                                 name,
                                 location,
                                 ParquetReadOptions::default()
-                                    
.table_partition_cols(table_partition_cols.to_vec()),
+                                    
.table_partition_cols(table_partition_cols),
                             )
                             .await?;
                             Ok(Arc::new(DataFrame::new(ctx.state.clone(), 
&plan)))
@@ -405,7 +415,7 @@ impl BallistaContext {
                                 name,
                                 location,
                                 AvroReadOptions::default()
-                                    
.table_partition_cols(table_partition_cols.to_vec()),
+                                    
.table_partition_cols(table_partition_cols),
                             )
                             .await?;
                             Ok(Arc::new(DataFrame::new(ctx.state.clone(), 
&plan)))
@@ -582,6 +592,7 @@ mod tests {
                         table_partition_cols: x.table_partition_cols.clone(),
                         collect_stat: x.collect_stat,
                         target_partitions: x.target_partitions,
+                        file_sort_order: None,
                     };
 
                     let table_paths = listing_table
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 20570a45..1166c99e 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -45,13 +45,13 @@ simd = ["datafusion/simd"]
 [dependencies]
 ahash = { version = "0.8", default-features = false }
 
-arrow-flight = { version = "26.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "28.0.0", features = ["flight-sql-experimental"] }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "14.0.0"
+datafusion = "15.0.0"
 datafusion-objectstore-hdfs = { version = "0.1.1", default-features = false, 
optional = true }
-datafusion-proto = "14.0.0"
+datafusion-proto = "15.0.0"
 futures = "0.3"
 hashbrown = "0.13"
 
@@ -67,7 +67,7 @@ prost = "0.11"
 prost-types = "0.11"
 rand = "0.8"
 serde = { version = "1", features = ["derive"] }
-sqlparser = "0.25"
+sqlparser = "0.27"
 sys-info = "0.9.0"
 tokio = "1.0"
 tokio-stream = { version = "0.1", features = ["net"] }
diff --git a/ballista/core/proto/ballista.proto 
b/ballista/core/proto/ballista.proto
index 5113a6a7..76d3521f 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -267,6 +267,7 @@ message AvroScanExecNode {
 enum PartitionMode {
   COLLECT_LEFT = 0;
   PARTITIONED = 1;
+  AUTO = 2;
 }
 
 message HashJoinExecNode {
diff --git a/ballista/core/proto/datafusion.proto 
b/ballista/core/proto/datafusion.proto
index c8ea70de..4152802d 100644
--- a/ballista/core/proto/datafusion.proto
+++ b/ballista/core/proto/datafusion.proto
@@ -109,6 +109,7 @@ message ListingTableScanNode {
     ParquetFormat parquet = 11;
     AvroFormat avro = 12;
   }
+  repeated datafusion.LogicalExprNode file_sort_order = 13;
 }
 
 message ViewTableScanNode {
@@ -176,6 +177,7 @@ message CreateExternalTableNode {
   string delimiter = 8;
   string definition = 9;
   string file_compression_type = 10;
+  map<string, string> options = 11;
 }
 
 message CreateCatalogSchemaNode {
@@ -409,8 +411,10 @@ message AliasNode {
 }
 
 message BinaryExprNode {
-  LogicalExprNode l = 1;
-  LogicalExprNode r = 2;
+  // Represents the operands from the left inner most expression
+  // to the right outer most expression where each of them are chained
+  // with the operator 'op'.
+  repeated LogicalExprNode operands = 1;
   string op = 3;
 }
 
@@ -739,6 +743,20 @@ message ScalarListValue{
     repeated ScalarValue values = 2;
 }
 
+message ScalarTime32Value {
+  oneof value {
+    int32 time32_second_value = 1;
+    int32 time32_millisecond_value = 2;
+  };
+}
+
+message ScalarTime64Value {
+  oneof value {
+    int64 time64_microsecond_value = 1;
+    int64 time64_nanosecond_value = 2;
+  };
+}
+
 message ScalarTimestampValue {
   oneof value {
     int64  time_microsecond_value = 1;
@@ -797,6 +815,7 @@ message ScalarValue{
         double float64_value = 13;
         // Literal Date32 value always has a unit of day
         int32  date_32_value = 14;
+        ScalarTime32Value time32_value = 15;
         ScalarListValue list_value = 17;
         //WAS: ScalarType null_list_value = 18;
 
@@ -808,7 +827,7 @@ message ScalarValue{
         ScalarDictionaryValue dictionary_value = 27;
         bytes binary_value = 28;
         bytes large_binary_value = 29;
-        int64 time64_value = 30;
+        ScalarTime64Value time64_value = 30;
         IntervalMonthDayNanoValue interval_month_day_nano = 31;
         StructValue struct_value = 32;
         ScalarFixedSizeBinary fixed_size_binary_value = 34;
diff --git a/ballista/core/src/serde/generated/ballista.rs 
b/ballista/core/src/serde/generated/ballista.rs
index 7f9685e8..d11c6ea5 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -1,111 +1,113 @@
-// 
/////////////////////////////////////////////////////////////////////////////////////////////////
-// Ballista Logical Plan
-// 
/////////////////////////////////////////////////////////////////////////////////////////////////
-
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct Statistics {
-    #[prost(int64, tag="1")]
+    #[prost(int64, tag = "1")]
     pub num_rows: i64,
-    #[prost(int64, tag="2")]
+    #[prost(int64, tag = "2")]
     pub total_byte_size: i64,
-    #[prost(message, repeated, tag="3")]
+    #[prost(message, repeated, tag = "3")]
     pub column_stats: ::prost::alloc::vec::Vec<ColumnStats>,
-    #[prost(bool, tag="4")]
+    #[prost(bool, tag = "4")]
     pub is_exact: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FileRange {
-    #[prost(int64, tag="1")]
+    #[prost(int64, tag = "1")]
     pub start: i64,
-    #[prost(int64, tag="2")]
+    #[prost(int64, tag = "2")]
     pub end: i64,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PartitionedFile {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub path: ::prost::alloc::string::String,
-    #[prost(uint64, tag="2")]
+    #[prost(uint64, tag = "2")]
     pub size: u64,
-    #[prost(uint64, tag="3")]
+    #[prost(uint64, tag = "3")]
     pub last_modified_ns: u64,
-    #[prost(message, repeated, tag="4")]
-    pub partition_values: 
::prost::alloc::vec::Vec<::datafusion_proto::protobuf::ScalarValue>,
-    #[prost(message, optional, tag="5")]
+    #[prost(message, repeated, tag = "4")]
+    pub partition_values: ::prost::alloc::vec::Vec<
+        ::datafusion_proto::protobuf::ScalarValue,
+    >,
+    #[prost(message, optional, tag = "5")]
     pub range: ::core::option::Option<FileRange>,
 }
-// 
/////////////////////////////////////////////////////////////////////////////////////////////////
-// Ballista Physical Plan
-// 
/////////////////////////////////////////////////////////////////////////////////////////////////
-
 /// PhysicalPlanNode is a nested type
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalPlanNode {
-    #[prost(oneof="physical_plan_node::PhysicalPlanType", tags="1, 2, 3, 4, 6, 
7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24")]
+    #[prost(
+        oneof = "physical_plan_node::PhysicalPlanType",
+        tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19, 20, 21, 22, 23, 24"
+    )]
     pub physical_plan_type: 
::core::option::Option<physical_plan_node::PhysicalPlanType>,
 }
 /// Nested message and enum types in `PhysicalPlanNode`.
 pub mod physical_plan_node {
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum PhysicalPlanType {
-        #[prost(message, tag="1")]
+        #[prost(message, tag = "1")]
         ParquetScan(super::ParquetScanExecNode),
-        #[prost(message, tag="2")]
+        #[prost(message, tag = "2")]
         CsvScan(super::CsvScanExecNode),
-        #[prost(message, tag="3")]
+        #[prost(message, tag = "3")]
         Empty(super::EmptyExecNode),
-        #[prost(message, tag="4")]
+        #[prost(message, tag = "4")]
         Projection(::prost::alloc::boxed::Box<super::ProjectionExecNode>),
-        #[prost(message, tag="6")]
+        #[prost(message, tag = "6")]
         GlobalLimit(::prost::alloc::boxed::Box<super::GlobalLimitExecNode>),
-        #[prost(message, tag="7")]
+        #[prost(message, tag = "7")]
         LocalLimit(::prost::alloc::boxed::Box<super::LocalLimitExecNode>),
-        #[prost(message, tag="8")]
+        #[prost(message, tag = "8")]
         Aggregate(::prost::alloc::boxed::Box<super::AggregateExecNode>),
-        #[prost(message, tag="9")]
+        #[prost(message, tag = "9")]
         HashJoin(::prost::alloc::boxed::Box<super::HashJoinExecNode>),
-        #[prost(message, tag="10")]
+        #[prost(message, tag = "10")]
         ShuffleReader(super::ShuffleReaderExecNode),
-        #[prost(message, tag="11")]
+        #[prost(message, tag = "11")]
         Sort(::prost::alloc::boxed::Box<super::SortExecNode>),
-        #[prost(message, tag="12")]
+        #[prost(message, tag = "12")]
         
CoalesceBatches(::prost::alloc::boxed::Box<super::CoalesceBatchesExecNode>),
-        #[prost(message, tag="13")]
+        #[prost(message, tag = "13")]
         Filter(::prost::alloc::boxed::Box<super::FilterExecNode>),
-        #[prost(message, tag="14")]
+        #[prost(message, tag = "14")]
         Merge(::prost::alloc::boxed::Box<super::CoalescePartitionsExecNode>),
-        #[prost(message, tag="15")]
+        #[prost(message, tag = "15")]
         Unresolved(super::UnresolvedShuffleExecNode),
-        #[prost(message, tag="16")]
+        #[prost(message, tag = "16")]
         Repartition(::prost::alloc::boxed::Box<super::RepartitionExecNode>),
-        #[prost(message, tag="17")]
+        #[prost(message, tag = "17")]
         Window(::prost::alloc::boxed::Box<super::WindowAggExecNode>),
-        #[prost(message, tag="18")]
+        #[prost(message, tag = "18")]
         
ShuffleWriter(::prost::alloc::boxed::Box<super::ShuffleWriterExecNode>),
-        #[prost(message, tag="19")]
+        #[prost(message, tag = "19")]
         CrossJoin(::prost::alloc::boxed::Box<super::CrossJoinExecNode>),
-        #[prost(message, tag="20")]
+        #[prost(message, tag = "20")]
         AvroScan(super::AvroScanExecNode),
-        #[prost(message, tag="21")]
+        #[prost(message, tag = "21")]
         Extension(super::PhysicalExtensionNode),
-        #[prost(message, tag="22")]
+        #[prost(message, tag = "22")]
         Union(super::UnionExecNode),
-        #[prost(message, tag="23")]
+        #[prost(message, tag = "23")]
         Explain(super::ExplainExecNode),
-        #[prost(message, tag="24")]
-        
SortPreservingMerge(::prost::alloc::boxed::Box<super::SortPreservingMergeExecNode>),
+        #[prost(message, tag = "24")]
+        SortPreservingMerge(
+            ::prost::alloc::boxed::Box<super::SortPreservingMergeExecNode>,
+        ),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalExtensionNode {
-    #[prost(bytes="vec", tag="1")]
+    #[prost(bytes = "vec", tag = "1")]
     pub node: ::prost::alloc::vec::Vec<u8>,
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub inputs: ::prost::alloc::vec::Vec<PhysicalPlanNode>,
 }
 /// physical expressions
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalExprNode {
-    #[prost(oneof="physical_expr_node::ExprType", tags="1, 2, 3, 4, 5, 6, 7, 
8, 9, 10, 11, 12, 13, 14, 15, 16, 17")]
+    #[prost(
+        oneof = "physical_expr_node::ExprType",
+        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17"
+    )]
     pub expr_type: ::core::option::Option<physical_expr_node::ExprType>,
 }
 /// Nested message and enum types in `PhysicalExprNode`.
@@ -113,467 +115,481 @@ pub mod physical_expr_node {
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum ExprType {
         /// column references
-        #[prost(message, tag="1")]
+        #[prost(message, tag = "1")]
         Column(super::PhysicalColumn),
-        #[prost(message, tag="2")]
+        #[prost(message, tag = "2")]
         Literal(::datafusion_proto::protobuf::ScalarValue),
         /// binary expressions
-        #[prost(message, tag="3")]
+        #[prost(message, tag = "3")]
         BinaryExpr(::prost::alloc::boxed::Box<super::PhysicalBinaryExprNode>),
         /// aggregate expressions
-        #[prost(message, tag="4")]
+        #[prost(message, tag = "4")]
         AggregateExpr(super::PhysicalAggregateExprNode),
         /// null checks
-        #[prost(message, tag="5")]
+        #[prost(message, tag = "5")]
         IsNullExpr(::prost::alloc::boxed::Box<super::PhysicalIsNull>),
-        #[prost(message, tag="6")]
+        #[prost(message, tag = "6")]
         IsNotNullExpr(::prost::alloc::boxed::Box<super::PhysicalIsNotNull>),
-        #[prost(message, tag="7")]
+        #[prost(message, tag = "7")]
         NotExpr(::prost::alloc::boxed::Box<super::PhysicalNot>),
-        #[prost(message, tag="8")]
+        #[prost(message, tag = "8")]
         Case(::prost::alloc::boxed::Box<super::PhysicalCaseNode>),
-        #[prost(message, tag="9")]
+        #[prost(message, tag = "9")]
         Cast(::prost::alloc::boxed::Box<super::PhysicalCastNode>),
-        #[prost(message, tag="10")]
+        #[prost(message, tag = "10")]
         Sort(::prost::alloc::boxed::Box<super::PhysicalSortExprNode>),
-        #[prost(message, tag="11")]
+        #[prost(message, tag = "11")]
         Negative(::prost::alloc::boxed::Box<super::PhysicalNegativeNode>),
-        #[prost(message, tag="12")]
+        #[prost(message, tag = "12")]
         InList(::prost::alloc::boxed::Box<super::PhysicalInListNode>),
-        #[prost(message, tag="13")]
+        #[prost(message, tag = "13")]
         ScalarFunction(super::PhysicalScalarFunctionNode),
-        #[prost(message, tag="14")]
+        #[prost(message, tag = "14")]
         TryCast(::prost::alloc::boxed::Box<super::PhysicalTryCastNode>),
         /// window expressions
-        #[prost(message, tag="15")]
+        #[prost(message, tag = "15")]
         WindowExpr(::prost::alloc::boxed::Box<super::PhysicalWindowExprNode>),
-        #[prost(message, tag="16")]
+        #[prost(message, tag = "16")]
         ScalarUdf(super::PhysicalScalarUdfNode),
-        #[prost(message, tag="17")]
-        
DateTimeIntervalExpr(::prost::alloc::boxed::Box<super::PhysicalDateTimeIntervalExprNode>),
+        #[prost(message, tag = "17")]
+        DateTimeIntervalExpr(
+            
::prost::alloc::boxed::Box<super::PhysicalDateTimeIntervalExprNode>,
+        ),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalScalarUdfNode {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub name: ::prost::alloc::string::String,
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub args: ::prost::alloc::vec::Vec<PhysicalExprNode>,
-    #[prost(message, optional, tag="4")]
+    #[prost(message, optional, tag = "4")]
     pub return_type: 
::core::option::Option<::datafusion_proto::protobuf::ArrowType>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalAggregateExprNode {
-    #[prost(enumeration="::datafusion_proto::protobuf::AggregateFunction", 
tag="1")]
+    #[prost(enumeration = "::datafusion_proto::protobuf::AggregateFunction", 
tag = "1")]
     pub aggr_function: i32,
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
-    #[prost(bool, tag="3")]
+    #[prost(bool, tag = "3")]
     pub distinct: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalWindowExprNode {
-    #[prost(message, optional, boxed, tag="4")]
+    #[prost(message, optional, boxed, tag = "4")]
     pub expr: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
-    #[prost(oneof="physical_window_expr_node::WindowFunction", tags="1, 2")]
-    pub window_function: 
::core::option::Option<physical_window_expr_node::WindowFunction>,
+    #[prost(oneof = "physical_window_expr_node::WindowFunction", tags = "1, 
2")]
+    pub window_function: ::core::option::Option<
+        physical_window_expr_node::WindowFunction,
+    >,
 }
 /// Nested message and enum types in `PhysicalWindowExprNode`.
 pub mod physical_window_expr_node {
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum WindowFunction {
-        #[prost(enumeration="::datafusion_proto::protobuf::AggregateFunction", 
tag="1")]
+        #[prost(
+            enumeration = "::datafusion_proto::protobuf::AggregateFunction",
+            tag = "1"
+        )]
         AggrFunction(i32),
         /// udaf = 3
-        
#[prost(enumeration="::datafusion_proto::protobuf::BuiltInWindowFunction", 
tag="2")]
+        #[prost(
+            enumeration = 
"::datafusion_proto::protobuf::BuiltInWindowFunction",
+            tag = "2"
+        )]
         BuiltInFunction(i32),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalIsNull {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub expr: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalIsNotNull {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub expr: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalNot {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub expr: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalAliasNode {
-    #[prost(message, optional, tag="1")]
+    #[prost(message, optional, tag = "1")]
     pub expr: ::core::option::Option<PhysicalExprNode>,
-    #[prost(string, tag="2")]
+    #[prost(string, tag = "2")]
     pub alias: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalBinaryExprNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub l: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
-    #[prost(message, optional, boxed, tag="2")]
+    #[prost(message, optional, boxed, tag = "2")]
     pub r: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
-    #[prost(string, tag="3")]
+    #[prost(string, tag = "3")]
     pub op: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalDateTimeIntervalExprNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub l: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
-    #[prost(message, optional, boxed, tag="2")]
+    #[prost(message, optional, boxed, tag = "2")]
     pub r: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
-    #[prost(string, tag="3")]
+    #[prost(string, tag = "3")]
     pub op: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalSortExprNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub expr: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
-    #[prost(bool, tag="2")]
+    #[prost(bool, tag = "2")]
     pub asc: bool,
-    #[prost(bool, tag="3")]
+    #[prost(bool, tag = "3")]
     pub nulls_first: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalWhenThen {
-    #[prost(message, optional, tag="1")]
+    #[prost(message, optional, tag = "1")]
     pub when_expr: ::core::option::Option<PhysicalExprNode>,
-    #[prost(message, optional, tag="2")]
+    #[prost(message, optional, tag = "2")]
     pub then_expr: ::core::option::Option<PhysicalExprNode>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalInListNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub expr: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub list: ::prost::alloc::vec::Vec<PhysicalExprNode>,
-    #[prost(bool, tag="3")]
+    #[prost(bool, tag = "3")]
     pub negated: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalCaseNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub expr: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub when_then_expr: ::prost::alloc::vec::Vec<PhysicalWhenThen>,
-    #[prost(message, optional, boxed, tag="3")]
+    #[prost(message, optional, boxed, tag = "3")]
     pub else_expr: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalScalarFunctionNode {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub name: ::prost::alloc::string::String,
-    #[prost(enumeration="::datafusion_proto::protobuf::ScalarFunction", 
tag="2")]
+    #[prost(enumeration = "::datafusion_proto::protobuf::ScalarFunction", tag 
= "2")]
     pub fun: i32,
-    #[prost(message, repeated, tag="3")]
+    #[prost(message, repeated, tag = "3")]
     pub args: ::prost::alloc::vec::Vec<PhysicalExprNode>,
-    #[prost(message, optional, tag="4")]
+    #[prost(message, optional, tag = "4")]
     pub return_type: 
::core::option::Option<::datafusion_proto::protobuf::ArrowType>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalTryCastNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub expr: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
-    #[prost(message, optional, tag="2")]
+    #[prost(message, optional, tag = "2")]
     pub arrow_type: 
::core::option::Option<::datafusion_proto::protobuf::ArrowType>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalCastNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub expr: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
-    #[prost(message, optional, tag="2")]
+    #[prost(message, optional, tag = "2")]
     pub arrow_type: 
::core::option::Option<::datafusion_proto::protobuf::ArrowType>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalNegativeNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub expr: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct UnresolvedShuffleExecNode {
-    #[prost(uint32, tag="1")]
+    #[prost(uint32, tag = "1")]
     pub stage_id: u32,
-    #[prost(message, optional, tag="2")]
+    #[prost(message, optional, tag = "2")]
     pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
-    #[prost(uint32, tag="3")]
+    #[prost(uint32, tag = "3")]
     pub input_partition_count: u32,
-    #[prost(uint32, tag="4")]
+    #[prost(uint32, tag = "4")]
     pub output_partition_count: u32,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FilterExecNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub input: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
-    #[prost(message, optional, tag="2")]
+    #[prost(message, optional, tag = "2")]
     pub expr: ::core::option::Option<PhysicalExprNode>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FileGroup {
-    #[prost(message, repeated, tag="1")]
+    #[prost(message, repeated, tag = "1")]
     pub files: ::prost::alloc::vec::Vec<PartitionedFile>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ScanLimit {
     /// wrap into a message to make it optional
-    #[prost(uint32, tag="1")]
+    #[prost(uint32, tag = "1")]
     pub limit: u32,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FileScanExecConf {
-    #[prost(message, repeated, tag="1")]
+    #[prost(message, repeated, tag = "1")]
     pub file_groups: ::prost::alloc::vec::Vec<FileGroup>,
-    #[prost(message, optional, tag="2")]
+    #[prost(message, optional, tag = "2")]
     pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
-    #[prost(uint32, repeated, tag="4")]
+    #[prost(uint32, repeated, tag = "4")]
     pub projection: ::prost::alloc::vec::Vec<u32>,
-    #[prost(message, optional, tag="5")]
+    #[prost(message, optional, tag = "5")]
     pub limit: ::core::option::Option<ScanLimit>,
-    #[prost(message, optional, tag="6")]
+    #[prost(message, optional, tag = "6")]
     pub statistics: ::core::option::Option<Statistics>,
-    #[prost(string, repeated, tag="7")]
+    #[prost(string, repeated, tag = "7")]
     pub table_partition_cols: 
::prost::alloc::vec::Vec<::prost::alloc::string::String>,
-    #[prost(string, tag="8")]
+    #[prost(string, tag = "8")]
     pub object_store_url: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ParquetScanExecNode {
-    #[prost(message, optional, tag="1")]
+    #[prost(message, optional, tag = "1")]
     pub base_conf: ::core::option::Option<FileScanExecConf>,
-    #[prost(message, optional, tag="2")]
-    pub pruning_predicate: 
::core::option::Option<::datafusion_proto::protobuf::LogicalExprNode>,
+    #[prost(message, optional, tag = "2")]
+    pub pruning_predicate: ::core::option::Option<
+        ::datafusion_proto::protobuf::LogicalExprNode,
+    >,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CsvScanExecNode {
-    #[prost(message, optional, tag="1")]
+    #[prost(message, optional, tag = "1")]
     pub base_conf: ::core::option::Option<FileScanExecConf>,
-    #[prost(bool, tag="2")]
+    #[prost(bool, tag = "2")]
     pub has_header: bool,
-    #[prost(string, tag="3")]
+    #[prost(string, tag = "3")]
     pub delimiter: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct AvroScanExecNode {
-    #[prost(message, optional, tag="1")]
+    #[prost(message, optional, tag = "1")]
     pub base_conf: ::core::option::Option<FileScanExecConf>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct HashJoinExecNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub left: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
-    #[prost(message, optional, boxed, tag="2")]
+    #[prost(message, optional, boxed, tag = "2")]
     pub right: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
-    #[prost(message, repeated, tag="3")]
+    #[prost(message, repeated, tag = "3")]
     pub on: ::prost::alloc::vec::Vec<JoinOn>,
-    #[prost(enumeration="::datafusion_proto::protobuf::JoinType", tag="4")]
+    #[prost(enumeration = "::datafusion_proto::protobuf::JoinType", tag = "4")]
     pub join_type: i32,
-    #[prost(enumeration="PartitionMode", tag="6")]
+    #[prost(enumeration = "PartitionMode", tag = "6")]
     pub partition_mode: i32,
-    #[prost(bool, tag="7")]
+    #[prost(bool, tag = "7")]
     pub null_equals_null: bool,
-    #[prost(message, optional, tag="8")]
+    #[prost(message, optional, tag = "8")]
     pub filter: ::core::option::Option<JoinFilter>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct UnionExecNode {
-    #[prost(message, repeated, tag="1")]
+    #[prost(message, repeated, tag = "1")]
     pub inputs: ::prost::alloc::vec::Vec<PhysicalPlanNode>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExplainExecNode {
-    #[prost(message, optional, tag="1")]
+    #[prost(message, optional, tag = "1")]
     pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
-    #[prost(message, repeated, tag="2")]
-    pub stringified_plans: 
::prost::alloc::vec::Vec<::datafusion_proto::protobuf::StringifiedPlan>,
-    #[prost(bool, tag="3")]
+    #[prost(message, repeated, tag = "2")]
+    pub stringified_plans: ::prost::alloc::vec::Vec<
+        ::datafusion_proto::protobuf::StringifiedPlan,
+    >,
+    #[prost(bool, tag = "3")]
     pub verbose: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CrossJoinExecNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub left: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
-    #[prost(message, optional, boxed, tag="2")]
+    #[prost(message, optional, boxed, tag = "2")]
     pub right: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalColumn {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub name: ::prost::alloc::string::String,
-    #[prost(uint32, tag="2")]
+    #[prost(uint32, tag = "2")]
     pub index: u32,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct JoinOn {
-    #[prost(message, optional, tag="1")]
+    #[prost(message, optional, tag = "1")]
     pub left: ::core::option::Option<PhysicalColumn>,
-    #[prost(message, optional, tag="2")]
+    #[prost(message, optional, tag = "2")]
     pub right: ::core::option::Option<PhysicalColumn>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct EmptyExecNode {
-    #[prost(bool, tag="1")]
+    #[prost(bool, tag = "1")]
     pub produce_one_row: bool,
-    #[prost(message, optional, tag="2")]
+    #[prost(message, optional, tag = "2")]
     pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ProjectionExecNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub input: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
-    #[prost(string, repeated, tag="3")]
+    #[prost(string, repeated, tag = "3")]
     pub expr_name: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct WindowAggExecNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub input: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub window_expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
-    #[prost(string, repeated, tag="3")]
+    #[prost(string, repeated, tag = "3")]
     pub window_expr_name: 
::prost::alloc::vec::Vec<::prost::alloc::string::String>,
-    #[prost(message, optional, tag="4")]
+    #[prost(message, optional, tag = "4")]
     pub input_schema: 
::core::option::Option<::datafusion_proto::protobuf::Schema>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct AggregateExecNode {
-    #[prost(message, repeated, tag="1")]
+    #[prost(message, repeated, tag = "1")]
     pub group_expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub aggr_expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
-    #[prost(enumeration="AggregateMode", tag="3")]
+    #[prost(enumeration = "AggregateMode", tag = "3")]
     pub mode: i32,
-    #[prost(message, optional, boxed, tag="4")]
+    #[prost(message, optional, boxed, tag = "4")]
     pub input: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
-    #[prost(string, repeated, tag="5")]
+    #[prost(string, repeated, tag = "5")]
     pub group_expr_name: 
::prost::alloc::vec::Vec<::prost::alloc::string::String>,
-    #[prost(string, repeated, tag="6")]
+    #[prost(string, repeated, tag = "6")]
     pub aggr_expr_name: 
::prost::alloc::vec::Vec<::prost::alloc::string::String>,
     /// we need the input schema to the partial aggregate to pass to the final 
aggregate
-    #[prost(message, optional, tag="7")]
+    #[prost(message, optional, tag = "7")]
     pub input_schema: 
::core::option::Option<::datafusion_proto::protobuf::Schema>,
-    #[prost(message, repeated, tag="8")]
+    #[prost(message, repeated, tag = "8")]
     pub null_expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
-    #[prost(bool, repeated, tag="9")]
+    #[prost(bool, repeated, tag = "9")]
     pub groups: ::prost::alloc::vec::Vec<bool>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ShuffleWriterExecNode {
     /// TODO it seems redundant to provide job and stage id here since we also 
have them
     /// in the TaskDefinition that wraps this plan
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
-    #[prost(uint32, tag="2")]
+    #[prost(uint32, tag = "2")]
     pub stage_id: u32,
-    #[prost(message, optional, boxed, tag="3")]
+    #[prost(message, optional, boxed, tag = "3")]
     pub input: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
-    #[prost(message, optional, tag="4")]
+    #[prost(message, optional, tag = "4")]
     pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ShuffleReaderExecNode {
-    #[prost(message, repeated, tag="1")]
+    #[prost(message, repeated, tag = "1")]
     pub partition: ::prost::alloc::vec::Vec<ShuffleReaderPartition>,
-    #[prost(message, optional, tag="2")]
+    #[prost(message, optional, tag = "2")]
     pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ShuffleReaderPartition {
     /// each partition of a shuffle read can read data from multiple locations
-    #[prost(message, repeated, tag="1")]
+    #[prost(message, repeated, tag = "1")]
     pub location: ::prost::alloc::vec::Vec<PartitionLocation>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GlobalLimitExecNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub input: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
     /// The number of rows to skip before fetch
-    #[prost(uint32, tag="2")]
+    #[prost(uint32, tag = "2")]
     pub skip: u32,
     /// Maximum number of rows to fetch; negative means no limit
-    #[prost(int64, tag="3")]
+    #[prost(int64, tag = "3")]
     pub fetch: i64,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct LocalLimitExecNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub input: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
-    #[prost(uint32, tag="2")]
+    #[prost(uint32, tag = "2")]
     pub fetch: u32,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SortExecNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub input: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
     /// Maximum number of highest/lowest rows to fetch; negative means no limit
-    #[prost(int64, tag="3")]
+    #[prost(int64, tag = "3")]
     pub fetch: i64,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SortPreservingMergeExecNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub input: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CoalesceBatchesExecNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub input: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
-    #[prost(uint32, tag="2")]
+    #[prost(uint32, tag = "2")]
     pub target_batch_size: u32,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CoalescePartitionsExecNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub input: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalHashRepartition {
-    #[prost(message, repeated, tag="1")]
+    #[prost(message, repeated, tag = "1")]
     pub hash_expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
-    #[prost(uint64, tag="2")]
+    #[prost(uint64, tag = "2")]
     pub partition_count: u64,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RepartitionExecNode {
-    #[prost(message, optional, boxed, tag="1")]
+    #[prost(message, optional, boxed, tag = "1")]
     pub input: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
-    #[prost(oneof="repartition_exec_node::PartitionMethod", tags="2, 3, 4")]
+    #[prost(oneof = "repartition_exec_node::PartitionMethod", tags = "2, 3, 
4")]
     pub partition_method: 
::core::option::Option<repartition_exec_node::PartitionMethod>,
 }
 /// Nested message and enum types in `RepartitionExecNode`.
 pub mod repartition_exec_node {
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum PartitionMethod {
-        #[prost(uint64, tag="2")]
+        #[prost(uint64, tag = "2")]
         RoundRobin(u64),
-        #[prost(message, tag="3")]
+        #[prost(message, tag = "3")]
         Hash(super::PhysicalHashRepartition),
-        #[prost(uint64, tag="4")]
+        #[prost(uint64, tag = "4")]
         Unknown(u64),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct JoinFilter {
-    #[prost(message, optional, tag="1")]
+    #[prost(message, optional, tag = "1")]
     pub expression: ::core::option::Option<PhysicalExprNode>,
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub column_indices: ::prost::alloc::vec::Vec<ColumnIndex>,
-    #[prost(message, optional, tag="3")]
+    #[prost(message, optional, tag = "3")]
     pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ColumnIndex {
-    #[prost(uint32, tag="1")]
+    #[prost(uint32, tag = "1")]
     pub index: u32,
-    #[prost(enumeration="JoinSide", tag="2")]
+    #[prost(enumeration = "JoinSide", tag = "2")]
     pub side: i32,
 }
 /// 
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -581,202 +597,206 @@ pub struct ColumnIndex {
 /// 
/////////////////////////////////////////////////////////////////////////////////////////////////
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutionGraph {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
-    #[prost(string, tag="2")]
+    #[prost(string, tag = "2")]
     pub session_id: ::prost::alloc::string::String,
-    #[prost(message, optional, tag="3")]
+    #[prost(message, optional, tag = "3")]
     pub status: ::core::option::Option<JobStatus>,
-    #[prost(message, repeated, tag="4")]
+    #[prost(message, repeated, tag = "4")]
     pub stages: ::prost::alloc::vec::Vec<ExecutionGraphStage>,
-    #[prost(uint64, tag="5")]
+    #[prost(uint64, tag = "5")]
     pub output_partitions: u64,
-    #[prost(message, repeated, tag="6")]
+    #[prost(message, repeated, tag = "6")]
     pub output_locations: ::prost::alloc::vec::Vec<PartitionLocation>,
-    #[prost(string, tag="7")]
+    #[prost(string, tag = "7")]
     pub scheduler_id: ::prost::alloc::string::String,
-    #[prost(uint32, tag="8")]
+    #[prost(uint32, tag = "8")]
     pub task_id_gen: u32,
-    #[prost(message, repeated, tag="9")]
+    #[prost(message, repeated, tag = "9")]
     pub failed_attempts: ::prost::alloc::vec::Vec<StageAttempts>,
-    #[prost(string, tag="10")]
+    #[prost(string, tag = "10")]
     pub job_name: ::prost::alloc::string::String,
-    #[prost(uint64, tag="11")]
+    #[prost(uint64, tag = "11")]
     pub start_time: u64,
-    #[prost(uint64, tag="12")]
+    #[prost(uint64, tag = "12")]
     pub end_time: u64,
-    #[prost(uint64, tag="13")]
+    #[prost(uint64, tag = "13")]
     pub queued_at: u64,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct StageAttempts {
-    #[prost(uint32, tag="1")]
+    #[prost(uint32, tag = "1")]
     pub stage_id: u32,
-    #[prost(uint32, repeated, tag="2")]
+    #[prost(uint32, repeated, tag = "2")]
     pub stage_attempt_num: ::prost::alloc::vec::Vec<u32>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutionGraphStage {
-    #[prost(oneof="execution_graph_stage::StageType", tags="1, 2, 3, 4")]
+    #[prost(oneof = "execution_graph_stage::StageType", tags = "1, 2, 3, 4")]
     pub stage_type: ::core::option::Option<execution_graph_stage::StageType>,
 }
 /// Nested message and enum types in `ExecutionGraphStage`.
 pub mod execution_graph_stage {
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum StageType {
-        #[prost(message, tag="1")]
+        #[prost(message, tag = "1")]
         UnresolvedStage(super::UnResolvedStage),
-        #[prost(message, tag="2")]
+        #[prost(message, tag = "2")]
         ResolvedStage(super::ResolvedStage),
-        #[prost(message, tag="3")]
+        #[prost(message, tag = "3")]
         SuccessfulStage(super::SuccessfulStage),
-        #[prost(message, tag="4")]
+        #[prost(message, tag = "4")]
         FailedStage(super::FailedStage),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct UnResolvedStage {
-    #[prost(uint32, tag="1")]
+    #[prost(uint32, tag = "1")]
     pub stage_id: u32,
-    #[prost(message, optional, tag="2")]
+    #[prost(message, optional, tag = "2")]
     pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
-    #[prost(uint32, repeated, tag="3")]
+    #[prost(uint32, repeated, tag = "3")]
     pub output_links: ::prost::alloc::vec::Vec<u32>,
-    #[prost(message, repeated, tag="4")]
+    #[prost(message, repeated, tag = "4")]
     pub inputs: ::prost::alloc::vec::Vec<GraphStageInput>,
-    #[prost(bytes="vec", tag="5")]
+    #[prost(bytes = "vec", tag = "5")]
     pub plan: ::prost::alloc::vec::Vec<u8>,
-    #[prost(uint32, tag="6")]
+    #[prost(uint32, tag = "6")]
     pub stage_attempt_num: u32,
-    #[prost(string, repeated, tag="7")]
-    pub last_attempt_failure_reasons: 
::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+    #[prost(string, repeated, tag = "7")]
+    pub last_attempt_failure_reasons: ::prost::alloc::vec::Vec<
+        ::prost::alloc::string::String,
+    >,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ResolvedStage {
-    #[prost(uint32, tag="1")]
+    #[prost(uint32, tag = "1")]
     pub stage_id: u32,
-    #[prost(uint32, tag="2")]
+    #[prost(uint32, tag = "2")]
     pub partitions: u32,
-    #[prost(message, optional, tag="3")]
+    #[prost(message, optional, tag = "3")]
     pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
-    #[prost(uint32, repeated, tag="4")]
+    #[prost(uint32, repeated, tag = "4")]
     pub output_links: ::prost::alloc::vec::Vec<u32>,
-    #[prost(message, repeated, tag="5")]
+    #[prost(message, repeated, tag = "5")]
     pub inputs: ::prost::alloc::vec::Vec<GraphStageInput>,
-    #[prost(bytes="vec", tag="6")]
+    #[prost(bytes = "vec", tag = "6")]
     pub plan: ::prost::alloc::vec::Vec<u8>,
-    #[prost(uint32, tag="7")]
+    #[prost(uint32, tag = "7")]
     pub stage_attempt_num: u32,
-    #[prost(string, repeated, tag="8")]
-    pub last_attempt_failure_reasons: 
::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+    #[prost(string, repeated, tag = "8")]
+    pub last_attempt_failure_reasons: ::prost::alloc::vec::Vec<
+        ::prost::alloc::string::String,
+    >,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SuccessfulStage {
-    #[prost(uint32, tag="1")]
+    #[prost(uint32, tag = "1")]
     pub stage_id: u32,
-    #[prost(uint32, tag="2")]
+    #[prost(uint32, tag = "2")]
     pub partitions: u32,
-    #[prost(message, optional, tag="3")]
+    #[prost(message, optional, tag = "3")]
     pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
-    #[prost(uint32, repeated, tag="4")]
+    #[prost(uint32, repeated, tag = "4")]
     pub output_links: ::prost::alloc::vec::Vec<u32>,
-    #[prost(message, repeated, tag="5")]
+    #[prost(message, repeated, tag = "5")]
     pub inputs: ::prost::alloc::vec::Vec<GraphStageInput>,
-    #[prost(bytes="vec", tag="6")]
+    #[prost(bytes = "vec", tag = "6")]
     pub plan: ::prost::alloc::vec::Vec<u8>,
-    #[prost(message, repeated, tag="7")]
+    #[prost(message, repeated, tag = "7")]
     pub task_infos: ::prost::alloc::vec::Vec<TaskInfo>,
-    #[prost(message, repeated, tag="8")]
+    #[prost(message, repeated, tag = "8")]
     pub stage_metrics: ::prost::alloc::vec::Vec<OperatorMetricsSet>,
-    #[prost(uint32, tag="9")]
+    #[prost(uint32, tag = "9")]
     pub stage_attempt_num: u32,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FailedStage {
-    #[prost(uint32, tag="1")]
+    #[prost(uint32, tag = "1")]
     pub stage_id: u32,
-    #[prost(uint32, tag="2")]
+    #[prost(uint32, tag = "2")]
     pub partitions: u32,
-    #[prost(message, optional, tag="3")]
+    #[prost(message, optional, tag = "3")]
     pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
-    #[prost(uint32, repeated, tag="4")]
+    #[prost(uint32, repeated, tag = "4")]
     pub output_links: ::prost::alloc::vec::Vec<u32>,
-    #[prost(bytes="vec", tag="5")]
+    #[prost(bytes = "vec", tag = "5")]
     pub plan: ::prost::alloc::vec::Vec<u8>,
-    #[prost(message, repeated, tag="6")]
+    #[prost(message, repeated, tag = "6")]
     pub task_infos: ::prost::alloc::vec::Vec<TaskInfo>,
-    #[prost(message, repeated, tag="7")]
+    #[prost(message, repeated, tag = "7")]
     pub stage_metrics: ::prost::alloc::vec::Vec<OperatorMetricsSet>,
-    #[prost(string, tag="8")]
+    #[prost(string, tag = "8")]
     pub error_message: ::prost::alloc::string::String,
-    #[prost(uint32, tag="9")]
+    #[prost(uint32, tag = "9")]
     pub stage_attempt_num: u32,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TaskInfo {
-    #[prost(uint32, tag="1")]
+    #[prost(uint32, tag = "1")]
     pub task_id: u32,
-    #[prost(uint32, tag="2")]
+    #[prost(uint32, tag = "2")]
     pub partition_id: u32,
     /// Scheduler schedule time
-    #[prost(uint64, tag="3")]
+    #[prost(uint64, tag = "3")]
     pub scheduled_time: u64,
     /// Scheduler launch time
-    #[prost(uint64, tag="4")]
+    #[prost(uint64, tag = "4")]
     pub launch_time: u64,
     /// The time the Executor start to run the task
-    #[prost(uint64, tag="5")]
+    #[prost(uint64, tag = "5")]
     pub start_exec_time: u64,
     /// The time the Executor finish the task
-    #[prost(uint64, tag="6")]
+    #[prost(uint64, tag = "6")]
     pub end_exec_time: u64,
     /// Scheduler side finish time
-    #[prost(uint64, tag="7")]
+    #[prost(uint64, tag = "7")]
     pub finish_time: u64,
-    #[prost(oneof="task_info::Status", tags="8, 9, 10")]
+    #[prost(oneof = "task_info::Status", tags = "8, 9, 10")]
     pub status: ::core::option::Option<task_info::Status>,
 }
 /// Nested message and enum types in `TaskInfo`.
 pub mod task_info {
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Status {
-        #[prost(message, tag="8")]
+        #[prost(message, tag = "8")]
         Running(super::RunningTask),
-        #[prost(message, tag="9")]
+        #[prost(message, tag = "9")]
         Failed(super::FailedTask),
-        #[prost(message, tag="10")]
+        #[prost(message, tag = "10")]
         Successful(super::SuccessfulTask),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GraphStageInput {
-    #[prost(uint32, tag="1")]
+    #[prost(uint32, tag = "1")]
     pub stage_id: u32,
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub partition_locations: ::prost::alloc::vec::Vec<TaskInputPartitions>,
-    #[prost(bool, tag="3")]
+    #[prost(bool, tag = "3")]
     pub complete: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TaskInputPartitions {
-    #[prost(uint32, tag="1")]
+    #[prost(uint32, tag = "1")]
     pub partition: u32,
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct KeyValuePair {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub key: ::prost::alloc::string::String,
-    #[prost(string, tag="2")]
+    #[prost(string, tag = "2")]
     pub value: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct Action {
     /// configuration settings
-    #[prost(message, repeated, tag="100")]
+    #[prost(message, repeated, tag = "100")]
     pub settings: ::prost::alloc::vec::Vec<KeyValuePair>,
-    #[prost(oneof="action::ActionType", tags="3")]
+    #[prost(oneof = "action::ActionType", tags = "3")]
     pub action_type: ::core::option::Option<action::ActionType>,
 }
 /// Nested message and enum types in `Action`.
@@ -784,183 +804,183 @@ pub mod action {
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum ActionType {
         /// Fetch a partition from an executor
-        #[prost(message, tag="3")]
+        #[prost(message, tag = "3")]
         FetchPartition(super::FetchPartition),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutePartition {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
-    #[prost(uint32, tag="2")]
+    #[prost(uint32, tag = "2")]
     pub stage_id: u32,
-    #[prost(uint32, repeated, tag="3")]
+    #[prost(uint32, repeated, tag = "3")]
     pub partition_id: ::prost::alloc::vec::Vec<u32>,
-    #[prost(message, optional, tag="4")]
+    #[prost(message, optional, tag = "4")]
     pub plan: ::core::option::Option<PhysicalPlanNode>,
     /// The task could need to read partitions from other executors
-    #[prost(message, repeated, tag="5")]
+    #[prost(message, repeated, tag = "5")]
     pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
     /// Output partition for shuffle writer
-    #[prost(message, optional, tag="6")]
+    #[prost(message, optional, tag = "6")]
     pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FetchPartition {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
-    #[prost(uint32, tag="2")]
+    #[prost(uint32, tag = "2")]
     pub stage_id: u32,
-    #[prost(uint32, tag="3")]
+    #[prost(uint32, tag = "3")]
     pub partition_id: u32,
-    #[prost(string, tag="4")]
+    #[prost(string, tag = "4")]
     pub path: ::prost::alloc::string::String,
-    #[prost(string, tag="5")]
+    #[prost(string, tag = "5")]
     pub host: ::prost::alloc::string::String,
-    #[prost(uint32, tag="6")]
+    #[prost(uint32, tag = "6")]
     pub port: u32,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PartitionLocation {
     /// partition_id of the map stage who produces the shuffle.
-    #[prost(uint32, tag="1")]
+    #[prost(uint32, tag = "1")]
     pub map_partition_id: u32,
     /// partition_id of the shuffle, a composition of(job_id + map_stage_id + 
partition_id).
-    #[prost(message, optional, tag="2")]
+    #[prost(message, optional, tag = "2")]
     pub partition_id: ::core::option::Option<PartitionId>,
-    #[prost(message, optional, tag="3")]
+    #[prost(message, optional, tag = "3")]
     pub executor_meta: ::core::option::Option<ExecutorMetadata>,
-    #[prost(message, optional, tag="4")]
+    #[prost(message, optional, tag = "4")]
     pub partition_stats: ::core::option::Option<PartitionStats>,
-    #[prost(string, tag="5")]
+    #[prost(string, tag = "5")]
     pub path: ::prost::alloc::string::String,
 }
 /// Unique identifier for a materialized partition of data
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PartitionId {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
-    #[prost(uint32, tag="2")]
+    #[prost(uint32, tag = "2")]
     pub stage_id: u32,
-    #[prost(uint32, tag="4")]
+    #[prost(uint32, tag = "4")]
     pub partition_id: u32,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TaskId {
-    #[prost(uint32, tag="1")]
+    #[prost(uint32, tag = "1")]
     pub task_id: u32,
-    #[prost(uint32, tag="2")]
+    #[prost(uint32, tag = "2")]
     pub task_attempt_num: u32,
-    #[prost(uint32, tag="3")]
+    #[prost(uint32, tag = "3")]
     pub partition_id: u32,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PartitionStats {
-    #[prost(int64, tag="1")]
+    #[prost(int64, tag = "1")]
     pub num_rows: i64,
-    #[prost(int64, tag="2")]
+    #[prost(int64, tag = "2")]
     pub num_batches: i64,
-    #[prost(int64, tag="3")]
+    #[prost(int64, tag = "3")]
     pub num_bytes: i64,
-    #[prost(message, repeated, tag="4")]
+    #[prost(message, repeated, tag = "4")]
     pub column_stats: ::prost::alloc::vec::Vec<ColumnStats>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ColumnStats {
-    #[prost(message, optional, tag="1")]
+    #[prost(message, optional, tag = "1")]
     pub min_value: 
::core::option::Option<::datafusion_proto::protobuf::ScalarValue>,
-    #[prost(message, optional, tag="2")]
+    #[prost(message, optional, tag = "2")]
     pub max_value: 
::core::option::Option<::datafusion_proto::protobuf::ScalarValue>,
-    #[prost(uint32, tag="3")]
+    #[prost(uint32, tag = "3")]
     pub null_count: u32,
-    #[prost(uint32, tag="4")]
+    #[prost(uint32, tag = "4")]
     pub distinct_count: u32,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct OperatorMetricsSet {
-    #[prost(message, repeated, tag="1")]
+    #[prost(message, repeated, tag = "1")]
     pub metrics: ::prost::alloc::vec::Vec<OperatorMetric>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct NamedCount {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub name: ::prost::alloc::string::String,
-    #[prost(uint64, tag="2")]
+    #[prost(uint64, tag = "2")]
     pub value: u64,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct NamedGauge {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub name: ::prost::alloc::string::String,
-    #[prost(uint64, tag="2")]
+    #[prost(uint64, tag = "2")]
     pub value: u64,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct NamedTime {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub name: ::prost::alloc::string::String,
-    #[prost(uint64, tag="2")]
+    #[prost(uint64, tag = "2")]
     pub value: u64,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct OperatorMetric {
-    #[prost(oneof="operator_metric::Metric", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 
10")]
+    #[prost(oneof = "operator_metric::Metric", tags = "1, 2, 3, 4, 5, 6, 7, 8, 
9, 10")]
     pub metric: ::core::option::Option<operator_metric::Metric>,
 }
 /// Nested message and enum types in `OperatorMetric`.
 pub mod operator_metric {
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Metric {
-        #[prost(uint64, tag="1")]
+        #[prost(uint64, tag = "1")]
         OutputRows(u64),
-        #[prost(uint64, tag="2")]
+        #[prost(uint64, tag = "2")]
         ElapseTime(u64),
-        #[prost(uint64, tag="3")]
+        #[prost(uint64, tag = "3")]
         SpillCount(u64),
-        #[prost(uint64, tag="4")]
+        #[prost(uint64, tag = "4")]
         SpilledBytes(u64),
-        #[prost(uint64, tag="5")]
+        #[prost(uint64, tag = "5")]
         CurrentMemoryUsage(u64),
-        #[prost(message, tag="6")]
+        #[prost(message, tag = "6")]
         Count(super::NamedCount),
-        #[prost(message, tag="7")]
+        #[prost(message, tag = "7")]
         Gauge(super::NamedGauge),
-        #[prost(message, tag="8")]
+        #[prost(message, tag = "8")]
         Time(super::NamedTime),
-        #[prost(int64, tag="9")]
+        #[prost(int64, tag = "9")]
         StartTimestamp(i64),
-        #[prost(int64, tag="10")]
+        #[prost(int64, tag = "10")]
         EndTimestamp(i64),
     }
 }
 /// Used by scheduler
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorMetadata {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub id: ::prost::alloc::string::String,
-    #[prost(string, tag="2")]
+    #[prost(string, tag = "2")]
     pub host: ::prost::alloc::string::String,
-    #[prost(uint32, tag="3")]
+    #[prost(uint32, tag = "3")]
     pub port: u32,
-    #[prost(uint32, tag="4")]
+    #[prost(uint32, tag = "4")]
     pub grpc_port: u32,
-    #[prost(message, optional, tag="5")]
+    #[prost(message, optional, tag = "5")]
     pub specification: ::core::option::Option<ExecutorSpecification>,
 }
 /// Used by grpc
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorRegistration {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub id: ::prost::alloc::string::String,
-    #[prost(uint32, tag="3")]
+    #[prost(uint32, tag = "3")]
     pub port: u32,
-    #[prost(uint32, tag="4")]
+    #[prost(uint32, tag = "4")]
     pub grpc_port: u32,
-    #[prost(message, optional, tag="5")]
+    #[prost(message, optional, tag = "5")]
     pub specification: ::core::option::Option<ExecutorSpecification>,
     /// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 
(see <https://github.com/tokio-rs/prost/issues/430> and 
<https://github.com/tokio-rs/prost/pull/455>)
     /// this syntax is ugly but is binary compatible with the "optional" 
keyword (see 
<https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3>)
-    #[prost(oneof="executor_registration::OptionalHost", tags="2")]
+    #[prost(oneof = "executor_registration::OptionalHost", tags = "2")]
     pub optional_host: 
::core::option::Option<executor_registration::OptionalHost>,
 }
 /// Nested message and enum types in `ExecutorRegistration`.
@@ -969,26 +989,26 @@ pub mod executor_registration {
     /// this syntax is ugly but is binary compatible with the "optional" 
keyword (see 
<https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3>)
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum OptionalHost {
-        #[prost(string, tag="2")]
+        #[prost(string, tag = "2")]
         Host(::prost::alloc::string::String),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorHeartbeat {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub executor_id: ::prost::alloc::string::String,
     /// Unix epoch-based timestamp in seconds
-    #[prost(uint64, tag="2")]
+    #[prost(uint64, tag = "2")]
     pub timestamp: u64,
-    #[prost(message, repeated, tag="3")]
+    #[prost(message, repeated, tag = "3")]
     pub metrics: ::prost::alloc::vec::Vec<ExecutorMetric>,
-    #[prost(message, optional, tag="4")]
+    #[prost(message, optional, tag = "4")]
     pub status: ::core::option::Option<ExecutorStatus>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorMetric {
     /// TODO add more metrics
-    #[prost(oneof="executor_metric::Metric", tags="1")]
+    #[prost(oneof = "executor_metric::Metric", tags = "1")]
     pub metric: ::core::option::Option<executor_metric::Metric>,
 }
 /// Nested message and enum types in `ExecutorMetric`.
@@ -996,36 +1016,36 @@ pub mod executor_metric {
     /// TODO add more metrics
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Metric {
-        #[prost(uint64, tag="1")]
+        #[prost(uint64, tag = "1")]
         AvailableMemory(u64),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorStatus {
-    #[prost(oneof="executor_status::Status", tags="1, 2, 3")]
+    #[prost(oneof = "executor_status::Status", tags = "1, 2, 3")]
     pub status: ::core::option::Option<executor_status::Status>,
 }
 /// Nested message and enum types in `ExecutorStatus`.
 pub mod executor_status {
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Status {
-        #[prost(string, tag="1")]
+        #[prost(string, tag = "1")]
         Active(::prost::alloc::string::String),
-        #[prost(string, tag="2")]
+        #[prost(string, tag = "2")]
         Dead(::prost::alloc::string::String),
-        #[prost(string, tag="3")]
+        #[prost(string, tag = "3")]
         Unknown(::prost::alloc::string::String),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorSpecification {
-    #[prost(message, repeated, tag="1")]
+    #[prost(message, repeated, tag = "1")]
     pub resources: ::prost::alloc::vec::Vec<ExecutorResource>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorResource {
     /// TODO add more resources
-    #[prost(oneof="executor_resource::Resource", tags="1")]
+    #[prost(oneof = "executor_resource::Resource", tags = "1")]
     pub resource: ::core::option::Option<executor_resource::Resource>,
 }
 /// Nested message and enum types in `ExecutorResource`.
@@ -1033,452 +1053,443 @@ pub mod executor_resource {
     /// TODO add more resources
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Resource {
-        #[prost(uint32, tag="1")]
+        #[prost(uint32, tag = "1")]
         TaskSlots(u32),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorData {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub executor_id: ::prost::alloc::string::String,
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub resources: ::prost::alloc::vec::Vec<ExecutorResourcePair>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorResourcePair {
-    #[prost(message, optional, tag="1")]
+    #[prost(message, optional, tag = "1")]
     pub total: ::core::option::Option<ExecutorResource>,
-    #[prost(message, optional, tag="2")]
+    #[prost(message, optional, tag = "2")]
     pub available: ::core::option::Option<ExecutorResource>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RunningTask {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub executor_id: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FailedTask {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub error: ::prost::alloc::string::String,
-    #[prost(bool, tag="2")]
+    #[prost(bool, tag = "2")]
     pub retryable: bool,
     /// Whether this task failure should be counted to the maximum number of 
times the task is allowed to retry
-    #[prost(bool, tag="3")]
+    #[prost(bool, tag = "3")]
     pub count_to_failures: bool,
-    #[prost(oneof="failed_task::FailedReason", tags="4, 5, 6, 7, 8, 9")]
+    #[prost(oneof = "failed_task::FailedReason", tags = "4, 5, 6, 7, 8, 9")]
     pub failed_reason: ::core::option::Option<failed_task::FailedReason>,
 }
 /// Nested message and enum types in `FailedTask`.
 pub mod failed_task {
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum FailedReason {
-        #[prost(message, tag="4")]
+        #[prost(message, tag = "4")]
         ExecutionError(super::ExecutionError),
-        #[prost(message, tag="5")]
+        #[prost(message, tag = "5")]
         FetchPartitionError(super::FetchPartitionError),
-        #[prost(message, tag="6")]
+        #[prost(message, tag = "6")]
         IoError(super::IoError),
-        #[prost(message, tag="7")]
+        #[prost(message, tag = "7")]
         ExecutorLost(super::ExecutorLost),
         /// A successful task's result is lost due to executor lost
-        #[prost(message, tag="8")]
+        #[prost(message, tag = "8")]
         ResultLost(super::ResultLost),
-        #[prost(message, tag="9")]
+        #[prost(message, tag = "9")]
         TaskKilled(super::TaskKilled),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SuccessfulTask {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub executor_id: ::prost::alloc::string::String,
     /// TODO tasks are currently always shuffle writes but this will not 
always be the case
     /// so we might want to think about some refactoring of the task 
definitions
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub partitions: ::prost::alloc::vec::Vec<ShuffleWritePartition>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct ExecutionError {
-}
+pub struct ExecutionError {}
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FetchPartitionError {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub executor_id: ::prost::alloc::string::String,
-    #[prost(uint32, tag="2")]
+    #[prost(uint32, tag = "2")]
     pub map_stage_id: u32,
-    #[prost(uint32, tag="3")]
+    #[prost(uint32, tag = "3")]
     pub map_partition_id: u32,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct IoError {
-}
+pub struct IoError {}
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct ExecutorLost {
-}
+pub struct ExecutorLost {}
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct ResultLost {
-}
+pub struct ResultLost {}
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct TaskKilled {
-}
+pub struct TaskKilled {}
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ShuffleWritePartition {
-    #[prost(uint64, tag="1")]
+    #[prost(uint64, tag = "1")]
     pub partition_id: u64,
-    #[prost(string, tag="2")]
+    #[prost(string, tag = "2")]
     pub path: ::prost::alloc::string::String,
-    #[prost(uint64, tag="3")]
+    #[prost(uint64, tag = "3")]
     pub num_batches: u64,
-    #[prost(uint64, tag="4")]
+    #[prost(uint64, tag = "4")]
     pub num_rows: u64,
-    #[prost(uint64, tag="5")]
+    #[prost(uint64, tag = "5")]
     pub num_bytes: u64,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TaskStatus {
-    #[prost(uint32, tag="1")]
+    #[prost(uint32, tag = "1")]
     pub task_id: u32,
-    #[prost(string, tag="2")]
+    #[prost(string, tag = "2")]
     pub job_id: ::prost::alloc::string::String,
-    #[prost(uint32, tag="3")]
+    #[prost(uint32, tag = "3")]
     pub stage_id: u32,
-    #[prost(uint32, tag="4")]
+    #[prost(uint32, tag = "4")]
     pub stage_attempt_num: u32,
-    #[prost(uint32, tag="5")]
+    #[prost(uint32, tag = "5")]
     pub partition_id: u32,
-    #[prost(uint64, tag="6")]
+    #[prost(uint64, tag = "6")]
     pub launch_time: u64,
-    #[prost(uint64, tag="7")]
+    #[prost(uint64, tag = "7")]
     pub start_exec_time: u64,
-    #[prost(uint64, tag="8")]
+    #[prost(uint64, tag = "8")]
     pub end_exec_time: u64,
-    #[prost(message, repeated, tag="12")]
+    #[prost(message, repeated, tag = "12")]
     pub metrics: ::prost::alloc::vec::Vec<OperatorMetricsSet>,
-    #[prost(oneof="task_status::Status", tags="9, 10, 11")]
+    #[prost(oneof = "task_status::Status", tags = "9, 10, 11")]
     pub status: ::core::option::Option<task_status::Status>,
 }
 /// Nested message and enum types in `TaskStatus`.
 pub mod task_status {
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Status {
-        #[prost(message, tag="9")]
+        #[prost(message, tag = "9")]
         Running(super::RunningTask),
-        #[prost(message, tag="10")]
+        #[prost(message, tag = "10")]
         Failed(super::FailedTask),
-        #[prost(message, tag="11")]
+        #[prost(message, tag = "11")]
         Successful(super::SuccessfulTask),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PollWorkParams {
-    #[prost(message, optional, tag="1")]
+    #[prost(message, optional, tag = "1")]
     pub metadata: ::core::option::Option<ExecutorRegistration>,
-    #[prost(uint32, tag="2")]
+    #[prost(uint32, tag = "2")]
     pub num_free_slots: u32,
     /// All tasks must be reported until they reach the failed or completed 
state
-    #[prost(message, repeated, tag="3")]
+    #[prost(message, repeated, tag = "3")]
     pub task_status: ::prost::alloc::vec::Vec<TaskStatus>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TaskDefinition {
-    #[prost(uint32, tag="1")]
+    #[prost(uint32, tag = "1")]
     pub task_id: u32,
-    #[prost(uint32, tag="2")]
+    #[prost(uint32, tag = "2")]
     pub task_attempt_num: u32,
-    #[prost(string, tag="3")]
+    #[prost(string, tag = "3")]
     pub job_id: ::prost::alloc::string::String,
-    #[prost(uint32, tag="4")]
+    #[prost(uint32, tag = "4")]
     pub stage_id: u32,
-    #[prost(uint32, tag="5")]
+    #[prost(uint32, tag = "5")]
     pub stage_attempt_num: u32,
-    #[prost(uint32, tag="6")]
+    #[prost(uint32, tag = "6")]
     pub partition_id: u32,
-    #[prost(bytes="vec", tag="7")]
+    #[prost(bytes = "vec", tag = "7")]
     pub plan: ::prost::alloc::vec::Vec<u8>,
     /// Output partition for shuffle writer
-    #[prost(message, optional, tag="8")]
+    #[prost(message, optional, tag = "8")]
     pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
-    #[prost(string, tag="9")]
+    #[prost(string, tag = "9")]
     pub session_id: ::prost::alloc::string::String,
-    #[prost(uint64, tag="10")]
+    #[prost(uint64, tag = "10")]
     pub launch_time: u64,
-    #[prost(message, repeated, tag="11")]
+    #[prost(message, repeated, tag = "11")]
     pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
 }
 /// A set of tasks in the same stage
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct MultiTaskDefinition {
-    #[prost(message, repeated, tag="1")]
+    #[prost(message, repeated, tag = "1")]
     pub task_ids: ::prost::alloc::vec::Vec<TaskId>,
-    #[prost(string, tag="2")]
+    #[prost(string, tag = "2")]
     pub job_id: ::prost::alloc::string::String,
-    #[prost(uint32, tag="3")]
+    #[prost(uint32, tag = "3")]
     pub stage_id: u32,
-    #[prost(uint32, tag="4")]
+    #[prost(uint32, tag = "4")]
     pub stage_attempt_num: u32,
-    #[prost(bytes="vec", tag="5")]
+    #[prost(bytes = "vec", tag = "5")]
     pub plan: ::prost::alloc::vec::Vec<u8>,
     /// Output partition for shuffle writer
-    #[prost(message, optional, tag="6")]
+    #[prost(message, optional, tag = "6")]
     pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
-    #[prost(string, tag="7")]
+    #[prost(string, tag = "7")]
     pub session_id: ::prost::alloc::string::String,
-    #[prost(uint64, tag="8")]
+    #[prost(uint64, tag = "8")]
     pub launch_time: u64,
-    #[prost(message, repeated, tag="9")]
+    #[prost(message, repeated, tag = "9")]
     pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SessionSettings {
-    #[prost(message, repeated, tag="1")]
+    #[prost(message, repeated, tag = "1")]
     pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct JobSessionConfig {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub session_id: ::prost::alloc::string::String,
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PollWorkResult {
-    #[prost(message, repeated, tag="1")]
+    #[prost(message, repeated, tag = "1")]
     pub tasks: ::prost::alloc::vec::Vec<TaskDefinition>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RegisterExecutorParams {
-    #[prost(message, optional, tag="1")]
+    #[prost(message, optional, tag = "1")]
     pub metadata: ::core::option::Option<ExecutorRegistration>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RegisterExecutorResult {
-    #[prost(bool, tag="1")]
+    #[prost(bool, tag = "1")]
     pub success: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct HeartBeatParams {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub executor_id: ::prost::alloc::string::String,
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub metrics: ::prost::alloc::vec::Vec<ExecutorMetric>,
-    #[prost(message, optional, tag="3")]
+    #[prost(message, optional, tag = "3")]
     pub status: ::core::option::Option<ExecutorStatus>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct HeartBeatResult {
     /// TODO it's from Spark for BlockManager
-    #[prost(bool, tag="1")]
+    #[prost(bool, tag = "1")]
     pub reregister: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct StopExecutorParams {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub executor_id: ::prost::alloc::string::String,
     /// stop reason
-    #[prost(string, tag="2")]
+    #[prost(string, tag = "2")]
     pub reason: ::prost::alloc::string::String,
     /// force to stop the executor immediately
-    #[prost(bool, tag="3")]
+    #[prost(bool, tag = "3")]
     pub force: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct StopExecutorResult {
-}
+pub struct StopExecutorResult {}
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorStoppedParams {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub executor_id: ::prost::alloc::string::String,
     /// stop reason
-    #[prost(string, tag="2")]
+    #[prost(string, tag = "2")]
     pub reason: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct ExecutorStoppedResult {
-}
+pub struct ExecutorStoppedResult {}
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct UpdateTaskStatusParams {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub executor_id: ::prost::alloc::string::String,
     /// All tasks must be reported until they reach the failed or completed 
state
-    #[prost(message, repeated, tag="2")]
+    #[prost(message, repeated, tag = "2")]
     pub task_status: ::prost::alloc::vec::Vec<TaskStatus>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct UpdateTaskStatusResult {
-    #[prost(bool, tag="1")]
+    #[prost(bool, tag = "1")]
     pub success: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecuteQueryParams {
-    #[prost(message, repeated, tag="4")]
+    #[prost(message, repeated, tag = "4")]
     pub settings: ::prost::alloc::vec::Vec<KeyValuePair>,
-    #[prost(oneof="execute_query_params::Query", tags="1, 2")]
+    #[prost(oneof = "execute_query_params::Query", tags = "1, 2")]
     pub query: ::core::option::Option<execute_query_params::Query>,
-    #[prost(oneof="execute_query_params::OptionalSessionId", tags="3")]
-    pub optional_session_id: 
::core::option::Option<execute_query_params::OptionalSessionId>,
+    #[prost(oneof = "execute_query_params::OptionalSessionId", tags = "3")]
+    pub optional_session_id: ::core::option::Option<
+        execute_query_params::OptionalSessionId,
+    >,
 }
 /// Nested message and enum types in `ExecuteQueryParams`.
 pub mod execute_query_params {
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Query {
-        #[prost(bytes, tag="1")]
+        #[prost(bytes, tag = "1")]
         LogicalPlan(::prost::alloc::vec::Vec<u8>),
-        #[prost(string, tag="2")]
+        #[prost(string, tag = "2")]
         Sql(::prost::alloc::string::String),
     }
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum OptionalSessionId {
-        #[prost(string, tag="3")]
+        #[prost(string, tag = "3")]
         SessionId(::prost::alloc::string::String),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecuteSqlParams {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub sql: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecuteQueryResult {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
-    #[prost(string, tag="2")]
+    #[prost(string, tag = "2")]
     pub session_id: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GetJobStatusParams {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SuccessfulJob {
-    #[prost(message, repeated, tag="1")]
+    #[prost(message, repeated, tag = "1")]
     pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct QueuedJob {
-}
+pub struct QueuedJob {}
 /// TODO: add progress report
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct RunningJob {
-}
+pub struct RunningJob {}
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FailedJob {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub error: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct JobStatus {
-    #[prost(oneof="job_status::Status", tags="1, 2, 3, 4")]
+    #[prost(oneof = "job_status::Status", tags = "1, 2, 3, 4")]
     pub status: ::core::option::Option<job_status::Status>,
 }
 /// Nested message and enum types in `JobStatus`.
 pub mod job_status {
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Status {
-        #[prost(message, tag="1")]
+        #[prost(message, tag = "1")]
         Queued(super::QueuedJob),
-        #[prost(message, tag="2")]
+        #[prost(message, tag = "2")]
         Running(super::RunningJob),
-        #[prost(message, tag="3")]
+        #[prost(message, tag = "3")]
         Failed(super::FailedJob),
-        #[prost(message, tag="4")]
+        #[prost(message, tag = "4")]
         Successful(super::SuccessfulJob),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GetJobStatusResult {
-    #[prost(message, optional, tag="1")]
+    #[prost(message, optional, tag = "1")]
     pub status: ::core::option::Option<JobStatus>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GetFileMetadataParams {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub path: ::prost::alloc::string::String,
-    #[prost(string, tag="2")]
+    #[prost(string, tag = "2")]
     pub file_type: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GetFileMetadataResult {
-    #[prost(message, optional, tag="1")]
+    #[prost(message, optional, tag = "1")]
     pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FilePartitionMetadata {
-    #[prost(string, repeated, tag="1")]
+    #[prost(string, repeated, tag = "1")]
     pub filename: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CancelJobParams {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CancelJobResult {
-    #[prost(bool, tag="1")]
+    #[prost(bool, tag = "1")]
     pub cancelled: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CleanJobDataParams {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct CleanJobDataResult {
-}
+pub struct CleanJobDataResult {}
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct LaunchTaskParams {
     /// Allow to launch a task set to an executor at once
-    #[prost(message, repeated, tag="1")]
+    #[prost(message, repeated, tag = "1")]
     pub tasks: ::prost::alloc::vec::Vec<TaskDefinition>,
-    #[prost(string, tag="2")]
+    #[prost(string, tag = "2")]
     pub scheduler_id: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct LaunchMultiTaskParams {
     /// Allow to launch a task set to an executor at once
-    #[prost(message, repeated, tag="1")]
+    #[prost(message, repeated, tag = "1")]
     pub multi_tasks: ::prost::alloc::vec::Vec<MultiTaskDefinition>,
-    #[prost(string, tag="2")]
+    #[prost(string, tag = "2")]
     pub scheduler_id: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct LaunchTaskResult {
     /// TODO when part of the task set are scheduled successfully
-    #[prost(bool, tag="1")]
+    #[prost(bool, tag = "1")]
     pub success: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct LaunchMultiTaskResult {
     /// TODO when part of the task set are scheduled successfully
-    #[prost(bool, tag="1")]
+    #[prost(bool, tag = "1")]
     pub success: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CancelTasksParams {
-    #[prost(message, repeated, tag="1")]
+    #[prost(message, repeated, tag = "1")]
     pub task_infos: ::prost::alloc::vec::Vec<RunningTaskInfo>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CancelTasksResult {
-    #[prost(bool, tag="1")]
+    #[prost(bool, tag = "1")]
     pub cancelled: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RemoveJobDataParams {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct RemoveJobDataResult {
-}
+pub struct RemoveJobDataResult {}
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RunningTaskInfo {
-    #[prost(uint32, tag="1")]
+    #[prost(uint32, tag = "1")]
     pub task_id: u32,
-    #[prost(string, tag="2")]
+    #[prost(string, tag = "2")]
     pub job_id: ::prost::alloc::string::String,
-    #[prost(uint32, tag="3")]
+    #[prost(uint32, tag = "3")]
     pub stage_id: u32,
-    #[prost(uint32, tag="4")]
+    #[prost(uint32, tag = "4")]
     pub partition_id: u32,
 }
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, 
::prost::Enumeration)]
@@ -1486,6 +1497,7 @@ pub struct RunningTaskInfo {
 pub enum PartitionMode {
     CollectLeft = 0,
     Partitioned = 1,
+    Auto = 2,
 }
 impl PartitionMode {
     /// String value of the enum field names used in the ProtoBuf definition.
@@ -1496,6 +1508,7 @@ impl PartitionMode {
         match self {
             PartitionMode::CollectLeft => "COLLECT_LEFT",
             PartitionMode::Partitioned => "PARTITIONED",
+            PartitionMode::Auto => "AUTO",
         }
     }
 }
@@ -1972,7 +1985,7 @@ pub mod executor_grpc_client {
 pub mod scheduler_grpc_server {
     #![allow(unused_variables, dead_code, missing_docs, 
clippy::let_unit_value)]
     use tonic::codegen::*;
-    ///Generated trait containing gRPC methods that should be implemented for 
use with SchedulerGrpcServer.
+    /// Generated trait containing gRPC methods that should be implemented for 
use with SchedulerGrpcServer.
     #[async_trait]
     pub trait SchedulerGrpc: Send + Sync + 'static {
         /// Executors must poll the scheduler for heartbeat and to receive 
tasks
@@ -2518,7 +2531,7 @@ pub mod scheduler_grpc_server {
 pub mod executor_grpc_server {
     #![allow(unused_variables, dead_code, missing_docs, 
clippy::let_unit_value)]
     use tonic::codegen::*;
-    ///Generated trait containing gRPC methods that should be implemented for 
use with ExecutorGrpcServer.
+    /// Generated trait containing gRPC methods that should be implemented for 
use with ExecutorGrpcServer.
     #[async_trait]
     pub trait ExecutorGrpc: Send + Sync + 'static {
         async fn launch_task(
diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs
index 04043822..b1b2ab38 100644
--- a/ballista/core/src/serde/mod.rs
+++ b/ballista/core/src/serde/mod.rs
@@ -392,8 +392,8 @@ mod tests {
             None
         }
 
-        fn required_child_distribution(&self) -> Distribution {
-            Distribution::SinglePartition
+        fn required_input_distribution(&self) -> Vec<Distribution> {
+            vec![Distribution::SinglePartition]
         }
 
         fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/ballista/core/src/serde/physical_plan/from_proto.rs 
b/ballista/core/src/serde/physical_plan/from_proto.rs
index 693037bd..eb951c76 100644
--- a/ballista/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/core/src/serde/physical_plan/from_proto.rs
@@ -411,6 +411,8 @@ impl TryInto<FileScanConfig> for 
&protobuf::FileScanExecConf {
             projection,
             limit: self.limit.as_ref().map(|sl| sl.limit as usize),
             table_partition_cols: vec![],
+            // TODO add ordering info to the ballista proto file
+            output_ordering: None,
         })
     }
 }
diff --git a/ballista/core/src/serde/physical_plan/mod.rs 
b/ballista/core/src/serde/physical_plan/mod.rs
index 5860f570..cf7e5497 100644
--- a/ballista/core/src/serde/physical_plan/mod.rs
+++ b/ballista/core/src/serde/physical_plan/mod.rs
@@ -331,6 +331,8 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     physical_window_expr,
                     input,
                     Arc::new((&input_schema).try_into()?),
+                    vec![],
+                    None,
                 )?))
             }
             PhysicalPlanType::Aggregate(hash_agg) => {
@@ -528,6 +530,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 let partition_mode = match partition_mode {
                     protobuf::PartitionMode::CollectLeft => 
PartitionMode::CollectLeft,
                     protobuf::PartitionMode::Partitioned => 
PartitionMode::Partitioned,
+                    protobuf::PartitionMode::Auto => PartitionMode::Auto,
                 };
                 Ok(Arc::new(HashJoinExec::try_new(
                     left,
@@ -792,7 +795,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                         input: Some(Box::new(input)),
                         skip: limit.skip() as u32,
                         fetch: match limit.fetch() {
-                            Some(n) => *n as i64,
+                            Some(n) => n as i64,
                             _ => -1, // no limit
                         },
                     },
@@ -867,6 +870,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
             let partition_mode = match exec.partition_mode() {
                 PartitionMode::CollectLeft => 
protobuf::PartitionMode::CollectLeft,
                 PartitionMode::Partitioned => 
protobuf::PartitionMode::Partitioned,
+                PartitionMode::Auto => protobuf::PartitionMode::Auto,
             };
 
             Ok(protobuf::PhysicalPlanNode {
@@ -1272,6 +1276,7 @@ fn decode_scan_config(
         projection,
         limit: proto.limit.as_ref().map(|sl| sl.limit as usize),
         table_partition_cols: vec![],
+        output_ordering: None,
     })
 }
 
@@ -1599,6 +1604,7 @@ mod roundtrip_tests {
             projection: None,
             limit: None,
             table_partition_cols: vec![],
+            output_ordering: None,
         };
 
         let predicate = 
datafusion::prelude::col("col").eq(datafusion::prelude::lit("1"));
diff --git a/ballista/core/src/serde/physical_plan/to_proto.rs 
b/ballista/core/src/serde/physical_plan/to_proto.rs
index fbf8e5e7..c303ba45 100644
--- a/ballista/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/core/src/serde/physical_plan/to_proto.rs
@@ -457,7 +457,11 @@ impl TryFrom<&FileScanConfig> for 
protobuf::FileScanExecConf {
                 .map(|n| *n as u32)
                 .collect(),
             schema: Some(conf.file_schema.as_ref().try_into()?),
-            table_partition_cols: conf.table_partition_cols.to_vec(),
+            table_partition_cols: conf
+                .table_partition_cols
+                .iter()
+                .map(|col| col.0.to_owned())
+                .collect(),
             object_store_url: conf.object_store_url.to_string(),
         })
     }
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index bde421a8..d43a95b2 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -35,15 +35,15 @@ default = ["mimalloc"]
 
 [dependencies]
 anyhow = "1"
-arrow = { version = "26.0.0" }
-arrow-flight = { version = "26.0.0" }
+arrow = { version = "28.0.0" }
+arrow-flight = { version = "28.0.0" }
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.10.0" }
 chrono = { version = "0.4", default-features = false }
 configure_me = "0.4.0"
 dashmap = "5.4.0"
-datafusion = "14.0.0"
-datafusion-proto = "14.0.0"
+datafusion = "15.0.0"
+datafusion-proto = "15.0.0"
 futures = "0.3"
 hyper = "0.14.4"
 log = "0.4"
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 69242f53..885dd2ff 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -39,7 +39,7 @@ sled = ["sled_package", "tokio-stream"]
 
 [dependencies]
 anyhow = "1"
-arrow-flight = { version = "26.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "28.0.0", features = ["flight-sql-experimental"] }
 async-recursion = "1.0.0"
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.10.0" }
@@ -47,8 +47,8 @@ base64 = { version = "0.13", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = "0.4.0"
 dashmap = "5.4.0"
-datafusion = "14.0.0"
-datafusion-proto = "14.0.0"
+datafusion = "15.0.0"
+datafusion-proto = "15.0.0"
 etcd-client = { version = "0.10", optional = true }
 flatbuffers = { version = "22.9.29" }
 futures = "0.3"
diff --git a/ballista/scheduler/src/display.rs 
b/ballista/scheduler/src/display.rs
index 7df23c22..6f1de120 100644
--- a/ballista/scheduler/src/display.rs
+++ b/ballista/scheduler/src/display.rs
@@ -131,7 +131,7 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> 
{
         plan.fmt_as(self.t, self.f)?;
         if let Some(metrics) = self.metrics.get(self.metric_index) {
             let metrics = metrics
-                .aggregate_by_partition()
+                .aggregate_by_name()
                 .sorted_for_display()
                 .timestamps_removed();
             write!(self.f, ", metrics=[{}]", metrics)?;
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs 
b/ballista/scheduler/src/scheduler_server/grpc.rs
index e61eb93c..979b65f7 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -44,6 +44,7 @@ use std::ops::Deref;
 use std::sync::Arc;
 
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use datafusion::prelude::SessionConfig;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tonic::{Request, Response, Status};
 
@@ -292,9 +293,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerGrpc
         // TODO shouldn't this take a ListingOption object as input?
 
         let GetFileMetadataParams { path, file_type } = request.into_inner();
-
+        // Here, we use the default config, since we don't know the session id
+        let config = SessionConfig::default().config_options();
         let file_format: Arc<dyn FileFormat> = match file_type.as_str() {
-            "parquet" => Ok(Arc::new(ParquetFormat::default())),
+            "parquet" => Ok(Arc::new(ParquetFormat::new(config))),
             // TODO implement for CSV
             _ => Err(tonic::Status::unimplemented(
                 "get_file_metadata unsupported file type",
diff --git a/ballista/scheduler/src/state/execution_graph.rs 
b/ballista/scheduler/src/state/execution_graph.rs
index 51a6232f..6f9b6545 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -1691,13 +1691,15 @@ mod test {
         let executor2 = mock_executor("executor-id2".to_string());
         let mut join_graph = test_join_plan(4).await;
 
-        assert_eq!(join_graph.stage_count(), 5);
+        // With the improvement of 
https://github.com/apache/arrow-datafusion/pull/4122,
+        // unnecessary RepartitionExec can be removed
+        assert_eq!(join_graph.stage_count(), 4);
         assert_eq!(join_graph.available_tasks(), 0);
 
         // Call revive to move the two leaf Resolved stages to Running
         join_graph.revive();
 
-        assert_eq!(join_graph.stage_count(), 5);
+        assert_eq!(join_graph.stage_count(), 4);
         assert_eq!(join_graph.available_tasks(), 2);
 
         // Complete the first stage
@@ -1742,13 +1744,13 @@ mod test {
         let executor2 = mock_executor("executor-id2".to_string());
         let mut join_graph = test_join_plan(4).await;
 
-        assert_eq!(join_graph.stage_count(), 5);
+        assert_eq!(join_graph.stage_count(), 4);
         assert_eq!(join_graph.available_tasks(), 0);
 
         // Call revive to move the two leaf Resolved stages to Running
         join_graph.revive();
 
-        assert_eq!(join_graph.stage_count(), 5);
+        assert_eq!(join_graph.stage_count(), 4);
         assert_eq!(join_graph.available_tasks(), 2);
 
         // Complete the first stage
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs 
b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index c46c0a91..b187d8e9 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -22,7 +22,7 @@ use std::iter::FromIterator;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
-use 
datafusion::physical_optimizer::hash_build_probe_order::HashBuildProbeOrder;
+use datafusion::physical_optimizer::join_selection::JoinSelection;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
@@ -363,7 +363,7 @@ impl UnresolvedStage {
         )?;
 
         // Optimize join order based on new resolved statistics
-        let optimize_join = HashBuildProbeOrder::new();
+        let optimize_join = JoinSelection::new();
         let plan = optimize_join.optimize(plan, &SessionConfig::new())?;
 
         Ok(ResolvedStage::new(
@@ -829,7 +829,7 @@ impl RunningStage {
             let new_metric = Arc::new(Metric::new(metric_value, 
Some(partition)));
             first.push(new_metric);
         }
-        first.aggregate_by_partition()
+        first.aggregate_by_name()
     }
 
     pub(super) fn task_failure_number(&self, partition_id: usize) -> usize {
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs 
b/ballista/scheduler/src/state/execution_graph_dot.rs
index 708b5077..6e65612f 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -473,10 +473,10 @@ filter_expr="]
        subgraph cluster4 {
                label = "Stage 5 [Unresolved]";
                stage_5_0 [shape=box, label="ShuffleWriter [48 partitions]"]
-               stage_5_0_0 [shape=box, label="Projection: a@0, a@1, a@2"]
+               stage_5_0_0 [shape=box, label="Projection: a@0, b@1, a@2, b@3, 
a@4, b@5"]
                stage_5_0_0_0 [shape=box, label="CoalesceBatches 
[batchSize=4096]"]
                stage_5_0_0_0_0 [shape=box, label="HashJoin
-join_expr=a@1 = a@0
+join_expr=b@3 = b@1
 filter_expr="]
                stage_5_0_0_0_0_0 [shape=box, label="CoalesceBatches 
[batchSize=4096]"]
                stage_5_0_0_0_0_0_0 [shape=box, label="UnresolvedShuffleExec 
[stage_id=3]"]
@@ -528,7 +528,132 @@ filter_expr="]
         Ok(())
     }
 
+    #[tokio::test]
+    async fn dot_optimized() -> Result<()> { // whole-graph DOT rendering, compared verbatim
+        let graph = test_graph_optimized().await?; // stages 1-3 all feed stage 4
+        let dot = ExecutionGraphDot::generate(Arc::new(graph))
+            .map_err(|e| BallistaError::Internal(format!("{:?}", e)))?;
+
+        let expected = r#"digraph G {
+       subgraph cluster0 {
+               label = "Stage 1 [Resolved]";
+               stage_1_0 [shape=box, label="ShuffleWriter [0 partitions]"]
+               stage_1_0_0 [shape=box, label="MemoryExec"]
+               stage_1_0_0 -> stage_1_0
+       }
+       subgraph cluster1 {
+               label = "Stage 2 [Resolved]";
+               stage_2_0 [shape=box, label="ShuffleWriter [0 partitions]"]
+               stage_2_0_0 [shape=box, label="MemoryExec"]
+               stage_2_0_0 -> stage_2_0
+       }
+       subgraph cluster2 {
+               label = "Stage 3 [Resolved]";
+               stage_3_0 [shape=box, label="ShuffleWriter [0 partitions]"]
+               stage_3_0_0 [shape=box, label="MemoryExec"]
+               stage_3_0_0 -> stage_3_0
+       }
+       subgraph cluster3 {
+               label = "Stage 4 [Unresolved]";
+               stage_4_0 [shape=box, label="ShuffleWriter [48 partitions]"]
+               stage_4_0_0 [shape=box, label="Projection: a@0, a@1, a@2"]
+               stage_4_0_0_0 [shape=box, label="CoalesceBatches 
[batchSize=4096]"]
+               stage_4_0_0_0_0 [shape=box, label="HashJoin
+join_expr=a@1 = a@0
+filter_expr="]
+               stage_4_0_0_0_0_0 [shape=box, label="CoalesceBatches 
[batchSize=4096]"]
+               stage_4_0_0_0_0_0_0 [shape=box, label="HashJoin
+join_expr=a@0 = a@0
+filter_expr="]
+               stage_4_0_0_0_0_0_0_0 [shape=box, label="CoalesceBatches 
[batchSize=4096]"]
+               stage_4_0_0_0_0_0_0_0_0 [shape=box, 
label="UnresolvedShuffleExec [stage_id=1]"]
+               stage_4_0_0_0_0_0_0_0_0 -> stage_4_0_0_0_0_0_0_0
+               stage_4_0_0_0_0_0_0_0 -> stage_4_0_0_0_0_0_0
+               stage_4_0_0_0_0_0_0_1 [shape=box, label="CoalesceBatches 
[batchSize=4096]"]
+               stage_4_0_0_0_0_0_0_1_0 [shape=box, 
label="UnresolvedShuffleExec [stage_id=2]"]
+               stage_4_0_0_0_0_0_0_1_0 -> stage_4_0_0_0_0_0_0_1
+               stage_4_0_0_0_0_0_0_1 -> stage_4_0_0_0_0_0_0
+               stage_4_0_0_0_0_0_0 -> stage_4_0_0_0_0_0
+               stage_4_0_0_0_0_0 -> stage_4_0_0_0_0
+               stage_4_0_0_0_0_1 [shape=box, label="CoalesceBatches 
[batchSize=4096]"]
+               stage_4_0_0_0_0_1_0 [shape=box, label="UnresolvedShuffleExec 
[stage_id=3]"]
+               stage_4_0_0_0_0_1_0 -> stage_4_0_0_0_0_1
+               stage_4_0_0_0_0_1 -> stage_4_0_0_0_0
+               stage_4_0_0_0_0 -> stage_4_0_0_0
+               stage_4_0_0_0 -> stage_4_0_0
+               stage_4_0_0 -> stage_4_0
+       }
+       stage_1_0 -> stage_4_0_0_0_0_0_0_0_0
+       stage_2_0 -> stage_4_0_0_0_0_0_0_1_0
+       stage_3_0 -> stage_4_0_0_0_0_1_0
+}
+"#;
+        assert_eq!(expected, &dot);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn query_stage_optimized() -> Result<()> { // DOT for query stage 4 alone
+        let graph = test_graph_optimized().await?; // stage 4 holds both HashJoins
+        let dot = ExecutionGraphDot::generate_for_query_stage(Arc::new(graph), 
4)
+            .map_err(|e| BallistaError::Internal(format!("{:?}", e)))?;
+
+        let expected = r#"digraph G {
+               stage_4_0 [shape=box, label="ShuffleWriter [48 partitions]"]
+               stage_4_0_0 [shape=box, label="Projection: a@0, a@1, a@2"]
+               stage_4_0_0_0 [shape=box, label="CoalesceBatches 
[batchSize=4096]"]
+               stage_4_0_0_0_0 [shape=box, label="HashJoin
+join_expr=a@1 = a@0
+filter_expr="]
+               stage_4_0_0_0_0_0 [shape=box, label="CoalesceBatches 
[batchSize=4096]"]
+               stage_4_0_0_0_0_0_0 [shape=box, label="HashJoin
+join_expr=a@0 = a@0
+filter_expr="]
+               stage_4_0_0_0_0_0_0_0 [shape=box, label="CoalesceBatches 
[batchSize=4096]"]
+               stage_4_0_0_0_0_0_0_0_0 [shape=box, 
label="UnresolvedShuffleExec [stage_id=1]"]
+               stage_4_0_0_0_0_0_0_0_0 -> stage_4_0_0_0_0_0_0_0
+               stage_4_0_0_0_0_0_0_0 -> stage_4_0_0_0_0_0_0
+               stage_4_0_0_0_0_0_0_1 [shape=box, label="CoalesceBatches 
[batchSize=4096]"]
+               stage_4_0_0_0_0_0_0_1_0 [shape=box, 
label="UnresolvedShuffleExec [stage_id=2]"]
+               stage_4_0_0_0_0_0_0_1_0 -> stage_4_0_0_0_0_0_0_1
+               stage_4_0_0_0_0_0_0_1 -> stage_4_0_0_0_0_0_0
+               stage_4_0_0_0_0_0_0 -> stage_4_0_0_0_0_0
+               stage_4_0_0_0_0_0 -> stage_4_0_0_0_0
+               stage_4_0_0_0_0_1 [shape=box, label="CoalesceBatches 
[batchSize=4096]"]
+               stage_4_0_0_0_0_1_0 [shape=box, label="UnresolvedShuffleExec 
[stage_id=3]"]
+               stage_4_0_0_0_0_1_0 -> stage_4_0_0_0_0_1
+               stage_4_0_0_0_0_1 -> stage_4_0_0_0_0
+               stage_4_0_0_0_0 -> stage_4_0_0_0
+               stage_4_0_0_0 -> stage_4_0_0
+               stage_4_0_0 -> stage_4_0
+}
+"#;
+        assert_eq!(expected, &dot);
+        Ok(())
+    }
+
     async fn test_graph() -> Result<ExecutionGraph> {
+        // Three tables sharing one (a, b) schema, joined on foo.a = bar.a and bar.b = baz.b.
+        let config = SessionConfig::new().with_target_partitions(48);
+        let ctx = SessionContext::with_config(config);
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::UInt32, false),
+            Field::new("b", DataType::UInt32, false),
+        ]));
+        let table = Arc::new(MemTable::try_new(schema, vec![])?);
+        for name in ["foo", "bar", "baz"] {
+            ctx.register_table(name, table.clone())?;
+        }
+        let sql = "SELECT * FROM foo JOIN bar ON foo.a = bar.a JOIN baz on bar.b = baz.b";
+        let df = ctx.sql(sql).await?;
+        let logical_plan = df.to_logical_plan()?;
+        let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
+        ExecutionGraph::new("scheduler_id", "job_id", "job_name", "session_id", physical_plan, 0)
+    }
+
+    // With the improvement of https://github.com/apache/arrow-datafusion/pull/4122,
+    // redundant RepartitionExec nodes are removed, so the plan splits into fewer stages.
+    async fn test_graph_optimized() -> Result<ExecutionGraph> {
         let ctx =
             
SessionContext::with_config(SessionConfig::new().with_target_partitions(48));
         let schema =
diff --git a/ballista/scheduler/src/test_utils.rs 
b/ballista/scheduler/src/test_utils.rs
index b4526fd1..ae386382 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -81,7 +81,7 @@ impl TableProvider for ExplodingTableProvider {
     async fn scan(
         &self,
         _ctx: &SessionState,
-        _projection: &Option<Vec<usize>>,
+        _projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 5752359a..8754f192 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -34,8 +34,8 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = "14.0.0"
-datafusion-proto = "14.0.0"
+datafusion = "15.0.0"
+datafusion-proto = "15.0.0"
 env_logger = "0.9"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index fd18a48f..10ff91ad 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -816,7 +816,8 @@ async fn get_table(
             }
             "parquet" => {
                 let path = format!("{}/{}", path, table);
-                let format = 
ParquetFormat::default().with_enable_pruning(true);
+                let format = ParquetFormat::new(ctx.config_options())
+                    .with_enable_pruning(Some(true));
 
                 (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
             }
@@ -832,6 +833,7 @@ async fn get_table(
         target_partitions,
         collect_stat: true,
         table_partition_cols: vec![],
+        file_sort_order: None,
     };
 
     let url = ListingTableUrl::parse(path)?;
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 7d24401c..06aa536d 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
 
 [dependencies]
 ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = "14.0.0"
+datafusion = "15.0.0"
 futures = "0.3"
 num_cpus = "1.13.0"
 prost = "0.11"
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 2f02f175..5879d744 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -36,7 +36,7 @@ default = ["mimalloc"]
 [dependencies]
 async-trait = "0.1"
 ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = { version = "14.0.0", features = ["pyarrow"] }
+datafusion = { version = "15.0.0", features = ["pyarrow"] }
 futures = "0.3"
 mimalloc = { version = "*", optional = true, default-features = false }
 pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", 
"abi3-py37"] }
diff --git a/python/src/context.rs b/python/src/context.rs
index 38a3cb9b..26c5661a 100644
--- a/python/src/context.rs
+++ b/python/src/context.rs
@@ -23,7 +23,7 @@ use uuid::Uuid;
 use pyo3::exceptions::{PyKeyError, PyValueError};
 use pyo3::prelude::*;
 
-use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::datatypes::{DataType, Schema};
 use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::datasource::TableProvider;
@@ -34,6 +34,7 @@ use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
 use crate::catalog::{PyCatalog, PyTable};
 use crate::dataframe::PyDataFrame;
 use crate::dataset::Dataset;
+use crate::datatype::PyDataType;
 use crate::errors::DataFusionError;
 use crate::udf::PyScalarUDF;
 use crate::utils::wait_for_future;
@@ -159,13 +160,13 @@ impl PySessionContext {
         &mut self,
         name: &str,
         path: &str,
-        table_partition_cols: Vec<String>,
+        table_partition_cols: Vec<(String, PyDataType)>,
         parquet_pruning: bool,
         file_extension: &str,
         py: Python,
     ) -> PyResult<()> {
         let mut options = ParquetReadOptions::default()
-            .table_partition_cols(table_partition_cols)
+            
.table_partition_cols(convert_table_partition_cols(table_partition_cols))
             .parquet_pruning(parquet_pruning);
         options.file_extension = file_extension;
         let result = self.ctx.register_parquet(name, path, options);
@@ -255,3 +256,12 @@ impl PySessionContext {
         Ok(PyDataFrame::new(self.ctx.read_empty()?))
     }
 }
+
+/// Converts `(name, PyDataType)` partition columns into DataFusion `(name, DataType)` pairs.
+fn convert_table_partition_cols(
+    table_partition_cols: Vec<(String, PyDataType)>,
+) -> Vec<(String, DataType)> {
+    // The Vec is owned, so move each pair out instead of cloning the name and the type.
+    let to_arrow = |(name, t): (String, PyDataType)| (name, DataType::from(t));
+    table_partition_cols.into_iter().map(to_arrow).collect()
+}
\ No newline at end of file
diff --git a/python/src/dataset.rs b/python/src/dataset.rs
index d34d974f..f0b2b10e 100644
--- a/python/src/dataset.rs
+++ b/python/src/dataset.rs
@@ -98,7 +98,7 @@ impl TableProvider for Dataset {
     async fn scan(
         &self,
         _ctx: &SessionState,
-        projection: &Option<Vec<usize>>,
+        projection: Option<&Vec<usize>>,
         filters: &[Expr],
         // limit can be used to reduce the amount scanned
         // from the datasource as a performance optimization.
@@ -111,7 +111,7 @@ impl TableProvider for Dataset {
                 DatasetExec::new(
                     py,
                     self.dataset.as_ref(py),
-                    projection.clone(),
+                    projection.cloned(),
                     filters,
                 )
                 .map_err(|err| DataFusionError::External(Box::new(err)))?,
diff --git a/python/src/datatype.rs b/python/src/datatype.rs
new file mode 100644
index 00000000..07f6cd23
--- /dev/null
+++ b/python/src/datatype.rs
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Copied from https://github.com/apache/arrow-datafusion-python/pull/103
+
+use datafusion::arrow::datatypes::DataType;
+use pyo3::pyclass;
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
+#[pyclass(name = "PyDataType", module = "datafusion", subclass)]
+pub struct PyDataType { // Python-visible wrapper around an Arrow `DataType`
+    pub(crate) data_type: DataType, // TODO(review): no #[new] here; can Python create one?
+}
+
+impl From<PyDataType> for DataType {
+    fn from(PyDataType { data_type }: PyDataType) -> Self {
+        data_type // unwrap the Arrow type carried by the Python wrapper
+    }
+}
+
+impl From<DataType> for PyDataType {
+    fn from(inner: DataType) -> Self {
+        Self { data_type: inner } // wrap an Arrow type so it can cross into Python
+    }
+}
\ No newline at end of file
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 01158d55..106978a7 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -29,6 +29,8 @@ mod context;
 mod dataframe;
 mod dataset;
 mod dataset_exec;
+#[allow(clippy::borrow_deref_ref)]
+mod datatype;
 pub mod errors;
 #[allow(clippy::borrow_deref_ref)]
 mod expression;
diff --git a/python/src/udaf.rs b/python/src/udaf.rs
index f2973476..42c388ed 100644
--- a/python/src/udaf.rs
+++ b/python/src/udaf.rs
@@ -95,6 +95,10 @@ impl Accumulator for RustAccumulator {
             Ok(())
         })
     }
+
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>() // shallow size: excludes heap data the struct points to
+    }
 }
 
 pub fn to_rust_accumulator(accum: PyObject) -> 
AccumulatorFunctionImplementation {

Reply via email to