This is an automated email from the ASF dual-hosted git repository.

mete pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c4a38c5 update to datafusion 42 ... (#1080)
1c4a38c5 is described below

commit 1c4a38c5383f75f7f3ab41ff7e6ce0186a9bd87a
Author: Marko Milenković <[email protected]>
AuthorDate: Mon Oct 14 22:55:25 2024 +0100

    update to datafusion 42 ... (#1080)
    
    ... and all other dependent libraries
---
 Cargo.toml                                         |  26 +-
 ballista-cli/Cargo.toml                            |   5 +-
 ballista-cli/src/command.rs                        |   4 +-
 ballista-cli/src/main.rs                           |   4 +-
 ballista/core/build.rs                             |   2 +-
 ballista/core/proto/datafusion.proto               | 176 +++++-----
 ballista/core/proto/datafusion_common.proto        |  41 ++-
 .../core/src/execution_plans/shuffle_reader.rs     |   1 +
 ballista/core/src/serde/generated/ballista.rs      | 391 +++++++--------------
 ballista/core/src/serde/mod.rs                     |   2 +-
 ballista/core/src/serde/scheduler/to_proto.rs      |   2 +-
 ballista/executor/src/flight_service.rs            |   3 +-
 ballista/scheduler/Cargo.toml                      |   4 +-
 ballista/scheduler/build.rs                        |   2 +-
 ballista/scheduler/src/flight_sql.rs               |  14 +-
 ballista/scheduler/src/scheduler_process.rs        |   8 +-
 ballista/scheduler/src/state/session_manager.rs    |   2 +-
 ballista/scheduler/src/test_utils.rs               |   2 +-
 python/Cargo.toml                                  |   8 +-
 19 files changed, 306 insertions(+), 391 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index e77e5468..007482bf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,23 +21,23 @@ members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executo
 resolver = "2"
 
 [workspace.dependencies]
-arrow = { version = "52.2.0", features = ["ipc_compression"] }
-arrow-flight = { version = "52.2.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "52.2.0", default-features = false }
+arrow = { version = "53", features = ["ipc_compression"] }
+arrow-flight = { version = "53", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "53", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = { version = "0.4.0" }
 configure_me_codegen = { version = "0.4.4" }
 # bump directly to datafusion v43 to avoid the serde bug on v42 (https://github.com/apache/datafusion/pull/12626)
-datafusion = "41.0.0"
-datafusion-cli = "41.0.0"
-datafusion-proto = "41.0.0"
-datafusion-proto-common = "41.0.0"
-object_store = "0.10.2"
-prost = "0.12.0"
-prost-types = "0.12.0"
-sqlparser = "0.49.0"
-tonic = { version = "0.11.0" }
-tonic-build = { version = "0.11.0", default-features = false, features = [
+datafusion = "42.0.0"
+datafusion-cli = "42.0.0"
+datafusion-proto = "42.0.0"
+datafusion-proto-common = "42.0.0"
+object_store = "0.11"
+prost = "0.13"
+prost-types = "0.13"
+sqlparser = "0.50"
+tonic = { version = "0.12" }
+tonic-build = { version = "0.12", default-features = false, features = [
     "transport",
     "prost"
 ] }
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 16ff0b54..891f5a7c 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -30,14 +30,15 @@ readme = "README.md"
 
 [dependencies]
 ballista = { path = "../ballista/client", version = "0.12.0", features = ["standalone"] }
-clap = { workspace = true }
+# datafusion-cli uses 4.5 clap, thus it does not depend on workspace
+clap = { version = "4.5", features = ["derive", "cargo"] }
 datafusion = { workspace = true }
 datafusion-cli = { workspace = true }
 dirs = "5.0.1"
 env_logger = { workspace = true }
 mimalloc = { version = "0.1", default-features = false }
 num_cpus = { workspace = true }
-rustyline = "11.0.0"
+rustyline = "14.0.0"
 tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
 
 [features]
diff --git a/ballista-cli/src/command.rs b/ballista-cli/src/command.rs
index 0c407cf0..2123713a 100644
--- a/ballista-cli/src/command.rs
+++ b/ballista-cli/src/command.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 use std::time::Instant;
 
 use ballista::prelude::{BallistaContext, BallistaError, Result};
-use clap::ArgEnum;
+
 use datafusion::arrow::array::{ArrayRef, StringArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::arrow::record_batch::RecordBatch;
@@ -223,7 +223,7 @@ impl OutputFormat {
                     Err(BallistaError::General(format!(
                         "{:?} is not a valid format type [possible values: {:?}]",
                         format,
-                        PrintFormat::value_variants()
+                        "TO BE FIXED", //PrintFormat::value_variants()
                     )))
                 }
             }
diff --git a/ballista-cli/src/main.rs b/ballista-cli/src/main.rs
index 6aeecd6c..0fd6ddfd 100644
--- a/ballista-cli/src/main.rs
+++ b/ballista-cli/src/main.rs
@@ -58,7 +58,7 @@ struct Args {
     #[clap(
         short,
         long,
-        multiple_values = true,
+        num_args = 0..,
         help = "Execute commands from file(s), then exit",
         value_parser(parse_valid_file)
     )]
@@ -67,7 +67,7 @@ struct Args {
     #[clap(
         short = 'r',
         long,
-        multiple_values = true,
+        num_args = 0..,
         help = "Run the provided files on startup instead of ~/.ballistarc",
         value_parser(parse_valid_file),
         conflicts_with = "file"
diff --git a/ballista/core/build.rs b/ballista/core/build.rs
index 6e501b88..4fe7e3bf 100644
--- a/ballista/core/build.rs
+++ b/ballista/core/build.rs
@@ -44,7 +44,7 @@ fn main() -> Result<(), String> {
             .extern_path(".datafusion_common", "::datafusion_proto_common")
             .extern_path(".datafusion", "::datafusion_proto::protobuf")
             .protoc_arg("--experimental_allow_proto3_optional")
-            .compile(&["proto/ballista.proto"], &["proto"])
+            .compile_protos(&["proto/ballista.proto"], &["proto"])
             .map_err(|e| format!("protobuf compilation failed: {e}"))?;
         let generated_source_path = out.join("ballista.protobuf.rs");
         let code = std::fs::read_to_string(generated_source_path).unwrap();
diff --git a/ballista/core/proto/datafusion.proto b/ballista/core/proto/datafusion.proto
index 8402b92f..cf166ba9 100644
--- a/ballista/core/proto/datafusion.proto
+++ b/ballista/core/proto/datafusion.proto
@@ -75,6 +75,10 @@ message LogicalExprNodeCollection {
   repeated LogicalExprNode logical_expr_nodes = 1;
 }
 
+message SortExprNodeCollection {
+  repeated SortExprNode sort_expr_nodes = 1;
+}
+
 message ListingTableScanNode {
   reserved 1; // was string table_name
   TableReference table_name = 14;
@@ -90,8 +94,9 @@ message ListingTableScanNode {
     datafusion_common.CsvFormat csv = 10;
     datafusion_common.ParquetFormat parquet = 11;
     datafusion_common.AvroFormat avro = 12;
+    datafusion_common.NdJsonFormat json = 15;
   }
-  repeated LogicalExprNodeCollection file_sort_order = 13;
+  repeated SortExprNodeCollection file_sort_order = 13;
 }
 
 message ViewTableScanNode {
@@ -128,7 +133,7 @@ message SelectionNode {
 
 message SortNode {
   LogicalPlanNode input = 1;
-  repeated LogicalExprNode expr = 2;
+  repeated SortExprNode expr = 2;
   // Maximum number of highest/lowest rows to fetch; negative means no limit
   int64 fetch = 3;
 }
@@ -159,12 +164,12 @@ message CreateExternalTableNode {
   repeated string table_partition_cols = 5;
   bool if_not_exists = 6;
   string definition = 7;
-  repeated LogicalExprNodeCollection order_exprs = 10;
+  repeated SortExprNodeCollection order_exprs = 10;
   bool unbounded = 11;
   map<string, string> options = 8;
   datafusion_common.Constraints constraints = 12;
   map<string, LogicalExprNode> column_defaults = 13;
- }
+}
 
 message PrepareNode {
   string name = 1;
@@ -244,35 +249,51 @@ message DistinctNode {
 message DistinctOnNode {
   repeated LogicalExprNode on_expr = 1;
   repeated LogicalExprNode select_expr = 2;
-  repeated LogicalExprNode sort_expr = 3;
+  repeated SortExprNode sort_expr = 3;
   LogicalPlanNode input = 4;
 }
 
 message CopyToNode {
-    LogicalPlanNode input = 1;
-    string output_url = 2;
-    oneof format_options {
-      datafusion_common.CsvOptions csv = 8;
-      datafusion_common.JsonOptions json = 9;
-      datafusion_common.TableParquetOptions parquet = 10;
-      datafusion_common.AvroOptions avro = 11;
-      datafusion_common.ArrowOptions arrow = 12;
-    }
-    repeated string partition_by = 7;
+  LogicalPlanNode input = 1;
+  string output_url = 2;
+  bytes file_type = 3;
+  repeated string partition_by = 7;
 }
 
 message UnnestNode {
-    LogicalPlanNode input = 1;
-    repeated datafusion_common.Column exec_columns = 2;
-    repeated uint64 list_type_columns = 3;
-    repeated uint64 struct_type_columns = 4;
-    repeated uint64 dependency_indices = 5;
-    datafusion_common.DfSchema schema = 6;
-    UnnestOptions options = 7;
+  LogicalPlanNode input = 1;
+  repeated ColumnUnnestExec exec_columns = 2;
+  repeated ColumnUnnestListItem list_type_columns = 3;
+  repeated uint64 struct_type_columns = 4;
+  repeated uint64 dependency_indices = 5;
+  datafusion_common.DfSchema schema = 6;
+  UnnestOptions options = 7;
+}
+message ColumnUnnestListItem {
+  uint32 input_index = 1;
+  ColumnUnnestListRecursion recursion = 2;
+}
+
+message ColumnUnnestListRecursions {
+  repeated ColumnUnnestListRecursion recursions = 2;
+}
+
+message ColumnUnnestListRecursion {
+  datafusion_common.Column output_column = 1;
+  uint32 depth = 2;
+}
+
+message ColumnUnnestExec {
+  datafusion_common.Column column = 1;
+  oneof UnnestType {
+    ColumnUnnestListRecursions list = 2;
+    datafusion_common.EmptyMessage struct = 3;
+    datafusion_common.EmptyMessage inferred = 4;
+  }
 }
 
 message UnnestOptions {
-    bool preserve_nulls = 1;
+  bool preserve_nulls = 1;
 }
 
 message UnionNode {
@@ -316,8 +337,6 @@ message LogicalExprNode {
     // binary expressions
     BinaryExprNode binary_expr = 4;
 
-    // aggregate expressions
-    AggregateExprNode aggregate_expr = 5;
 
     // null checks
     IsNull is_null_expr = 6;
@@ -327,7 +346,6 @@ message LogicalExprNode {
     BetweenNode between = 9;
     CaseNode case_ = 10;
     CastNode cast = 11;
-    SortExprNode sort = 12;
     NegativeNode negative = 13;
     InListNode in_list = 14;
     Wildcard wildcard = 15;
@@ -369,7 +387,7 @@ message LogicalExprNode {
 }
 
 message Wildcard {
-  string qualifier = 1;
+  TableReference qualifier = 1;
 }
 
 message PlaceholderNode {
@@ -471,57 +489,14 @@ message InListNode {
   bool negated = 3;
 }
 
-enum AggregateFunction {
-  MIN = 0;
-  MAX = 1;
-  SUM = 2;
-  AVG = 3;
-  COUNT = 4;
-  APPROX_DISTINCT = 5;
-  ARRAY_AGG = 6;
-  // VARIANCE = 7;
-  VARIANCE_POP = 8;
-  // COVARIANCE = 9;
-  // COVARIANCE_POP = 10;
-  STDDEV = 11;
-  STDDEV_POP = 12;
-  CORRELATION = 13;
-  APPROX_PERCENTILE_CONT = 14;
-  APPROX_MEDIAN = 15;
-  APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16;
-  GROUPING = 17;
-  // MEDIAN = 18;
-  BIT_AND = 19;
-  BIT_OR = 20;
-  BIT_XOR = 21;
-  BOOL_AND = 22;
-  BOOL_OR = 23;
-  REGR_SLOPE = 26;
-  REGR_INTERCEPT = 27;
-  REGR_COUNT = 28;
-  REGR_R2 = 29;
-  REGR_AVGX = 30;
-  REGR_AVGY = 31;
-  REGR_SXX = 32;
-  REGR_SYY = 33;
-  REGR_SXY = 34;
-  STRING_AGG = 35;
-  NTH_VALUE_AGG = 36;
-}
-
-message AggregateExprNode {
-  AggregateFunction aggr_function = 1;
-  repeated LogicalExprNode expr = 2;
-  bool distinct = 3;
-  LogicalExprNode filter = 4;
-  repeated LogicalExprNode order_by = 5;
-}
 
 message AggregateUDFExprNode {
   string fun_name = 1;
   repeated LogicalExprNode args = 2;
+  bool distinct = 5;
   LogicalExprNode filter = 3;
-  repeated LogicalExprNode order_by = 4;
+  repeated SortExprNode order_by = 4;
+  optional bytes fun_definition = 6;
 }
 
 message ScalarUDFExprNode {
@@ -531,7 +506,8 @@ message ScalarUDFExprNode {
 }
 
 enum BuiltInWindowFunction {
-  ROW_NUMBER = 0;
+  UNSPECIFIED = 0; // https://protobuf.dev/programming-guides/dos-donts/#unspecified-enum
+  // ROW_NUMBER = 0;
   RANK = 1;
   DENSE_RANK = 2;
   PERCENT_RANK = 3;
@@ -546,16 +522,16 @@ enum BuiltInWindowFunction {
 
 message WindowExprNode {
   oneof window_function {
-    AggregateFunction aggr_function = 1;
     BuiltInWindowFunction built_in_function = 2;
     string udaf = 3;
     string udwf = 9;
   }
   LogicalExprNode expr = 4;
   repeated LogicalExprNode partition_by = 5;
-  repeated LogicalExprNode order_by = 6;
+  repeated SortExprNode order_by = 6;
   // repeated LogicalExprNode filter = 7;
   WindowFrame window_frame = 8;
+  optional bytes fun_definition = 10;
 }
 
 message BetweenNode {
@@ -674,9 +650,11 @@ message PlanType {
     datafusion_common.EmptyMessage FinalLogicalPlan = 3;
     datafusion_common.EmptyMessage InitialPhysicalPlan = 4;
     datafusion_common.EmptyMessage InitialPhysicalPlanWithStats = 9;
+    datafusion_common.EmptyMessage InitialPhysicalPlanWithSchema = 11;
     OptimizedPhysicalPlanType OptimizedPhysicalPlan = 5;
     datafusion_common.EmptyMessage FinalPhysicalPlan = 6;
     datafusion_common.EmptyMessage FinalPhysicalPlanWithStats = 10;
+    datafusion_common.EmptyMessage FinalPhysicalPlanWithSchema = 12;
   }
 }
 
@@ -737,10 +715,11 @@ message PhysicalPlanNode {
     AnalyzeExecNode analyze = 23;
     JsonSinkExecNode json_sink = 24;
     SymmetricHashJoinExecNode symmetric_hash_join = 25;
-    InterleaveExecNode  interleave = 26;
+    InterleaveExecNode interleave = 26;
     PlaceholderRowExecNode placeholder_row = 27;
     CsvSinkExecNode csv_sink = 28;
     ParquetSinkExecNode parquet_sink = 29;
+    UnnestExecNode unnest = 30;
   }
 }
 
@@ -752,13 +731,21 @@ message PartitionColumn {
 
 message FileSinkConfig {
   reserved 6; // writer_mode
+  reserved 8; // was `overwrite` which has been superseded by `insert_op`
 
   string object_store_url = 1;
   repeated PartitionedFile file_groups = 2;
   repeated string table_paths = 3;
   datafusion_common.Schema output_schema = 4;
   repeated PartitionColumn table_partition_cols = 5;
-  bool overwrite = 8;
+  bool keep_partition_by_columns = 9;
+  InsertOp insert_op = 10; 
+}
+
+enum InsertOp {
+  Append = 0;
+  Overwrite = 1;
+  Replace = 2;
 }
 
 message JsonSink {
@@ -797,6 +784,19 @@ message ParquetSinkExecNode {
   PhysicalSortExprNodeCollection sort_order = 4;
 }
 
+message UnnestExecNode {
+  PhysicalPlanNode input = 1;
+  datafusion_common.Schema schema = 2;
+  repeated ListUnnest list_type_columns = 3;
+  repeated uint64 struct_type_columns = 4;
+  UnnestOptions options = 5;
+}
+
+message ListUnnest {
+  uint32 index_in_input_schema = 1;
+  uint32 depth = 2;
+}
+
 message PhysicalExtensionNode {
   bytes node = 1;
   repeated PhysicalPlanNode inputs = 2;
@@ -838,6 +838,8 @@ message PhysicalExprNode {
     // was PhysicalDateTimeIntervalExprNode date_time_interval_expr = 17;
 
     PhysicalLikeExprNode like_expr = 18;
+
+    PhysicalExtensionExprNode extension = 19;
   }
 }
 
@@ -850,17 +852,17 @@ message PhysicalScalarUdfNode {
 
 message PhysicalAggregateExprNode {
   oneof AggregateFunction {
-    AggregateFunction aggr_function = 1;
     string user_defined_aggr_function = 4;
   }
   repeated PhysicalExprNode expr = 2;
   repeated PhysicalSortExprNode ordering_req = 5;
   bool distinct = 3;
+  bool ignore_nulls = 6;
+  optional bytes fun_definition = 7;
 }
 
 message PhysicalWindowExprNode {
   oneof window_function {
-    AggregateFunction aggr_function = 1;
     BuiltInWindowFunction built_in_function = 2;
     string user_defined_aggr_function = 3;
   }
@@ -869,6 +871,7 @@ message PhysicalWindowExprNode {
   repeated PhysicalSortExprNode order_by = 6;
   WindowFrame window_frame = 7;
   string name = 8;
+  optional bytes fun_definition = 9;
 }
 
 message PhysicalIsNull {
@@ -944,10 +947,16 @@ message PhysicalNegativeNode {
   PhysicalExprNode expr = 1;
 }
 
+message PhysicalExtensionExprNode {
+  bytes expr = 1;
+  repeated PhysicalExprNode inputs = 2;
+}
+
 message FilterExecNode {
   PhysicalPlanNode input = 1;
   PhysicalExprNode expr = 2;
   uint32 default_filter_selectivity = 3;
+  repeated uint32 projection = 9;
 }
 
 message FileGroup {
@@ -994,6 +1003,10 @@ message CsvScanExecNode {
   oneof optional_escape {
     string escape = 5;
   }
+  oneof optional_comment {
+    string comment = 6;
+  }
+  bool newlines_in_values = 7;
 }
 
 message AvroScanExecNode {
@@ -1174,6 +1187,7 @@ message NestedLoopJoinExecNode {
 message CoalesceBatchesExecNode {
   PhysicalPlanNode input = 1;
   uint32 target_batch_size = 2;
+  optional uint32 fetch = 3;
 }
 
 message CoalescePartitionsExecNode {
@@ -1233,4 +1247,4 @@ message PartitionStats {
   int64 num_batches = 2;
   int64 num_bytes = 3;
   repeated datafusion_common.ColumnStats column_stats = 4;
-}
+}
\ No newline at end of file
diff --git a/ballista/core/proto/datafusion_common.proto b/ballista/core/proto/datafusion_common.proto
index d9ec7dbb..c3906abf 100644
--- a/ballista/core/proto/datafusion_common.proto
+++ b/ballista/core/proto/datafusion_common.proto
@@ -51,6 +51,11 @@ message ParquetFormat {
 
 message AvroFormat {}
 
+message NdJsonFormat {
+  JsonOptions options = 1;
+}
+
+
 message PrimaryKeyConstraint{
   repeated uint64 indices = 1;
 }
@@ -130,6 +135,12 @@ message Decimal{
   int32 scale = 4;
 }
 
+message Decimal256Type{
+  reserved 1, 2;
+  uint32 precision = 3;
+  int32 scale = 4;
+}
+
 message List{
   Field field_type = 1;
 }
@@ -164,7 +175,7 @@ message Union{
   repeated int32 type_ids = 3;
 }
 
-// Used for List/FixedSizeList/LargeList/Struct
+// Used for List/FixedSizeList/LargeList/Struct/Map
 message ScalarNestedValue {
   message Dictionary {
     bytes ipc_message = 1;
@@ -248,6 +259,7 @@ message ScalarValue{
     bool   bool_value = 1;
     string utf8_value = 2;
     string large_utf8_value = 3;
+    string utf8_view_value = 23;
     int32  int8_value = 4;
     int32  int16_value = 5;
     int32  int32_value = 6;
@@ -265,6 +277,7 @@ message ScalarValue{
     ScalarNestedValue list_value = 17;
     ScalarNestedValue fixed_size_list_value = 18;
     ScalarNestedValue struct_value = 32;
+    ScalarNestedValue map_value = 41;
 
     Decimal128 decimal128_value = 20;
     Decimal256 decimal256_value = 39;
@@ -281,6 +294,7 @@ message ScalarValue{
     ScalarDictionaryValue dictionary_value = 27;
     bytes binary_value = 28;
     bytes large_binary_value = 29;
+    bytes binary_view_value = 22;
     ScalarTime64Value time64_value = 30;
     IntervalDayTimeValue interval_daytime_value = 25;
     IntervalMonthDayNanoValue interval_month_day_nano = 31;
@@ -318,8 +332,10 @@ message ArrowType{
     EmptyMessage FLOAT32 = 12 ;
     EmptyMessage FLOAT64 = 13 ;
     EmptyMessage UTF8 = 14 ;
+    EmptyMessage UTF8_VIEW = 35;
     EmptyMessage LARGE_UTF8 = 32;
     EmptyMessage BINARY = 15 ;
+    EmptyMessage BINARY_VIEW = 34;
     int32 FIXED_SIZE_BINARY = 16 ;
     EmptyMessage LARGE_BINARY = 31;
     EmptyMessage DATE32 = 17 ;
@@ -330,6 +346,7 @@ message ArrowType{
     TimeUnit TIME64 = 22 ;
     IntervalUnit INTERVAL = 23 ;
     Decimal DECIMAL = 24 ;
+    Decimal256Type DECIMAL256 = 36;
     List LIST = 25;
     List LARGE_LIST = 26;
     FixedSizeList FIXED_SIZE_LIST = 27;
@@ -381,6 +398,12 @@ message CsvWriterOptions {
   string time_format = 7;
   // Optional value to represent null
   string null_value = 8;
+  // Optional quote. Defaults to `b'"'`
+  string quote = 9;
+  // Optional escape. Defaults to `'\\'`
+  string escape = 10;
+  // Optional flag whether to double quotes, instead of escaping. Defaults to `true`
+  bool double_quote = 11;
 }
 
 // Options controlling CSV format
@@ -397,6 +420,10 @@ message CsvOptions {
   string timestamp_tz_format = 10; // Optional timestamp with timezone format
   string time_format = 11; // Optional time format
   string null_value = 12; // Optional representation of null value
+  bytes comment = 13; // Optional comment character as a byte
+  bytes double_quote = 14; // Indicates if quotes are doubled
+  bytes newlines_in_values = 15; // Indicates if newlines are supported in values
+  bytes terminator = 16; // Optional terminator character as a byte
 }
 
 // Options controlling CSV format
@@ -407,15 +434,16 @@ message JsonOptions {
 
 message TableParquetOptions {
   ParquetOptions global = 1;
-  repeated ColumnSpecificOptions column_specific_options = 2;
+  repeated ParquetColumnSpecificOptions column_specific_options = 2;
+  map<string, string> key_value_metadata = 3;
 }
 
-message ColumnSpecificOptions {
+message ParquetColumnSpecificOptions {
   string column_name = 1;
-  ColumnOptions options = 2;
+  ParquetColumnOptions options = 2;
 }
 
-message ColumnOptions {
+message ParquetColumnOptions {
   oneof bloom_filter_enabled_opt {
     bool bloom_filter_enabled = 1;
   }
@@ -465,6 +493,7 @@ message ParquetOptions {
   uint64 maximum_buffered_record_batches_per_stream = 25; // default = 2
   bool bloom_filter_on_read = 26; // default = true
   bool bloom_filter_on_write = 27; // default = false
+  bool schema_force_view_types = 28; // default = false
 
   oneof metadata_size_hint_opt {
     uint64 metadata_size_hint = 4;
@@ -538,4 +567,4 @@ message ColumnStats {
   Precision max_value = 2;
   Precision null_count = 3;
   Precision distinct_count = 4;
-}
+}
\ No newline at end of file
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs
index 2f856b39..4a2c25b8 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -419,6 +419,7 @@ fn fetch_partition_local_inner(
     let file = File::open(path).map_err(|e| {
         BallistaError::General(format!("Failed to open partition file at {path}: {e:?}"))
     })?;
+    let file = BufReader::new(file);
     let reader = StreamReader::try_new(file, None).map_err(|e| {
         BallistaError::General(format!("Failed to new arrow FileReader at {path}: {e:?}"))
     })?;
diff --git a/ballista/core/src/serde/generated/ballista.rs 
b/ballista/core/src/serde/generated/ballista.rs
index c45bdeaf..51a7b80b 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -2,7 +2,6 @@
 /// /////////////////////////////////////////////////////////////////////////////////////////////////
 /// Ballista Physical Plan
 /// /////////////////////////////////////////////////////////////////////////////////////////////////
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct BallistaPhysicalPlanNode {
     #[prost(oneof = "ballista_physical_plan_node::PhysicalPlanType", tags = "1, 2, 3")]
@@ -12,7 +11,6 @@ pub struct BallistaPhysicalPlanNode {
 }
 /// Nested message and enum types in `BallistaPhysicalPlanNode`.
 pub mod ballista_physical_plan_node {
-    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum PhysicalPlanType {
         #[prost(message, tag = "1")]
@@ -23,7 +21,6 @@ pub mod ballista_physical_plan_node {
         UnresolvedShuffle(super::UnresolvedShuffleExecNode),
     }
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ShuffleWriterExecNode {
     /// TODO it seems redundant to provide job and stage id here since we also have them
@@ -39,7 +36,6 @@ pub struct ShuffleWriterExecNode {
         ::datafusion_proto::protobuf::PhysicalHashRepartition,
     >,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct UnresolvedShuffleExecNode {
     #[prost(uint32, tag = "1")]
@@ -49,7 +45,6 @@ pub struct UnresolvedShuffleExecNode {
     #[prost(uint32, tag = "4")]
     pub output_partition_count: u32,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ShuffleReaderExecNode {
     #[prost(message, repeated, tag = "1")]
@@ -60,7 +55,6 @@ pub struct ShuffleReaderExecNode {
     #[prost(uint32, tag = "3")]
     pub stage_id: u32,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ShuffleReaderPartition {
     /// each partition of a shuffle read can read data from multiple locations
@@ -70,7 +64,6 @@ pub struct ShuffleReaderPartition {
 /// /////////////////////////////////////////////////////////////////////////////////////////////////
 /// Ballista Scheduling
 /// /////////////////////////////////////////////////////////////////////////////////////////////////
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutionGraph {
     #[prost(string, tag = "1")]
@@ -100,7 +93,6 @@ pub struct ExecutionGraph {
     #[prost(uint64, tag = "13")]
     pub queued_at: u64,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct StageAttempts {
     #[prost(uint32, tag = "1")]
@@ -108,7 +100,6 @@ pub struct StageAttempts {
     #[prost(uint32, repeated, tag = "2")]
     pub stage_attempt_num: ::prost::alloc::vec::Vec<u32>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutionGraphStage {
     #[prost(oneof = "execution_graph_stage::StageType", tags = "1, 2, 3, 4")]
@@ -116,7 +107,6 @@ pub struct ExecutionGraphStage {
 }
 /// Nested message and enum types in `ExecutionGraphStage`.
 pub mod execution_graph_stage {
-    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum StageType {
         #[prost(message, tag = "1")]
@@ -129,7 +119,6 @@ pub mod execution_graph_stage {
         FailedStage(super::FailedStage),
     }
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct UnResolvedStage {
     #[prost(uint32, tag = "1")]
@@ -147,7 +136,6 @@ pub struct UnResolvedStage {
         ::prost::alloc::string::String,
     >,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ResolvedStage {
     #[prost(uint32, tag = "1")]
@@ -167,7 +155,6 @@ pub struct ResolvedStage {
         ::prost::alloc::string::String,
     >,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SuccessfulStage {
     #[prost(uint32, tag = "1")]
@@ -187,7 +174,6 @@ pub struct SuccessfulStage {
     #[prost(uint32, tag = "9")]
     pub stage_attempt_num: u32,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FailedStage {
     #[prost(uint32, tag = "1")]
@@ -207,7 +193,6 @@ pub struct FailedStage {
     #[prost(uint32, tag = "9")]
     pub stage_attempt_num: u32,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TaskInfo {
     #[prost(uint32, tag = "1")]
@@ -234,7 +219,6 @@ pub struct TaskInfo {
 }
 /// Nested message and enum types in `TaskInfo`.
 pub mod task_info {
-    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Status {
         #[prost(message, tag = "8")]
@@ -245,7 +229,6 @@ pub mod task_info {
         Successful(super::SuccessfulTask),
     }
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GraphStageInput {
     #[prost(uint32, tag = "1")]
@@ -255,7 +238,6 @@ pub struct GraphStageInput {
     #[prost(bool, tag = "3")]
     pub complete: bool,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TaskInputPartitions {
     #[prost(uint32, tag = "1")]
@@ -263,7 +245,6 @@ pub struct TaskInputPartitions {
     #[prost(message, repeated, tag = "2")]
     pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct KeyValuePair {
     #[prost(string, tag = "1")]
@@ -271,7 +252,6 @@ pub struct KeyValuePair {
     #[prost(string, tag = "2")]
     pub value: ::prost::alloc::string::String,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct Action {
     /// configuration settings
@@ -282,7 +262,6 @@ pub struct Action {
 }
 /// Nested message and enum types in `Action`.
 pub mod action {
-    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum ActionType {
         /// Fetch a partition from an executor
@@ -290,7 +269,6 @@ pub mod action {
         FetchPartition(super::FetchPartition),
     }
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutePartition {
     #[prost(string, tag = "1")]
@@ -310,7 +288,6 @@ pub struct ExecutePartition {
         ::datafusion_proto::protobuf::PhysicalHashRepartition,
     >,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FetchPartition {
     #[prost(string, tag = "1")]
@@ -326,7 +303,6 @@ pub struct FetchPartition {
     #[prost(uint32, tag = "6")]
     pub port: u32,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PartitionLocation {
     /// partition_id of the map stage who produces the shuffle.
@@ -343,7 +319,6 @@ pub struct PartitionLocation {
     pub path: ::prost::alloc::string::String,
 }
 /// Unique identifier for a materialized partition of data
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PartitionId {
     #[prost(string, tag = "1")]
@@ -353,8 +328,7 @@ pub struct PartitionId {
     #[prost(uint32, tag = "4")]
     pub partition_id: u32,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct TaskId {
     #[prost(uint32, tag = "1")]
     pub task_id: u32,
@@ -363,7 +337,6 @@ pub struct TaskId {
     #[prost(uint32, tag = "3")]
     pub partition_id: u32,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PartitionStats {
     #[prost(int64, tag = "1")]
@@ -375,7 +348,6 @@ pub struct PartitionStats {
     #[prost(message, repeated, tag = "4")]
     pub column_stats: ::prost::alloc::vec::Vec<ColumnStats>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ColumnStats {
     #[prost(message, optional, tag = "1")]
@@ -387,13 +359,11 @@ pub struct ColumnStats {
     #[prost(uint32, tag = "4")]
     pub distinct_count: u32,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct OperatorMetricsSet {
     #[prost(message, repeated, tag = "1")]
     pub metrics: ::prost::alloc::vec::Vec<OperatorMetric>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct NamedCount {
     #[prost(string, tag = "1")]
@@ -401,7 +371,6 @@ pub struct NamedCount {
     #[prost(uint64, tag = "2")]
     pub value: u64,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct NamedGauge {
     #[prost(string, tag = "1")]
@@ -409,7 +378,6 @@ pub struct NamedGauge {
     #[prost(uint64, tag = "2")]
     pub value: u64,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct NamedTime {
     #[prost(string, tag = "1")]
@@ -417,7 +385,6 @@ pub struct NamedTime {
     #[prost(uint64, tag = "2")]
     pub value: u64,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct OperatorMetric {
     #[prost(
@@ -428,7 +395,6 @@ pub struct OperatorMetric {
 }
 /// Nested message and enum types in `OperatorMetric`.
 pub mod operator_metric {
-    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Metric {
         #[prost(uint64, tag = "1")]
@@ -456,7 +422,6 @@ pub mod operator_metric {
     }
 }
 /// Used by scheduler
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorMetadata {
     #[prost(string, tag = "1")]
@@ -471,7 +436,6 @@ pub struct ExecutorMetadata {
     pub specification: ::core::option::Option<ExecutorSpecification>,
 }
 /// Used by grpc
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorRegistration {
     #[prost(string, tag = "1")]
@@ -491,14 +455,12 @@ pub struct ExecutorRegistration {
 pub mod executor_registration {
     /// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 
(see <https://github.com/tokio-rs/prost/issues/430> and 
<https://github.com/tokio-rs/prost/pull/455>)
     /// this syntax is ugly but is binary compatible with the "optional" 
keyword (see 
<https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3>)
-    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum OptionalHost {
         #[prost(string, tag = "2")]
         Host(::prost::alloc::string::String),
     }
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorHeartbeat {
     #[prost(string, tag = "1")]
@@ -511,8 +473,7 @@ pub struct ExecutorHeartbeat {
     #[prost(message, optional, tag = "4")]
     pub status: ::core::option::Option<ExecutorStatus>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct ExecutorMetric {
     /// TODO add more metrics
     #[prost(oneof = "executor_metric::Metric", tags = "1")]
@@ -521,14 +482,12 @@ pub struct ExecutorMetric {
 /// Nested message and enum types in `ExecutorMetric`.
 pub mod executor_metric {
     /// TODO add more metrics
-    #[allow(clippy::derive_partial_eq_without_eq)]
-    #[derive(Clone, PartialEq, ::prost::Oneof)]
+    #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
     pub enum Metric {
         #[prost(uint64, tag = "1")]
         AvailableMemory(u64),
     }
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorStatus {
     #[prost(oneof = "executor_status::Status", tags = "1, 2, 3, 4")]
@@ -536,7 +495,6 @@ pub struct ExecutorStatus {
 }
 /// Nested message and enum types in `ExecutorStatus`.
 pub mod executor_status {
-    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Status {
         #[prost(string, tag = "1")]
@@ -549,14 +507,12 @@ pub mod executor_status {
         Terminating(::prost::alloc::string::String),
     }
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorSpecification {
     #[prost(message, repeated, tag = "1")]
     pub resources: ::prost::alloc::vec::Vec<ExecutorResource>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct ExecutorResource {
     /// TODO add more resources
     #[prost(oneof = "executor_resource::Resource", tags = "1")]
@@ -565,14 +521,12 @@ pub struct ExecutorResource {
 /// Nested message and enum types in `ExecutorResource`.
 pub mod executor_resource {
     /// TODO add more resources
-    #[allow(clippy::derive_partial_eq_without_eq)]
-    #[derive(Clone, PartialEq, ::prost::Oneof)]
+    #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
     pub enum Resource {
         #[prost(uint32, tag = "1")]
         TaskSlots(u32),
     }
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct AvailableTaskSlots {
     #[prost(string, tag = "1")]
@@ -580,13 +534,11 @@ pub struct AvailableTaskSlots {
     #[prost(uint32, tag = "2")]
     pub slots: u32,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorTaskSlots {
     #[prost(message, repeated, tag = "1")]
     pub task_slots: ::prost::alloc::vec::Vec<AvailableTaskSlots>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorData {
     #[prost(string, tag = "1")]
@@ -594,21 +546,18 @@ pub struct ExecutorData {
     #[prost(message, repeated, tag = "2")]
     pub resources: ::prost::alloc::vec::Vec<ExecutorResourcePair>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct ExecutorResourcePair {
     #[prost(message, optional, tag = "1")]
     pub total: ::core::option::Option<ExecutorResource>,
     #[prost(message, optional, tag = "2")]
     pub available: ::core::option::Option<ExecutorResource>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RunningTask {
     #[prost(string, tag = "1")]
     pub executor_id: ::prost::alloc::string::String,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FailedTask {
     #[prost(string, tag = "1")]
@@ -623,7 +572,6 @@ pub struct FailedTask {
 }
 /// Nested message and enum types in `FailedTask`.
 pub mod failed_task {
-    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum FailedReason {
         #[prost(message, tag = "4")]
@@ -641,7 +589,6 @@ pub mod failed_task {
         TaskKilled(super::TaskKilled),
     }
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SuccessfulTask {
     #[prost(string, tag = "1")]
@@ -651,10 +598,8 @@ pub struct SuccessfulTask {
     #[prost(message, repeated, tag = "2")]
     pub partitions: ::prost::alloc::vec::Vec<ShuffleWritePartition>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct ExecutionError {}
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FetchPartitionError {
     #[prost(string, tag = "1")]
@@ -664,19 +609,14 @@ pub struct FetchPartitionError {
     #[prost(uint32, tag = "3")]
     pub map_partition_id: u32,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct IoError {}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct ExecutorLost {}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct ResultLost {}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct TaskKilled {}
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ShuffleWritePartition {
     #[prost(uint64, tag = "1")]
@@ -690,7 +630,6 @@ pub struct ShuffleWritePartition {
     #[prost(uint64, tag = "5")]
     pub num_bytes: u64,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TaskStatus {
     #[prost(uint32, tag = "1")]
@@ -716,7 +655,6 @@ pub struct TaskStatus {
 }
 /// Nested message and enum types in `TaskStatus`.
 pub mod task_status {
-    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Status {
         #[prost(message, tag = "9")]
@@ -727,7 +665,6 @@ pub mod task_status {
         Successful(super::SuccessfulTask),
     }
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PollWorkParams {
     #[prost(message, optional, tag = "1")]
@@ -738,7 +675,6 @@ pub struct PollWorkParams {
     #[prost(message, repeated, tag = "3")]
     pub task_status: ::prost::alloc::vec::Vec<TaskStatus>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TaskDefinition {
     #[prost(uint32, tag = "1")]
@@ -763,7 +699,6 @@ pub struct TaskDefinition {
     pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
 }
 /// A set of tasks in the same stage
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct MultiTaskDefinition {
     #[prost(message, repeated, tag = "1")]
@@ -783,13 +718,11 @@ pub struct MultiTaskDefinition {
     #[prost(message, repeated, tag = "9")]
     pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SessionSettings {
     #[prost(message, repeated, tag = "1")]
     pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct JobSessionConfig {
     #[prost(string, tag = "1")]
@@ -797,25 +730,21 @@ pub struct JobSessionConfig {
     #[prost(message, repeated, tag = "2")]
     pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PollWorkResult {
     #[prost(message, repeated, tag = "1")]
     pub tasks: ::prost::alloc::vec::Vec<TaskDefinition>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RegisterExecutorParams {
     #[prost(message, optional, tag = "1")]
     pub metadata: ::core::option::Option<ExecutorRegistration>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct RegisterExecutorResult {
     #[prost(bool, tag = "1")]
     pub success: bool,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct HeartBeatParams {
     #[prost(string, tag = "1")]
@@ -827,14 +756,12 @@ pub struct HeartBeatParams {
     #[prost(message, optional, tag = "4")]
     pub metadata: ::core::option::Option<ExecutorRegistration>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct HeartBeatResult {
     /// TODO it's from Spark for BlockManager
     #[prost(bool, tag = "1")]
     pub reregister: bool,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct StopExecutorParams {
     #[prost(string, tag = "1")]
@@ -846,10 +773,8 @@ pub struct StopExecutorParams {
     #[prost(bool, tag = "3")]
     pub force: bool,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct StopExecutorResult {}
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorStoppedParams {
     #[prost(string, tag = "1")]
@@ -858,10 +783,8 @@ pub struct ExecutorStoppedParams {
     #[prost(string, tag = "2")]
     pub reason: ::prost::alloc::string::String,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct ExecutorStoppedResult {}
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct UpdateTaskStatusParams {
     #[prost(string, tag = "1")]
@@ -870,13 +793,11 @@ pub struct UpdateTaskStatusParams {
     #[prost(message, repeated, tag = "2")]
     pub task_status: ::prost::alloc::vec::Vec<TaskStatus>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct UpdateTaskStatusResult {
     #[prost(bool, tag = "1")]
     pub success: bool,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecuteQueryParams {
     #[prost(message, repeated, tag = "4")]
@@ -890,7 +811,6 @@ pub struct ExecuteQueryParams {
 }
 /// Nested message and enum types in `ExecuteQueryParams`.
 pub mod execute_query_params {
-    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Query {
         #[prost(bytes, tag = "1")]
@@ -898,26 +818,22 @@ pub mod execute_query_params {
         #[prost(string, tag = "2")]
         Sql(::prost::alloc::string::String),
     }
-    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum OptionalSessionId {
         #[prost(string, tag = "3")]
         SessionId(::prost::alloc::string::String),
     }
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CreateSessionParams {
     #[prost(message, repeated, tag = "1")]
     pub settings: ::prost::alloc::vec::Vec<KeyValuePair>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CreateSessionResult {
     #[prost(string, tag = "1")]
     pub session_id: ::prost::alloc::string::String,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct UpdateSessionParams {
     #[prost(string, tag = "1")]
@@ -925,31 +841,26 @@ pub struct UpdateSessionParams {
     #[prost(message, repeated, tag = "2")]
     pub settings: ::prost::alloc::vec::Vec<KeyValuePair>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct UpdateSessionResult {
     #[prost(bool, tag = "1")]
     pub success: bool,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RemoveSessionParams {
     #[prost(string, tag = "1")]
     pub session_id: ::prost::alloc::string::String,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct RemoveSessionResult {
     #[prost(bool, tag = "1")]
     pub success: bool,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecuteSqlParams {
     #[prost(string, tag = "1")]
     pub sql: ::prost::alloc::string::String,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecuteQueryResult {
     #[prost(oneof = "execute_query_result::Result", tags = "1, 2")]
@@ -957,7 +868,6 @@ pub struct ExecuteQueryResult {
 }
 /// Nested message and enum types in `ExecuteQueryResult`.
 pub mod execute_query_result {
-    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Result {
         #[prost(message, tag = "1")]
@@ -966,7 +876,6 @@ pub mod execute_query_result {
         Failure(super::ExecuteQueryFailureResult),
     }
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecuteQuerySuccessResult {
     #[prost(string, tag = "1")]
@@ -974,7 +883,6 @@ pub struct ExecuteQuerySuccessResult {
     #[prost(string, tag = "2")]
     pub session_id: ::prost::alloc::string::String,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecuteQueryFailureResult {
     #[prost(oneof = "execute_query_failure_result::Failure", tags = "1, 2, 3")]
@@ -982,7 +890,6 @@ pub struct ExecuteQueryFailureResult {
 }
 /// Nested message and enum types in `ExecuteQueryFailureResult`.
 pub mod execute_query_failure_result {
-    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Failure {
         #[prost(string, tag = "1")]
@@ -993,13 +900,11 @@ pub mod execute_query_failure_result {
         SqlParsingFailure(::prost::alloc::string::String),
     }
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GetJobStatusParams {
     #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SuccessfulJob {
     #[prost(message, repeated, tag = "1")]
@@ -1011,14 +916,12 @@ pub struct SuccessfulJob {
     #[prost(uint64, tag = "4")]
     pub ended_at: u64,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct QueuedJob {
     #[prost(uint64, tag = "1")]
     pub queued_at: u64,
 }
 /// TODO: add progress report
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RunningJob {
     #[prost(uint64, tag = "1")]
@@ -1028,7 +931,6 @@ pub struct RunningJob {
     #[prost(string, tag = "3")]
     pub scheduler: ::prost::alloc::string::String,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FailedJob {
     #[prost(string, tag = "1")]
@@ -1040,7 +942,6 @@ pub struct FailedJob {
     #[prost(uint64, tag = "4")]
     pub ended_at: u64,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct JobStatus {
     #[prost(string, tag = "5")]
@@ -1052,7 +953,6 @@ pub struct JobStatus {
 }
 /// Nested message and enum types in `JobStatus`.
 pub mod job_status {
-    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Status {
         #[prost(message, tag = "1")]
@@ -1065,13 +965,11 @@ pub mod job_status {
         Successful(super::SuccessfulJob),
     }
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GetJobStatusResult {
     #[prost(message, optional, tag = "1")]
     pub status: ::core::option::Option<JobStatus>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GetFileMetadataParams {
     #[prost(string, tag = "1")]
@@ -1079,40 +977,33 @@ pub struct GetFileMetadataParams {
     #[prost(string, tag = "2")]
     pub file_type: ::prost::alloc::string::String,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GetFileMetadataResult {
     #[prost(message, optional, tag = "1")]
     pub schema: ::core::option::Option<::datafusion_proto_common::Schema>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FilePartitionMetadata {
     #[prost(string, repeated, tag = "1")]
     pub filename: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CancelJobParams {
     #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct CancelJobResult {
     #[prost(bool, tag = "1")]
     pub cancelled: bool,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CleanJobDataParams {
     #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct CleanJobDataResult {}
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct LaunchTaskParams {
     /// Allow to launch a task set to an executor at once
@@ -1121,7 +1012,6 @@ pub struct LaunchTaskParams {
     #[prost(string, tag = "2")]
     pub scheduler_id: ::prost::alloc::string::String,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct LaunchMultiTaskParams {
     /// Allow to launch a task set to an executor at once
@@ -1130,42 +1020,35 @@ pub struct LaunchMultiTaskParams {
     #[prost(string, tag = "2")]
     pub scheduler_id: ::prost::alloc::string::String,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct LaunchTaskResult {
     /// TODO when part of the task set are scheduled successfully
     #[prost(bool, tag = "1")]
     pub success: bool,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct LaunchMultiTaskResult {
     /// TODO when part of the task set are scheduled successfully
     #[prost(bool, tag = "1")]
     pub success: bool,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CancelTasksParams {
     #[prost(message, repeated, tag = "1")]
     pub task_infos: ::prost::alloc::vec::Vec<RunningTaskInfo>,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct CancelTasksResult {
     #[prost(bool, tag = "1")]
     pub cancelled: bool,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RemoveJobDataParams {
     #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
 }
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct RemoveJobDataResult {}
-#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RunningTaskInfo {
     #[prost(uint32, tag = "1")]
@@ -1179,7 +1062,13 @@ pub struct RunningTaskInfo {
 }
 /// Generated client implementations.
 pub mod scheduler_grpc_client {
-    #![allow(unused_variables, dead_code, missing_docs, 
clippy::let_unit_value)]
+    #![allow(
+        unused_variables,
+        dead_code,
+        missing_docs,
+        clippy::wildcard_imports,
+        clippy::let_unit_value,
+    )]
     use tonic::codegen::*;
     use tonic::codegen::http::Uri;
     #[derive(Debug, Clone)]
@@ -1201,8 +1090,8 @@ pub mod scheduler_grpc_client {
     where
         T: tonic::client::GrpcService<tonic::body::BoxBody>,
         T::Error: Into<StdError>,
-        T::ResponseBody: Body<Data = Bytes> + Send + 'static,
-        <T::ResponseBody as Body>::Error: Into<StdError> + Send,
+        T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
+        <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
     {
         pub fn new(inner: T) -> Self {
             let inner = tonic::client::Grpc::new(inner);
@@ -1227,7 +1116,7 @@ pub mod scheduler_grpc_client {
             >,
             <T as tonic::codegen::Service<
                 http::Request<tonic::body::BoxBody>,
-            >>::Error: Into<StdError> + Send + Sync,
+            >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
         {
             SchedulerGrpcClient::new(InterceptedService::new(inner, 
interceptor))
         }
@@ -1271,8 +1160,7 @@ pub mod scheduler_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1296,8 +1184,7 @@ pub mod scheduler_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1328,8 +1215,7 @@ pub mod scheduler_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1358,8 +1244,7 @@ pub mod scheduler_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1388,8 +1273,7 @@ pub mod scheduler_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1415,8 +1299,7 @@ pub mod scheduler_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1442,8 +1325,7 @@ pub mod scheduler_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1469,8 +1351,7 @@ pub mod scheduler_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1496,8 +1377,7 @@ pub mod scheduler_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1523,8 +1403,7 @@ pub mod scheduler_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1551,8 +1430,7 @@ pub mod scheduler_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1578,8 +1456,7 @@ pub mod scheduler_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1603,8 +1480,7 @@ pub mod scheduler_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1623,7 +1499,13 @@ pub mod scheduler_grpc_client {
 }
 /// Generated client implementations.
 pub mod executor_grpc_client {
-    #![allow(unused_variables, dead_code, missing_docs, 
clippy::let_unit_value)]
+    #![allow(
+        unused_variables,
+        dead_code,
+        missing_docs,
+        clippy::wildcard_imports,
+        clippy::let_unit_value,
+    )]
     use tonic::codegen::*;
     use tonic::codegen::http::Uri;
     #[derive(Debug, Clone)]
@@ -1645,8 +1527,8 @@ pub mod executor_grpc_client {
     where
         T: tonic::client::GrpcService<tonic::body::BoxBody>,
         T::Error: Into<StdError>,
-        T::ResponseBody: Body<Data = Bytes> + Send + 'static,
-        <T::ResponseBody as Body>::Error: Into<StdError> + Send,
+        T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
+        <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
     {
         pub fn new(inner: T) -> Self {
             let inner = tonic::client::Grpc::new(inner);
@@ -1671,7 +1553,7 @@ pub mod executor_grpc_client {
             >,
             <T as tonic::codegen::Service<
                 http::Request<tonic::body::BoxBody>,
-            >>::Error: Into<StdError> + Send + Sync,
+            >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
         {
             ExecutorGrpcClient::new(InterceptedService::new(inner, 
interceptor))
         }
@@ -1717,8 +1599,7 @@ pub mod executor_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1742,8 +1623,7 @@ pub mod executor_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1769,8 +1649,7 @@ pub mod executor_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1796,8 +1675,7 @@ pub mod executor_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1823,8 +1701,7 @@ pub mod executor_grpc_client {
                 .ready()
                 .await
                 .map_err(|e| {
-                    tonic::Status::new(
-                        tonic::Code::Unknown,
+                    tonic::Status::unknown(
                         format!("Service was not ready: {}", e.into()),
                     )
                 })?;
@@ -1843,11 +1720,17 @@ pub mod executor_grpc_client {
 }
 /// Generated server implementations.
 pub mod scheduler_grpc_server {
-    #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
+    #![allow(
+        unused_variables,
+        dead_code,
+        missing_docs,
+        clippy::wildcard_imports,
+        clippy::let_unit_value,
+    )]
     use tonic::codegen::*;
     /// Generated trait containing gRPC methods that should be implemented for use with SchedulerGrpcServer.
     #[async_trait]
-    pub trait SchedulerGrpc: Send + Sync + 'static {
+    pub trait SchedulerGrpc: std::marker::Send + std::marker::Sync + 'static {
         /// Executors must poll the scheduler for heartbeat and to receive tasks
         async fn poll_work(
             &self,
@@ -1936,20 +1819,18 @@ pub mod scheduler_grpc_server {
         >;
     }
     #[derive(Debug)]
-    pub struct SchedulerGrpcServer<T: SchedulerGrpc> {
-        inner: _Inner<T>,
+    pub struct SchedulerGrpcServer<T> {
+        inner: Arc<T>,
         accept_compression_encodings: EnabledCompressionEncodings,
         send_compression_encodings: EnabledCompressionEncodings,
         max_decoding_message_size: Option<usize>,
         max_encoding_message_size: Option<usize>,
     }
-    struct _Inner<T>(Arc<T>);
-    impl<T: SchedulerGrpc> SchedulerGrpcServer<T> {
+    impl<T> SchedulerGrpcServer<T> {
         pub fn new(inner: T) -> Self {
             Self::from_arc(Arc::new(inner))
         }
         pub fn from_arc(inner: Arc<T>) -> Self {
-            let inner = _Inner(inner);
             Self {
                 inner,
                 accept_compression_encodings: Default::default(),
@@ -1999,8 +1880,8 @@ pub mod scheduler_grpc_server {
     impl<T, B> tonic::codegen::Service<http::Request<B>> for SchedulerGrpcServer<T>
     where
         T: SchedulerGrpc,
-        B: Body + Send + 'static,
-        B::Error: Into<StdError> + Send + 'static,
+        B: Body + std::marker::Send + 'static,
+        B::Error: Into<StdError> + std::marker::Send + 'static,
     {
         type Response = http::Response<tonic::body::BoxBody>;
         type Error = std::convert::Infallible;
@@ -2012,7 +1893,6 @@ pub mod scheduler_grpc_server {
             Poll::Ready(Ok(()))
         }
         fn call(&mut self, req: http::Request<B>) -> Self::Future {
-            let inner = self.inner.clone();
             match req.uri().path() {
                 "/ballista.protobuf.SchedulerGrpc/PollWork" => {
                     #[allow(non_camel_case_types)]
@@ -2043,7 +1923,6 @@ pub mod scheduler_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = PollWorkSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2090,7 +1969,6 @@ pub mod scheduler_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = RegisterExecutorSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2140,7 +2018,6 @@ pub mod scheduler_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = HeartBeatFromExecutorSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2187,7 +2064,6 @@ pub mod scheduler_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = UpdateTaskStatusSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2234,7 +2110,6 @@ pub mod scheduler_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = GetFileMetadataSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2280,7 +2155,6 @@ pub mod scheduler_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = CreateSessionSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2326,7 +2200,6 @@ pub mod scheduler_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = UpdateSessionSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2372,7 +2245,6 @@ pub mod scheduler_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = RemoveSessionSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2418,7 +2290,6 @@ pub mod scheduler_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = ExecuteQuerySvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2464,7 +2335,6 @@ pub mod scheduler_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = GetJobStatusSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2511,7 +2381,6 @@ pub mod scheduler_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = ExecutorStoppedSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2557,7 +2426,6 @@ pub mod scheduler_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = CancelJobSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2603,7 +2471,6 @@ pub mod scheduler_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = CleanJobDataSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2622,20 +2489,25 @@ pub mod scheduler_grpc_server {
                 }
                 _ => {
                     Box::pin(async move {
-                        Ok(
-                            http::Response::builder()
-                                .status(200)
-                                .header("grpc-status", "12")
-                                .header("content-type", "application/grpc")
-                                .body(empty_body())
-                                .unwrap(),
-                        )
+                        let mut response = http::Response::new(empty_body());
+                        let headers = response.headers_mut();
+                        headers
+                            .insert(
+                                tonic::Status::GRPC_STATUS,
+                                (tonic::Code::Unimplemented as i32).into(),
+                            );
+                        headers
+                            .insert(
+                                http::header::CONTENT_TYPE,
+                                tonic::metadata::GRPC_CONTENT_TYPE,
+                            );
+                        Ok(response)
                     })
                 }
             }
         }
     }
-    impl<T: SchedulerGrpc> Clone for SchedulerGrpcServer<T> {
+    impl<T> Clone for SchedulerGrpcServer<T> {
         fn clone(&self) -> Self {
             let inner = self.inner.clone();
             Self {
@@ -2647,27 +2519,25 @@ pub mod scheduler_grpc_server {
             }
         }
     }
-    impl<T: SchedulerGrpc> Clone for _Inner<T> {
-        fn clone(&self) -> Self {
-            Self(Arc::clone(&self.0))
-        }
-    }
-    impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
-        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-            write!(f, "{:?}", self.0)
-        }
-    }
-    impl<T: SchedulerGrpc> tonic::server::NamedService for SchedulerGrpcServer<T> {
-        const NAME: &'static str = "ballista.protobuf.SchedulerGrpc";
+    /// Generated gRPC service name
+    pub const SERVICE_NAME: &str = "ballista.protobuf.SchedulerGrpc";
+    impl<T> tonic::server::NamedService for SchedulerGrpcServer<T> {
+        const NAME: &'static str = SERVICE_NAME;
     }
 }
 /// Generated server implementations.
 pub mod executor_grpc_server {
-    #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
+    #![allow(
+        unused_variables,
+        dead_code,
+        missing_docs,
+        clippy::wildcard_imports,
+        clippy::let_unit_value,
+    )]
     use tonic::codegen::*;
     /// Generated trait containing gRPC methods that should be implemented for use with ExecutorGrpcServer.
     #[async_trait]
-    pub trait ExecutorGrpc: Send + Sync + 'static {
+    pub trait ExecutorGrpc: std::marker::Send + std::marker::Sync + 'static {
         async fn launch_task(
             &self,
             request: tonic::Request<super::LaunchTaskParams>,
@@ -2705,20 +2575,18 @@ pub mod executor_grpc_server {
         >;
     }
     #[derive(Debug)]
-    pub struct ExecutorGrpcServer<T: ExecutorGrpc> {
-        inner: _Inner<T>,
+    pub struct ExecutorGrpcServer<T> {
+        inner: Arc<T>,
         accept_compression_encodings: EnabledCompressionEncodings,
         send_compression_encodings: EnabledCompressionEncodings,
         max_decoding_message_size: Option<usize>,
         max_encoding_message_size: Option<usize>,
     }
-    struct _Inner<T>(Arc<T>);
-    impl<T: ExecutorGrpc> ExecutorGrpcServer<T> {
+    impl<T> ExecutorGrpcServer<T> {
         pub fn new(inner: T) -> Self {
             Self::from_arc(Arc::new(inner))
         }
         pub fn from_arc(inner: Arc<T>) -> Self {
-            let inner = _Inner(inner);
             Self {
                 inner,
                 accept_compression_encodings: Default::default(),
@@ -2768,8 +2636,8 @@ pub mod executor_grpc_server {
     impl<T, B> tonic::codegen::Service<http::Request<B>> for ExecutorGrpcServer<T>
     where
         T: ExecutorGrpc,
-        B: Body + Send + 'static,
-        B::Error: Into<StdError> + Send + 'static,
+        B: Body + std::marker::Send + 'static,
+        B::Error: Into<StdError> + std::marker::Send + 'static,
     {
         type Response = http::Response<tonic::body::BoxBody>;
         type Error = std::convert::Infallible;
@@ -2781,7 +2649,6 @@ pub mod executor_grpc_server {
             Poll::Ready(Ok(()))
         }
         fn call(&mut self, req: http::Request<B>) -> Self::Future {
-            let inner = self.inner.clone();
             match req.uri().path() {
                 "/ballista.protobuf.ExecutorGrpc/LaunchTask" => {
                     #[allow(non_camel_case_types)]
@@ -2812,7 +2679,6 @@ pub mod executor_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = LaunchTaskSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2859,7 +2725,6 @@ pub mod executor_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = LaunchMultiTaskSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2905,7 +2770,6 @@ pub mod executor_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = StopExecutorSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2951,7 +2815,6 @@ pub mod executor_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = CancelTasksSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -2997,7 +2860,6 @@ pub mod executor_grpc_server {
                     let max_encoding_message_size = self.max_encoding_message_size;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let inner = inner.0;
                         let method = RemoveJobDataSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
@@ -3016,20 +2878,25 @@ pub mod executor_grpc_server {
                 }
                 _ => {
                     Box::pin(async move {
-                        Ok(
-                            http::Response::builder()
-                                .status(200)
-                                .header("grpc-status", "12")
-                                .header("content-type", "application/grpc")
-                                .body(empty_body())
-                                .unwrap(),
-                        )
+                        let mut response = http::Response::new(empty_body());
+                        let headers = response.headers_mut();
+                        headers
+                            .insert(
+                                tonic::Status::GRPC_STATUS,
+                                (tonic::Code::Unimplemented as i32).into(),
+                            );
+                        headers
+                            .insert(
+                                http::header::CONTENT_TYPE,
+                                tonic::metadata::GRPC_CONTENT_TYPE,
+                            );
+                        Ok(response)
                     })
                 }
             }
         }
     }
-    impl<T: ExecutorGrpc> Clone for ExecutorGrpcServer<T> {
+    impl<T> Clone for ExecutorGrpcServer<T> {
         fn clone(&self) -> Self {
             let inner = self.inner.clone();
             Self {
@@ -3041,17 +2908,9 @@ pub mod executor_grpc_server {
             }
         }
     }
-    impl<T: ExecutorGrpc> Clone for _Inner<T> {
-        fn clone(&self) -> Self {
-            Self(Arc::clone(&self.0))
-        }
-    }
-    impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
-        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-            write!(f, "{:?}", self.0)
-        }
-    }
-    impl<T: ExecutorGrpc> tonic::server::NamedService for ExecutorGrpcServer<T> {
-        const NAME: &'static str = "ballista.protobuf.ExecutorGrpc";
+    /// Generated gRPC service name
+    pub const SERVICE_NAME: &str = "ballista.protobuf.ExecutorGrpc";
+    impl<T> tonic::server::NamedService for ExecutorGrpcServer<T> {
+        const NAME: &'static str = SERVICE_NAME;
     }
 }
diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs
index 2bb555d1..fce4a399 100644
--- a/ballista/core/src/serde/mod.rs
+++ b/ballista/core/src/serde/mod.rs
@@ -309,7 +309,7 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec {
                     Some(datafusion_proto::protobuf::PhysicalHashRepartition {
                         hash_expr: exprs
                             .iter()
-                            .map(|expr|datafusion_proto::physical_plan::to_proto::serialize_physical_expr(expr.clone(), &default_codec))
+                            .map(|expr|datafusion_proto::physical_plan::to_proto::serialize_physical_expr(&expr.clone(), &default_codec))
                             .collect::<Result<Vec<_>, DataFusionError>>()?,
                         partition_count: *partition_count as u64,
                     })
diff --git a/ballista/core/src/serde/scheduler/to_proto.rs b/ballista/core/src/serde/scheduler/to_proto.rs
index f6a878fa..29b00dd7 100644
--- a/ballista/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/core/src/serde/scheduler/to_proto.rs
@@ -106,7 +106,7 @@ pub fn hash_partitioning_to_proto(
             Ok(Some(datafusion_protobuf::PhysicalHashRepartition {
                 hash_expr: exprs
                     .iter()
-                    .map(|expr|datafusion_proto::physical_plan::to_proto::serialize_physical_expr(expr.clone(), &default_codec))
+                    .map(|expr|datafusion_proto::physical_plan::to_proto::serialize_physical_expr(&expr.clone(), &default_codec))
                     .collect::<Result<Vec<_>, DataFusionError>>()?,
                 partition_count: *partition_count as u64,
             }))
diff --git a/ballista/executor/src/flight_service.rs b/ballista/executor/src/flight_service.rs
index 43387f09..a96a752c 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -38,7 +38,7 @@ use arrow_flight::{
 use datafusion::arrow::{error::ArrowError, record_batch::RecordBatch};
 use futures::{Stream, StreamExt, TryStreamExt};
 use log::{debug, info};
-use std::io::{Read, Seek};
+use std::io::{BufReader, Read, Seek};
 use tokio::sync::mpsc::channel;
 use tokio::sync::mpsc::error::SendError;
 use tokio::{sync::mpsc::Sender, task};
@@ -95,6 +95,7 @@ impl FlightService for BallistaFlightService {
                         ))
                     })
                     .map_err(|e| from_ballista_err(&e))?;
+                let file = BufReader::new(file);
                 let reader =
                     StreamReader::try_new(file, None).map_err(|e| from_arrow_err(&e))?;
 
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index e6c4b3ad..1367f17b 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -43,7 +43,7 @@ prometheus-metrics = ["prometheus", "once_cell"]
 anyhow = "1"
 arrow-flight = { workspace = true }
 async-trait = { workspace = true }
-axum = "0.6.20"
+axum = "0.7.7"
 ballista-core = { path = "../core", version = "0.12.0", features = ["s3"] }
 base64 = { version = "0.22" }
 clap = { workspace = true }
@@ -53,7 +53,7 @@ datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
 futures = { workspace = true }
 graphviz-rust = "0.9.0"
-http = "0.2.9"
+http = "1.1"
 log = { workspace = true }
 object_store = { workspace = true }
 once_cell = { version = "1.16.0", optional = true }
diff --git a/ballista/scheduler/build.rs b/ballista/scheduler/build.rs
index d0c8c270..5a3e00cc 100644
--- a/ballista/scheduler/build.rs
+++ b/ballista/scheduler/build.rs
@@ -24,6 +24,6 @@ fn main() -> Result<(), String> {
 
     println!("cargo:rerun-if-changed=proto/keda.proto");
     tonic_build::configure()
-        .compile(&["proto/keda.proto"], &["proto"])
+        .compile_protos(&["proto/keda.proto"], &["proto"])
         .map_err(|e| format!("protobuf compilation failed: {e}"))
 }
diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs
index c6297d78..2187db06 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -64,7 +64,9 @@ use datafusion::arrow;
 use datafusion::arrow::array::{ArrayRef, StringArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::arrow::error::ArrowError;
-use datafusion::arrow::ipc::writer::{IpcDataGenerator, IpcWriteOptions};
+use datafusion::arrow::ipc::writer::{
+    DictionaryTracker, IpcDataGenerator, IpcWriteOptions,
+};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::DFSchemaRef;
 use datafusion::logical_expr::LogicalPlan;
@@ -368,7 +370,15 @@ impl FlightSqlServiceImpl {
         let options = IpcWriteOptions::default();
         let pair = SchemaAsIpc::new(&arrow_schema, &options);
         let data_gen = IpcDataGenerator::default();
-        let encoded_data = data_gen.schema_to_bytes(pair.0, pair.1);
+        let mut dictionary_tracker = DictionaryTracker::new_with_preserve_dict_id(
+            false,
+            pair.1.preserve_dict_id(),
+        );
+        let encoded_data = data_gen.schema_to_bytes_with_dictionary_tracker(
+            pair.0,
+            &mut dictionary_tracker,
+            pair.1,
+        );
         let mut schema_bytes = vec![];
         arrow::ipc::writer::write_message(&mut schema_bytes, encoded_data, pair.1)
             .map_err(|e| Status::internal(format!("Error encoding schema: {e}")))?;
diff --git a/ballista/scheduler/src/scheduler_process.rs b/ballista/scheduler/src/scheduler_process.rs
index 140dc141..5d1671a0 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -80,15 +80,15 @@ pub async fn start_server(
         FlightSqlServiceImpl::new(scheduler_server.clone()),
     ));
 
-    let tonic = tonic_builder.into_service().into_router();
+    let tonic = tonic_builder.into_service().into_axum_router();
 
     let axum = get_routes(Arc::new(scheduler_server));
     let merged = axum
         .merge(tonic)
         .into_make_service_with_connect_info::<SocketAddr>();
 
-    axum::Server::bind(&addr)
-        .serve(merged)
+    let listener = tokio::net::TcpListener::bind(&addr)
         .await
-        .map_err(Error::from)
+        .map_err(Error::from)?;
+    axum::serve(listener, merged).await.map_err(Error::from)
 }
diff --git a/ballista/scheduler/src/state/session_manager.rs b/ballista/scheduler/src/state/session_manager.rs
index 4cf9d83f..8880dfd1 100644
--- a/ballista/scheduler/src/state/session_manager.rs
+++ b/ballista/scheduler/src/state/session_manager.rs
@@ -66,7 +66,7 @@ pub fn create_datafusion_context(
     session_builder: SessionBuilder,
 ) -> Arc<SessionContext> {
     let config =
-        SessionConfig::from_string_hash_map(ballista_config.settings().clone()).unwrap();
+        SessionConfig::from_string_hash_map(&ballista_config.settings().clone()).unwrap();
     let config = config
         .with_target_partitions(ballista_config.default_shuffle_partitions())
         .with_batch_size(ballista_config.default_batch_size())
diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs
index 5e5dee12..27bc0ec8 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -950,7 +950,7 @@ pub async fn test_join_plan(partition: usize) -> ExecutionGraph {
         .build()
         .unwrap();
 
-    let sort_expr = Expr::Sort(SortExpr::new(Box::new(col("id")), false, false));
+    let sort_expr = SortExpr::new(col("id"), false, false);
 
     let logical_plan = left_plan
         .join(right_plan, JoinType::Inner, (vec!["id"], vec!["id"]), None)
diff --git a/python/Cargo.toml b/python/Cargo.toml
index eb662cb1..758d162f 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -33,11 +33,11 @@ publish = false
 async-trait = "0.1.77"
 ballista = { path = "../ballista/client", version = "0.12.0" }
 ballista-core = { path = "../ballista/core", version = "0.12.0" }
-datafusion = "41.0.0"
-datafusion-proto = "41.0.0"
-datafusion-python = "41.0.0"
+datafusion = { workspace = true }
+datafusion-proto = { workspace = true }
+datafusion-python = { workspace = true }
 
-pyo3 = { version = "0.21", features = ["extension-module", "abi3", "abi3-py38"] }
+pyo3 = { version = "0.22", features = ["extension-module", "abi3", "abi3-py38"] }
 pyo3-log = "0.11.0"
 tokio = { version = "1.35", features = ["macros", "rt", "rt-multi-thread", "sync"] }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to