This is an automated email from the ASF dual-hosted git repository.
mete pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 1c4a38c5 update to datafusion 42 ... (#1080)
1c4a38c5 is described below
commit 1c4a38c5383f75f7f3ab41ff7e6ce0186a9bd87a
Author: Marko Milenković <[email protected]>
AuthorDate: Mon Oct 14 22:55:25 2024 +0100
update to datafusion 42 ... (#1080)
... and all other dependent libraries
---
Cargo.toml | 26 +-
ballista-cli/Cargo.toml | 5 +-
ballista-cli/src/command.rs | 4 +-
ballista-cli/src/main.rs | 4 +-
ballista/core/build.rs | 2 +-
ballista/core/proto/datafusion.proto | 176 +++++-----
ballista/core/proto/datafusion_common.proto | 41 ++-
.../core/src/execution_plans/shuffle_reader.rs | 1 +
ballista/core/src/serde/generated/ballista.rs | 391 +++++++--------------
ballista/core/src/serde/mod.rs | 2 +-
ballista/core/src/serde/scheduler/to_proto.rs | 2 +-
ballista/executor/src/flight_service.rs | 3 +-
ballista/scheduler/Cargo.toml | 4 +-
ballista/scheduler/build.rs | 2 +-
ballista/scheduler/src/flight_sql.rs | 14 +-
ballista/scheduler/src/scheduler_process.rs | 8 +-
ballista/scheduler/src/state/session_manager.rs | 2 +-
ballista/scheduler/src/test_utils.rs | 2 +-
python/Cargo.toml | 8 +-
19 files changed, 306 insertions(+), 391 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index e77e5468..007482bf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,23 +21,23 @@ members = ["ballista-cli", "ballista/client",
"ballista/core", "ballista/executo
resolver = "2"
[workspace.dependencies]
-arrow = { version = "52.2.0", features = ["ipc_compression"] }
-arrow-flight = { version = "52.2.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "52.2.0", default-features = false }
+arrow = { version = "53", features = ["ipc_compression"] }
+arrow-flight = { version = "53", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "53", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = { version = "0.4.0" }
configure_me_codegen = { version = "0.4.4" }
# bump directly to datafusion v43 to avoid the serde bug on v42
(https://github.com/apache/datafusion/pull/12626)
-datafusion = "41.0.0"
-datafusion-cli = "41.0.0"
-datafusion-proto = "41.0.0"
-datafusion-proto-common = "41.0.0"
-object_store = "0.10.2"
-prost = "0.12.0"
-prost-types = "0.12.0"
-sqlparser = "0.49.0"
-tonic = { version = "0.11.0" }
-tonic-build = { version = "0.11.0", default-features = false, features = [
+datafusion = "42.0.0"
+datafusion-cli = "42.0.0"
+datafusion-proto = "42.0.0"
+datafusion-proto-common = "42.0.0"
+object_store = "0.11"
+prost = "0.13"
+prost-types = "0.13"
+sqlparser = "0.50"
+tonic = { version = "0.12" }
+tonic-build = { version = "0.12", default-features = false, features = [
"transport",
"prost"
] }
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 16ff0b54..891f5a7c 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -30,14 +30,15 @@ readme = "README.md"
[dependencies]
ballista = { path = "../ballista/client", version = "0.12.0", features =
["standalone"] }
-clap = { workspace = true }
+# datafusion-cli uses 4.5 clap, thus it does not depend on workspace
+clap = { version = "4.5", features = ["derive", "cargo"] }
datafusion = { workspace = true }
datafusion-cli = { workspace = true }
dirs = "5.0.1"
env_logger = { workspace = true }
mimalloc = { version = "0.1", default-features = false }
num_cpus = { workspace = true }
-rustyline = "11.0.0"
+rustyline = "14.0.0"
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread",
"sync", "parking_lot"] }
[features]
diff --git a/ballista-cli/src/command.rs b/ballista-cli/src/command.rs
index 0c407cf0..2123713a 100644
--- a/ballista-cli/src/command.rs
+++ b/ballista-cli/src/command.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
use std::time::Instant;
use ballista::prelude::{BallistaContext, BallistaError, Result};
-use clap::ArgEnum;
+
use datafusion::arrow::array::{ArrayRef, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
@@ -223,7 +223,7 @@ impl OutputFormat {
Err(BallistaError::General(format!(
"{:?} is not a valid format type [possible values:
{:?}]",
format,
- PrintFormat::value_variants()
+ "TO BE FIXED", //PrintFormat::value_variants()
)))
}
}
diff --git a/ballista-cli/src/main.rs b/ballista-cli/src/main.rs
index 6aeecd6c..0fd6ddfd 100644
--- a/ballista-cli/src/main.rs
+++ b/ballista-cli/src/main.rs
@@ -58,7 +58,7 @@ struct Args {
#[clap(
short,
long,
- multiple_values = true,
+ num_args = 0..,
help = "Execute commands from file(s), then exit",
value_parser(parse_valid_file)
)]
@@ -67,7 +67,7 @@ struct Args {
#[clap(
short = 'r',
long,
- multiple_values = true,
+ num_args = 0..,
help = "Run the provided files on startup instead of ~/.ballistarc",
value_parser(parse_valid_file),
conflicts_with = "file"
diff --git a/ballista/core/build.rs b/ballista/core/build.rs
index 6e501b88..4fe7e3bf 100644
--- a/ballista/core/build.rs
+++ b/ballista/core/build.rs
@@ -44,7 +44,7 @@ fn main() -> Result<(), String> {
.extern_path(".datafusion_common", "::datafusion_proto_common")
.extern_path(".datafusion", "::datafusion_proto::protobuf")
.protoc_arg("--experimental_allow_proto3_optional")
- .compile(&["proto/ballista.proto"], &["proto"])
+ .compile_protos(&["proto/ballista.proto"], &["proto"])
.map_err(|e| format!("protobuf compilation failed: {e}"))?;
let generated_source_path = out.join("ballista.protobuf.rs");
let code = std::fs::read_to_string(generated_source_path).unwrap();
diff --git a/ballista/core/proto/datafusion.proto
b/ballista/core/proto/datafusion.proto
index 8402b92f..cf166ba9 100644
--- a/ballista/core/proto/datafusion.proto
+++ b/ballista/core/proto/datafusion.proto
@@ -75,6 +75,10 @@ message LogicalExprNodeCollection {
repeated LogicalExprNode logical_expr_nodes = 1;
}
+message SortExprNodeCollection {
+ repeated SortExprNode sort_expr_nodes = 1;
+}
+
message ListingTableScanNode {
reserved 1; // was string table_name
TableReference table_name = 14;
@@ -90,8 +94,9 @@ message ListingTableScanNode {
datafusion_common.CsvFormat csv = 10;
datafusion_common.ParquetFormat parquet = 11;
datafusion_common.AvroFormat avro = 12;
+ datafusion_common.NdJsonFormat json = 15;
}
- repeated LogicalExprNodeCollection file_sort_order = 13;
+ repeated SortExprNodeCollection file_sort_order = 13;
}
message ViewTableScanNode {
@@ -128,7 +133,7 @@ message SelectionNode {
message SortNode {
LogicalPlanNode input = 1;
- repeated LogicalExprNode expr = 2;
+ repeated SortExprNode expr = 2;
// Maximum number of highest/lowest rows to fetch; negative means no limit
int64 fetch = 3;
}
@@ -159,12 +164,12 @@ message CreateExternalTableNode {
repeated string table_partition_cols = 5;
bool if_not_exists = 6;
string definition = 7;
- repeated LogicalExprNodeCollection order_exprs = 10;
+ repeated SortExprNodeCollection order_exprs = 10;
bool unbounded = 11;
map<string, string> options = 8;
datafusion_common.Constraints constraints = 12;
map<string, LogicalExprNode> column_defaults = 13;
- }
+}
message PrepareNode {
string name = 1;
@@ -244,35 +249,51 @@ message DistinctNode {
message DistinctOnNode {
repeated LogicalExprNode on_expr = 1;
repeated LogicalExprNode select_expr = 2;
- repeated LogicalExprNode sort_expr = 3;
+ repeated SortExprNode sort_expr = 3;
LogicalPlanNode input = 4;
}
message CopyToNode {
- LogicalPlanNode input = 1;
- string output_url = 2;
- oneof format_options {
- datafusion_common.CsvOptions csv = 8;
- datafusion_common.JsonOptions json = 9;
- datafusion_common.TableParquetOptions parquet = 10;
- datafusion_common.AvroOptions avro = 11;
- datafusion_common.ArrowOptions arrow = 12;
- }
- repeated string partition_by = 7;
+ LogicalPlanNode input = 1;
+ string output_url = 2;
+ bytes file_type = 3;
+ repeated string partition_by = 7;
}
message UnnestNode {
- LogicalPlanNode input = 1;
- repeated datafusion_common.Column exec_columns = 2;
- repeated uint64 list_type_columns = 3;
- repeated uint64 struct_type_columns = 4;
- repeated uint64 dependency_indices = 5;
- datafusion_common.DfSchema schema = 6;
- UnnestOptions options = 7;
+ LogicalPlanNode input = 1;
+ repeated ColumnUnnestExec exec_columns = 2;
+ repeated ColumnUnnestListItem list_type_columns = 3;
+ repeated uint64 struct_type_columns = 4;
+ repeated uint64 dependency_indices = 5;
+ datafusion_common.DfSchema schema = 6;
+ UnnestOptions options = 7;
+}
+message ColumnUnnestListItem {
+ uint32 input_index = 1;
+ ColumnUnnestListRecursion recursion = 2;
+}
+
+message ColumnUnnestListRecursions {
+ repeated ColumnUnnestListRecursion recursions = 2;
+}
+
+message ColumnUnnestListRecursion {
+ datafusion_common.Column output_column = 1;
+ uint32 depth = 2;
+}
+
+message ColumnUnnestExec {
+ datafusion_common.Column column = 1;
+ oneof UnnestType {
+ ColumnUnnestListRecursions list = 2;
+ datafusion_common.EmptyMessage struct = 3;
+ datafusion_common.EmptyMessage inferred = 4;
+ }
}
message UnnestOptions {
- bool preserve_nulls = 1;
+ bool preserve_nulls = 1;
}
message UnionNode {
@@ -316,8 +337,6 @@ message LogicalExprNode {
// binary expressions
BinaryExprNode binary_expr = 4;
- // aggregate expressions
- AggregateExprNode aggregate_expr = 5;
// null checks
IsNull is_null_expr = 6;
@@ -327,7 +346,6 @@ message LogicalExprNode {
BetweenNode between = 9;
CaseNode case_ = 10;
CastNode cast = 11;
- SortExprNode sort = 12;
NegativeNode negative = 13;
InListNode in_list = 14;
Wildcard wildcard = 15;
@@ -369,7 +387,7 @@ message LogicalExprNode {
}
message Wildcard {
- string qualifier = 1;
+ TableReference qualifier = 1;
}
message PlaceholderNode {
@@ -471,57 +489,14 @@ message InListNode {
bool negated = 3;
}
-enum AggregateFunction {
- MIN = 0;
- MAX = 1;
- SUM = 2;
- AVG = 3;
- COUNT = 4;
- APPROX_DISTINCT = 5;
- ARRAY_AGG = 6;
- // VARIANCE = 7;
- VARIANCE_POP = 8;
- // COVARIANCE = 9;
- // COVARIANCE_POP = 10;
- STDDEV = 11;
- STDDEV_POP = 12;
- CORRELATION = 13;
- APPROX_PERCENTILE_CONT = 14;
- APPROX_MEDIAN = 15;
- APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16;
- GROUPING = 17;
- // MEDIAN = 18;
- BIT_AND = 19;
- BIT_OR = 20;
- BIT_XOR = 21;
- BOOL_AND = 22;
- BOOL_OR = 23;
- REGR_SLOPE = 26;
- REGR_INTERCEPT = 27;
- REGR_COUNT = 28;
- REGR_R2 = 29;
- REGR_AVGX = 30;
- REGR_AVGY = 31;
- REGR_SXX = 32;
- REGR_SYY = 33;
- REGR_SXY = 34;
- STRING_AGG = 35;
- NTH_VALUE_AGG = 36;
-}
-
-message AggregateExprNode {
- AggregateFunction aggr_function = 1;
- repeated LogicalExprNode expr = 2;
- bool distinct = 3;
- LogicalExprNode filter = 4;
- repeated LogicalExprNode order_by = 5;
-}
message AggregateUDFExprNode {
string fun_name = 1;
repeated LogicalExprNode args = 2;
+ bool distinct = 5;
LogicalExprNode filter = 3;
- repeated LogicalExprNode order_by = 4;
+ repeated SortExprNode order_by = 4;
+ optional bytes fun_definition = 6;
}
message ScalarUDFExprNode {
@@ -531,7 +506,8 @@ message ScalarUDFExprNode {
}
enum BuiltInWindowFunction {
- ROW_NUMBER = 0;
+ UNSPECIFIED = 0; //
https://protobuf.dev/programming-guides/dos-donts/#unspecified-enum
+ // ROW_NUMBER = 0;
RANK = 1;
DENSE_RANK = 2;
PERCENT_RANK = 3;
@@ -546,16 +522,16 @@ enum BuiltInWindowFunction {
message WindowExprNode {
oneof window_function {
- AggregateFunction aggr_function = 1;
BuiltInWindowFunction built_in_function = 2;
string udaf = 3;
string udwf = 9;
}
LogicalExprNode expr = 4;
repeated LogicalExprNode partition_by = 5;
- repeated LogicalExprNode order_by = 6;
+ repeated SortExprNode order_by = 6;
// repeated LogicalExprNode filter = 7;
WindowFrame window_frame = 8;
+ optional bytes fun_definition = 10;
}
message BetweenNode {
@@ -674,9 +650,11 @@ message PlanType {
datafusion_common.EmptyMessage FinalLogicalPlan = 3;
datafusion_common.EmptyMessage InitialPhysicalPlan = 4;
datafusion_common.EmptyMessage InitialPhysicalPlanWithStats = 9;
+ datafusion_common.EmptyMessage InitialPhysicalPlanWithSchema = 11;
OptimizedPhysicalPlanType OptimizedPhysicalPlan = 5;
datafusion_common.EmptyMessage FinalPhysicalPlan = 6;
datafusion_common.EmptyMessage FinalPhysicalPlanWithStats = 10;
+ datafusion_common.EmptyMessage FinalPhysicalPlanWithSchema = 12;
}
}
@@ -737,10 +715,11 @@ message PhysicalPlanNode {
AnalyzeExecNode analyze = 23;
JsonSinkExecNode json_sink = 24;
SymmetricHashJoinExecNode symmetric_hash_join = 25;
- InterleaveExecNode interleave = 26;
+ InterleaveExecNode interleave = 26;
PlaceholderRowExecNode placeholder_row = 27;
CsvSinkExecNode csv_sink = 28;
ParquetSinkExecNode parquet_sink = 29;
+ UnnestExecNode unnest = 30;
}
}
@@ -752,13 +731,21 @@ message PartitionColumn {
message FileSinkConfig {
reserved 6; // writer_mode
+ reserved 8; // was `overwrite` which has been superseded by `insert_op`
string object_store_url = 1;
repeated PartitionedFile file_groups = 2;
repeated string table_paths = 3;
datafusion_common.Schema output_schema = 4;
repeated PartitionColumn table_partition_cols = 5;
- bool overwrite = 8;
+ bool keep_partition_by_columns = 9;
+ InsertOp insert_op = 10;
+}
+
+enum InsertOp {
+ Append = 0;
+ Overwrite = 1;
+ Replace = 2;
}
message JsonSink {
@@ -797,6 +784,19 @@ message ParquetSinkExecNode {
PhysicalSortExprNodeCollection sort_order = 4;
}
+message UnnestExecNode {
+ PhysicalPlanNode input = 1;
+ datafusion_common.Schema schema = 2;
+ repeated ListUnnest list_type_columns = 3;
+ repeated uint64 struct_type_columns = 4;
+ UnnestOptions options = 5;
+}
+
+message ListUnnest {
+ uint32 index_in_input_schema = 1;
+ uint32 depth = 2;
+}
+
message PhysicalExtensionNode {
bytes node = 1;
repeated PhysicalPlanNode inputs = 2;
@@ -838,6 +838,8 @@ message PhysicalExprNode {
// was PhysicalDateTimeIntervalExprNode date_time_interval_expr = 17;
PhysicalLikeExprNode like_expr = 18;
+
+ PhysicalExtensionExprNode extension = 19;
}
}
@@ -850,17 +852,17 @@ message PhysicalScalarUdfNode {
message PhysicalAggregateExprNode {
oneof AggregateFunction {
- AggregateFunction aggr_function = 1;
string user_defined_aggr_function = 4;
}
repeated PhysicalExprNode expr = 2;
repeated PhysicalSortExprNode ordering_req = 5;
bool distinct = 3;
+ bool ignore_nulls = 6;
+ optional bytes fun_definition = 7;
}
message PhysicalWindowExprNode {
oneof window_function {
- AggregateFunction aggr_function = 1;
BuiltInWindowFunction built_in_function = 2;
string user_defined_aggr_function = 3;
}
@@ -869,6 +871,7 @@ message PhysicalWindowExprNode {
repeated PhysicalSortExprNode order_by = 6;
WindowFrame window_frame = 7;
string name = 8;
+ optional bytes fun_definition = 9;
}
message PhysicalIsNull {
@@ -944,10 +947,16 @@ message PhysicalNegativeNode {
PhysicalExprNode expr = 1;
}
+message PhysicalExtensionExprNode {
+ bytes expr = 1;
+ repeated PhysicalExprNode inputs = 2;
+}
+
message FilterExecNode {
PhysicalPlanNode input = 1;
PhysicalExprNode expr = 2;
uint32 default_filter_selectivity = 3;
+ repeated uint32 projection = 9;
}
message FileGroup {
@@ -994,6 +1003,10 @@ message CsvScanExecNode {
oneof optional_escape {
string escape = 5;
}
+ oneof optional_comment {
+ string comment = 6;
+ }
+ bool newlines_in_values = 7;
}
message AvroScanExecNode {
@@ -1174,6 +1187,7 @@ message NestedLoopJoinExecNode {
message CoalesceBatchesExecNode {
PhysicalPlanNode input = 1;
uint32 target_batch_size = 2;
+ optional uint32 fetch = 3;
}
message CoalescePartitionsExecNode {
@@ -1233,4 +1247,4 @@ message PartitionStats {
int64 num_batches = 2;
int64 num_bytes = 3;
repeated datafusion_common.ColumnStats column_stats = 4;
-}
+}
\ No newline at end of file
diff --git a/ballista/core/proto/datafusion_common.proto
b/ballista/core/proto/datafusion_common.proto
index d9ec7dbb..c3906abf 100644
--- a/ballista/core/proto/datafusion_common.proto
+++ b/ballista/core/proto/datafusion_common.proto
@@ -51,6 +51,11 @@ message ParquetFormat {
message AvroFormat {}
+message NdJsonFormat {
+ JsonOptions options = 1;
+}
+
+
message PrimaryKeyConstraint{
repeated uint64 indices = 1;
}
@@ -130,6 +135,12 @@ message Decimal{
int32 scale = 4;
}
+message Decimal256Type{
+ reserved 1, 2;
+ uint32 precision = 3;
+ int32 scale = 4;
+}
+
message List{
Field field_type = 1;
}
@@ -164,7 +175,7 @@ message Union{
repeated int32 type_ids = 3;
}
-// Used for List/FixedSizeList/LargeList/Struct
+// Used for List/FixedSizeList/LargeList/Struct/Map
message ScalarNestedValue {
message Dictionary {
bytes ipc_message = 1;
@@ -248,6 +259,7 @@ message ScalarValue{
bool bool_value = 1;
string utf8_value = 2;
string large_utf8_value = 3;
+ string utf8_view_value = 23;
int32 int8_value = 4;
int32 int16_value = 5;
int32 int32_value = 6;
@@ -265,6 +277,7 @@ message ScalarValue{
ScalarNestedValue list_value = 17;
ScalarNestedValue fixed_size_list_value = 18;
ScalarNestedValue struct_value = 32;
+ ScalarNestedValue map_value = 41;
Decimal128 decimal128_value = 20;
Decimal256 decimal256_value = 39;
@@ -281,6 +294,7 @@ message ScalarValue{
ScalarDictionaryValue dictionary_value = 27;
bytes binary_value = 28;
bytes large_binary_value = 29;
+ bytes binary_view_value = 22;
ScalarTime64Value time64_value = 30;
IntervalDayTimeValue interval_daytime_value = 25;
IntervalMonthDayNanoValue interval_month_day_nano = 31;
@@ -318,8 +332,10 @@ message ArrowType{
EmptyMessage FLOAT32 = 12 ;
EmptyMessage FLOAT64 = 13 ;
EmptyMessage UTF8 = 14 ;
+ EmptyMessage UTF8_VIEW = 35;
EmptyMessage LARGE_UTF8 = 32;
EmptyMessage BINARY = 15 ;
+ EmptyMessage BINARY_VIEW = 34;
int32 FIXED_SIZE_BINARY = 16 ;
EmptyMessage LARGE_BINARY = 31;
EmptyMessage DATE32 = 17 ;
@@ -330,6 +346,7 @@ message ArrowType{
TimeUnit TIME64 = 22 ;
IntervalUnit INTERVAL = 23 ;
Decimal DECIMAL = 24 ;
+ Decimal256Type DECIMAL256 = 36;
List LIST = 25;
List LARGE_LIST = 26;
FixedSizeList FIXED_SIZE_LIST = 27;
@@ -381,6 +398,12 @@ message CsvWriterOptions {
string time_format = 7;
// Optional value to represent null
string null_value = 8;
+ // Optional quote. Defaults to `b'"'`
+ string quote = 9;
+ // Optional escape. Defaults to `'\\'`
+ string escape = 10;
+ // Optional flag whether to double quotes, instead of escaping. Defaults to
`true`
+ bool double_quote = 11;
}
// Options controlling CSV format
@@ -397,6 +420,10 @@ message CsvOptions {
string timestamp_tz_format = 10; // Optional timestamp with timezone format
string time_format = 11; // Optional time format
string null_value = 12; // Optional representation of null value
+ bytes comment = 13; // Optional comment character as a byte
+ bytes double_quote = 14; // Indicates if quotes are doubled
+ bytes newlines_in_values = 15; // Indicates if newlines are supported in
values
+ bytes terminator = 16; // Optional terminator character as a byte
}
// Options controlling CSV format
@@ -407,15 +434,16 @@ message JsonOptions {
message TableParquetOptions {
ParquetOptions global = 1;
- repeated ColumnSpecificOptions column_specific_options = 2;
+ repeated ParquetColumnSpecificOptions column_specific_options = 2;
+ map<string, string> key_value_metadata = 3;
}
-message ColumnSpecificOptions {
+message ParquetColumnSpecificOptions {
string column_name = 1;
- ColumnOptions options = 2;
+ ParquetColumnOptions options = 2;
}
-message ColumnOptions {
+message ParquetColumnOptions {
oneof bloom_filter_enabled_opt {
bool bloom_filter_enabled = 1;
}
@@ -465,6 +493,7 @@ message ParquetOptions {
uint64 maximum_buffered_record_batches_per_stream = 25; // default = 2
bool bloom_filter_on_read = 26; // default = true
bool bloom_filter_on_write = 27; // default = false
+ bool schema_force_view_types = 28; // default = false
oneof metadata_size_hint_opt {
uint64 metadata_size_hint = 4;
@@ -538,4 +567,4 @@ message ColumnStats {
Precision max_value = 2;
Precision null_count = 3;
Precision distinct_count = 4;
-}
+}
\ No newline at end of file
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs
b/ballista/core/src/execution_plans/shuffle_reader.rs
index 2f856b39..4a2c25b8 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -419,6 +419,7 @@ fn fetch_partition_local_inner(
let file = File::open(path).map_err(|e| {
BallistaError::General(format!("Failed to open partition file at
{path}: {e:?}"))
})?;
+ let file = BufReader::new(file);
let reader = StreamReader::try_new(file, None).map_err(|e| {
BallistaError::General(format!("Failed to new arrow FileReader at
{path}: {e:?}"))
})?;
diff --git a/ballista/core/src/serde/generated/ballista.rs
b/ballista/core/src/serde/generated/ballista.rs
index c45bdeaf..51a7b80b 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -2,7 +2,6 @@
///
/////////////////////////////////////////////////////////////////////////////////////////////////
/// Ballista Physical Plan
///
/////////////////////////////////////////////////////////////////////////////////////////////////
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BallistaPhysicalPlanNode {
#[prost(oneof = "ballista_physical_plan_node::PhysicalPlanType", tags =
"1, 2, 3")]
@@ -12,7 +11,6 @@ pub struct BallistaPhysicalPlanNode {
}
/// Nested message and enum types in `BallistaPhysicalPlanNode`.
pub mod ballista_physical_plan_node {
- #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum PhysicalPlanType {
#[prost(message, tag = "1")]
@@ -23,7 +21,6 @@ pub mod ballista_physical_plan_node {
UnresolvedShuffle(super::UnresolvedShuffleExecNode),
}
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ShuffleWriterExecNode {
/// TODO it seems redundant to provide job and stage id here since we also
have them
@@ -39,7 +36,6 @@ pub struct ShuffleWriterExecNode {
::datafusion_proto::protobuf::PhysicalHashRepartition,
>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UnresolvedShuffleExecNode {
#[prost(uint32, tag = "1")]
@@ -49,7 +45,6 @@ pub struct UnresolvedShuffleExecNode {
#[prost(uint32, tag = "4")]
pub output_partition_count: u32,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ShuffleReaderExecNode {
#[prost(message, repeated, tag = "1")]
@@ -60,7 +55,6 @@ pub struct ShuffleReaderExecNode {
#[prost(uint32, tag = "3")]
pub stage_id: u32,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ShuffleReaderPartition {
/// each partition of a shuffle read can read data from multiple locations
@@ -70,7 +64,6 @@ pub struct ShuffleReaderPartition {
///
/////////////////////////////////////////////////////////////////////////////////////////////////
/// Ballista Scheduling
///
/////////////////////////////////////////////////////////////////////////////////////////////////
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutionGraph {
#[prost(string, tag = "1")]
@@ -100,7 +93,6 @@ pub struct ExecutionGraph {
#[prost(uint64, tag = "13")]
pub queued_at: u64,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StageAttempts {
#[prost(uint32, tag = "1")]
@@ -108,7 +100,6 @@ pub struct StageAttempts {
#[prost(uint32, repeated, tag = "2")]
pub stage_attempt_num: ::prost::alloc::vec::Vec<u32>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutionGraphStage {
#[prost(oneof = "execution_graph_stage::StageType", tags = "1, 2, 3, 4")]
@@ -116,7 +107,6 @@ pub struct ExecutionGraphStage {
}
/// Nested message and enum types in `ExecutionGraphStage`.
pub mod execution_graph_stage {
- #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum StageType {
#[prost(message, tag = "1")]
@@ -129,7 +119,6 @@ pub mod execution_graph_stage {
FailedStage(super::FailedStage),
}
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UnResolvedStage {
#[prost(uint32, tag = "1")]
@@ -147,7 +136,6 @@ pub struct UnResolvedStage {
::prost::alloc::string::String,
>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ResolvedStage {
#[prost(uint32, tag = "1")]
@@ -167,7 +155,6 @@ pub struct ResolvedStage {
::prost::alloc::string::String,
>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SuccessfulStage {
#[prost(uint32, tag = "1")]
@@ -187,7 +174,6 @@ pub struct SuccessfulStage {
#[prost(uint32, tag = "9")]
pub stage_attempt_num: u32,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FailedStage {
#[prost(uint32, tag = "1")]
@@ -207,7 +193,6 @@ pub struct FailedStage {
#[prost(uint32, tag = "9")]
pub stage_attempt_num: u32,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskInfo {
#[prost(uint32, tag = "1")]
@@ -234,7 +219,6 @@ pub struct TaskInfo {
}
/// Nested message and enum types in `TaskInfo`.
pub mod task_info {
- #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Status {
#[prost(message, tag = "8")]
@@ -245,7 +229,6 @@ pub mod task_info {
Successful(super::SuccessfulTask),
}
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GraphStageInput {
#[prost(uint32, tag = "1")]
@@ -255,7 +238,6 @@ pub struct GraphStageInput {
#[prost(bool, tag = "3")]
pub complete: bool,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskInputPartitions {
#[prost(uint32, tag = "1")]
@@ -263,7 +245,6 @@ pub struct TaskInputPartitions {
#[prost(message, repeated, tag = "2")]
pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct KeyValuePair {
#[prost(string, tag = "1")]
@@ -271,7 +252,6 @@ pub struct KeyValuePair {
#[prost(string, tag = "2")]
pub value: ::prost::alloc::string::String,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Action {
/// configuration settings
@@ -282,7 +262,6 @@ pub struct Action {
}
/// Nested message and enum types in `Action`.
pub mod action {
- #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum ActionType {
/// Fetch a partition from an executor
@@ -290,7 +269,6 @@ pub mod action {
FetchPartition(super::FetchPartition),
}
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutePartition {
#[prost(string, tag = "1")]
@@ -310,7 +288,6 @@ pub struct ExecutePartition {
::datafusion_proto::protobuf::PhysicalHashRepartition,
>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FetchPartition {
#[prost(string, tag = "1")]
@@ -326,7 +303,6 @@ pub struct FetchPartition {
#[prost(uint32, tag = "6")]
pub port: u32,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PartitionLocation {
/// partition_id of the map stage who produces the shuffle.
@@ -343,7 +319,6 @@ pub struct PartitionLocation {
pub path: ::prost::alloc::string::String,
}
/// Unique identifier for a materialized partition of data
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PartitionId {
#[prost(string, tag = "1")]
@@ -353,8 +328,7 @@ pub struct PartitionId {
#[prost(uint32, tag = "4")]
pub partition_id: u32,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct TaskId {
#[prost(uint32, tag = "1")]
pub task_id: u32,
@@ -363,7 +337,6 @@ pub struct TaskId {
#[prost(uint32, tag = "3")]
pub partition_id: u32,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PartitionStats {
#[prost(int64, tag = "1")]
@@ -375,7 +348,6 @@ pub struct PartitionStats {
#[prost(message, repeated, tag = "4")]
pub column_stats: ::prost::alloc::vec::Vec<ColumnStats>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ColumnStats {
#[prost(message, optional, tag = "1")]
@@ -387,13 +359,11 @@ pub struct ColumnStats {
#[prost(uint32, tag = "4")]
pub distinct_count: u32,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct OperatorMetricsSet {
#[prost(message, repeated, tag = "1")]
pub metrics: ::prost::alloc::vec::Vec<OperatorMetric>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct NamedCount {
#[prost(string, tag = "1")]
@@ -401,7 +371,6 @@ pub struct NamedCount {
#[prost(uint64, tag = "2")]
pub value: u64,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct NamedGauge {
#[prost(string, tag = "1")]
@@ -409,7 +378,6 @@ pub struct NamedGauge {
#[prost(uint64, tag = "2")]
pub value: u64,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct NamedTime {
#[prost(string, tag = "1")]
@@ -417,7 +385,6 @@ pub struct NamedTime {
#[prost(uint64, tag = "2")]
pub value: u64,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct OperatorMetric {
#[prost(
@@ -428,7 +395,6 @@ pub struct OperatorMetric {
}
/// Nested message and enum types in `OperatorMetric`.
pub mod operator_metric {
- #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Metric {
#[prost(uint64, tag = "1")]
@@ -456,7 +422,6 @@ pub mod operator_metric {
}
}
/// Used by scheduler
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorMetadata {
#[prost(string, tag = "1")]
@@ -471,7 +436,6 @@ pub struct ExecutorMetadata {
pub specification: ::core::option::Option<ExecutorSpecification>,
}
/// Used by grpc
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorRegistration {
#[prost(string, tag = "1")]
@@ -491,14 +455,12 @@ pub struct ExecutorRegistration {
pub mod executor_registration {
/// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14
(see <https://github.com/tokio-rs/prost/issues/430> and
<https://github.com/tokio-rs/prost/pull/455>)
/// this syntax is ugly but is binary compatible with the "optional"
keyword (see
<https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3>)
- #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum OptionalHost {
#[prost(string, tag = "2")]
Host(::prost::alloc::string::String),
}
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorHeartbeat {
#[prost(string, tag = "1")]
@@ -511,8 +473,7 @@ pub struct ExecutorHeartbeat {
#[prost(message, optional, tag = "4")]
pub status: ::core::option::Option<ExecutorStatus>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct ExecutorMetric {
/// TODO add more metrics
#[prost(oneof = "executor_metric::Metric", tags = "1")]
@@ -521,14 +482,12 @@ pub struct ExecutorMetric {
/// Nested message and enum types in `ExecutorMetric`.
pub mod executor_metric {
/// TODO add more metrics
- #[allow(clippy::derive_partial_eq_without_eq)]
- #[derive(Clone, PartialEq, ::prost::Oneof)]
+ #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
pub enum Metric {
#[prost(uint64, tag = "1")]
AvailableMemory(u64),
}
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorStatus {
#[prost(oneof = "executor_status::Status", tags = "1, 2, 3, 4")]
@@ -536,7 +495,6 @@ pub struct ExecutorStatus {
}
/// Nested message and enum types in `ExecutorStatus`.
pub mod executor_status {
- #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Status {
#[prost(string, tag = "1")]
@@ -549,14 +507,12 @@ pub mod executor_status {
Terminating(::prost::alloc::string::String),
}
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorSpecification {
#[prost(message, repeated, tag = "1")]
pub resources: ::prost::alloc::vec::Vec<ExecutorResource>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct ExecutorResource {
/// TODO add more resources
#[prost(oneof = "executor_resource::Resource", tags = "1")]
@@ -565,14 +521,12 @@ pub struct ExecutorResource {
/// Nested message and enum types in `ExecutorResource`.
pub mod executor_resource {
/// TODO add more resources
- #[allow(clippy::derive_partial_eq_without_eq)]
- #[derive(Clone, PartialEq, ::prost::Oneof)]
+ #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
pub enum Resource {
#[prost(uint32, tag = "1")]
TaskSlots(u32),
}
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct AvailableTaskSlots {
#[prost(string, tag = "1")]
@@ -580,13 +534,11 @@ pub struct AvailableTaskSlots {
#[prost(uint32, tag = "2")]
pub slots: u32,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorTaskSlots {
#[prost(message, repeated, tag = "1")]
pub task_slots: ::prost::alloc::vec::Vec<AvailableTaskSlots>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorData {
#[prost(string, tag = "1")]
@@ -594,21 +546,18 @@ pub struct ExecutorData {
#[prost(message, repeated, tag = "2")]
pub resources: ::prost::alloc::vec::Vec<ExecutorResourcePair>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct ExecutorResourcePair {
#[prost(message, optional, tag = "1")]
pub total: ::core::option::Option<ExecutorResource>,
#[prost(message, optional, tag = "2")]
pub available: ::core::option::Option<ExecutorResource>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RunningTask {
#[prost(string, tag = "1")]
pub executor_id: ::prost::alloc::string::String,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FailedTask {
#[prost(string, tag = "1")]
@@ -623,7 +572,6 @@ pub struct FailedTask {
}
/// Nested message and enum types in `FailedTask`.
pub mod failed_task {
- #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum FailedReason {
#[prost(message, tag = "4")]
@@ -641,7 +589,6 @@ pub mod failed_task {
TaskKilled(super::TaskKilled),
}
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SuccessfulTask {
#[prost(string, tag = "1")]
@@ -651,10 +598,8 @@ pub struct SuccessfulTask {
#[prost(message, repeated, tag = "2")]
pub partitions: ::prost::alloc::vec::Vec<ShuffleWritePartition>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct ExecutionError {}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FetchPartitionError {
#[prost(string, tag = "1")]
@@ -664,19 +609,14 @@ pub struct FetchPartitionError {
#[prost(uint32, tag = "3")]
pub map_partition_id: u32,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct IoError {}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct ExecutorLost {}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct ResultLost {}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct TaskKilled {}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ShuffleWritePartition {
#[prost(uint64, tag = "1")]
@@ -690,7 +630,6 @@ pub struct ShuffleWritePartition {
#[prost(uint64, tag = "5")]
pub num_bytes: u64,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskStatus {
#[prost(uint32, tag = "1")]
@@ -716,7 +655,6 @@ pub struct TaskStatus {
}
/// Nested message and enum types in `TaskStatus`.
pub mod task_status {
- #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Status {
#[prost(message, tag = "9")]
@@ -727,7 +665,6 @@ pub mod task_status {
Successful(super::SuccessfulTask),
}
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PollWorkParams {
#[prost(message, optional, tag = "1")]
@@ -738,7 +675,6 @@ pub struct PollWorkParams {
#[prost(message, repeated, tag = "3")]
pub task_status: ::prost::alloc::vec::Vec<TaskStatus>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskDefinition {
#[prost(uint32, tag = "1")]
@@ -763,7 +699,6 @@ pub struct TaskDefinition {
pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
}
/// A set of tasks in the same stage
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MultiTaskDefinition {
#[prost(message, repeated, tag = "1")]
@@ -783,13 +718,11 @@ pub struct MultiTaskDefinition {
#[prost(message, repeated, tag = "9")]
pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SessionSettings {
#[prost(message, repeated, tag = "1")]
pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct JobSessionConfig {
#[prost(string, tag = "1")]
@@ -797,25 +730,21 @@ pub struct JobSessionConfig {
#[prost(message, repeated, tag = "2")]
pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PollWorkResult {
#[prost(message, repeated, tag = "1")]
pub tasks: ::prost::alloc::vec::Vec<TaskDefinition>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RegisterExecutorParams {
#[prost(message, optional, tag = "1")]
pub metadata: ::core::option::Option<ExecutorRegistration>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct RegisterExecutorResult {
#[prost(bool, tag = "1")]
pub success: bool,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HeartBeatParams {
#[prost(string, tag = "1")]
@@ -827,14 +756,12 @@ pub struct HeartBeatParams {
#[prost(message, optional, tag = "4")]
pub metadata: ::core::option::Option<ExecutorRegistration>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct HeartBeatResult {
/// TODO it's from Spark for BlockManager
#[prost(bool, tag = "1")]
pub reregister: bool,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StopExecutorParams {
#[prost(string, tag = "1")]
@@ -846,10 +773,8 @@ pub struct StopExecutorParams {
#[prost(bool, tag = "3")]
pub force: bool,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct StopExecutorResult {}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorStoppedParams {
#[prost(string, tag = "1")]
@@ -858,10 +783,8 @@ pub struct ExecutorStoppedParams {
#[prost(string, tag = "2")]
pub reason: ::prost::alloc::string::String,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct ExecutorStoppedResult {}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UpdateTaskStatusParams {
#[prost(string, tag = "1")]
@@ -870,13 +793,11 @@ pub struct UpdateTaskStatusParams {
#[prost(message, repeated, tag = "2")]
pub task_status: ::prost::alloc::vec::Vec<TaskStatus>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct UpdateTaskStatusResult {
#[prost(bool, tag = "1")]
pub success: bool,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecuteQueryParams {
#[prost(message, repeated, tag = "4")]
@@ -890,7 +811,6 @@ pub struct ExecuteQueryParams {
}
/// Nested message and enum types in `ExecuteQueryParams`.
pub mod execute_query_params {
- #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Query {
#[prost(bytes, tag = "1")]
@@ -898,26 +818,22 @@ pub mod execute_query_params {
#[prost(string, tag = "2")]
Sql(::prost::alloc::string::String),
}
- #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum OptionalSessionId {
#[prost(string, tag = "3")]
SessionId(::prost::alloc::string::String),
}
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CreateSessionParams {
#[prost(message, repeated, tag = "1")]
pub settings: ::prost::alloc::vec::Vec<KeyValuePair>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CreateSessionResult {
#[prost(string, tag = "1")]
pub session_id: ::prost::alloc::string::String,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UpdateSessionParams {
#[prost(string, tag = "1")]
@@ -925,31 +841,26 @@ pub struct UpdateSessionParams {
#[prost(message, repeated, tag = "2")]
pub settings: ::prost::alloc::vec::Vec<KeyValuePair>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct UpdateSessionResult {
#[prost(bool, tag = "1")]
pub success: bool,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RemoveSessionParams {
#[prost(string, tag = "1")]
pub session_id: ::prost::alloc::string::String,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct RemoveSessionResult {
#[prost(bool, tag = "1")]
pub success: bool,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecuteSqlParams {
#[prost(string, tag = "1")]
pub sql: ::prost::alloc::string::String,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecuteQueryResult {
#[prost(oneof = "execute_query_result::Result", tags = "1, 2")]
@@ -957,7 +868,6 @@ pub struct ExecuteQueryResult {
}
/// Nested message and enum types in `ExecuteQueryResult`.
pub mod execute_query_result {
- #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Result {
#[prost(message, tag = "1")]
@@ -966,7 +876,6 @@ pub mod execute_query_result {
Failure(super::ExecuteQueryFailureResult),
}
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecuteQuerySuccessResult {
#[prost(string, tag = "1")]
@@ -974,7 +883,6 @@ pub struct ExecuteQuerySuccessResult {
#[prost(string, tag = "2")]
pub session_id: ::prost::alloc::string::String,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecuteQueryFailureResult {
#[prost(oneof = "execute_query_failure_result::Failure", tags = "1, 2, 3")]
@@ -982,7 +890,6 @@ pub struct ExecuteQueryFailureResult {
}
/// Nested message and enum types in `ExecuteQueryFailureResult`.
pub mod execute_query_failure_result {
- #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Failure {
#[prost(string, tag = "1")]
@@ -993,13 +900,11 @@ pub mod execute_query_failure_result {
SqlParsingFailure(::prost::alloc::string::String),
}
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetJobStatusParams {
#[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SuccessfulJob {
#[prost(message, repeated, tag = "1")]
@@ -1011,14 +916,12 @@ pub struct SuccessfulJob {
#[prost(uint64, tag = "4")]
pub ended_at: u64,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct QueuedJob {
#[prost(uint64, tag = "1")]
pub queued_at: u64,
}
/// TODO: add progress report
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RunningJob {
#[prost(uint64, tag = "1")]
@@ -1028,7 +931,6 @@ pub struct RunningJob {
#[prost(string, tag = "3")]
pub scheduler: ::prost::alloc::string::String,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FailedJob {
#[prost(string, tag = "1")]
@@ -1040,7 +942,6 @@ pub struct FailedJob {
#[prost(uint64, tag = "4")]
pub ended_at: u64,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct JobStatus {
#[prost(string, tag = "5")]
@@ -1052,7 +953,6 @@ pub struct JobStatus {
}
/// Nested message and enum types in `JobStatus`.
pub mod job_status {
- #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Status {
#[prost(message, tag = "1")]
@@ -1065,13 +965,11 @@ pub mod job_status {
Successful(super::SuccessfulJob),
}
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetJobStatusResult {
#[prost(message, optional, tag = "1")]
pub status: ::core::option::Option<JobStatus>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetFileMetadataParams {
#[prost(string, tag = "1")]
@@ -1079,40 +977,33 @@ pub struct GetFileMetadataParams {
#[prost(string, tag = "2")]
pub file_type: ::prost::alloc::string::String,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetFileMetadataResult {
#[prost(message, optional, tag = "1")]
pub schema: ::core::option::Option<::datafusion_proto_common::Schema>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FilePartitionMetadata {
#[prost(string, repeated, tag = "1")]
pub filename: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CancelJobParams {
#[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct CancelJobResult {
#[prost(bool, tag = "1")]
pub cancelled: bool,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CleanJobDataParams {
#[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct CleanJobDataResult {}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LaunchTaskParams {
/// Allow to launch a task set to an executor at once
@@ -1121,7 +1012,6 @@ pub struct LaunchTaskParams {
#[prost(string, tag = "2")]
pub scheduler_id: ::prost::alloc::string::String,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LaunchMultiTaskParams {
/// Allow to launch a task set to an executor at once
@@ -1130,42 +1020,35 @@ pub struct LaunchMultiTaskParams {
#[prost(string, tag = "2")]
pub scheduler_id: ::prost::alloc::string::String,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct LaunchTaskResult {
/// TODO when part of the task set are scheduled successfully
#[prost(bool, tag = "1")]
pub success: bool,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct LaunchMultiTaskResult {
/// TODO when part of the task set are scheduled successfully
#[prost(bool, tag = "1")]
pub success: bool,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CancelTasksParams {
#[prost(message, repeated, tag = "1")]
pub task_infos: ::prost::alloc::vec::Vec<RunningTaskInfo>,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct CancelTasksResult {
#[prost(bool, tag = "1")]
pub cancelled: bool,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RemoveJobDataParams {
#[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct RemoveJobDataResult {}
-#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RunningTaskInfo {
#[prost(uint32, tag = "1")]
@@ -1179,7 +1062,13 @@ pub struct RunningTaskInfo {
}
/// Generated client implementations.
pub mod scheduler_grpc_client {
- #![allow(unused_variables, dead_code, missing_docs,
clippy::let_unit_value)]
+ #![allow(
+ unused_variables,
+ dead_code,
+ missing_docs,
+ clippy::wildcard_imports,
+ clippy::let_unit_value,
+ )]
use tonic::codegen::*;
use tonic::codegen::http::Uri;
#[derive(Debug, Clone)]
@@ -1201,8 +1090,8 @@ pub mod scheduler_grpc_client {
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
- T::ResponseBody: Body<Data = Bytes> + Send + 'static,
- <T::ResponseBody as Body>::Error: Into<StdError> + Send,
+ T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
+ <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
{
pub fn new(inner: T) -> Self {
let inner = tonic::client::Grpc::new(inner);
@@ -1227,7 +1116,7 @@ pub mod scheduler_grpc_client {
>,
<T as tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
- >>::Error: Into<StdError> + Send + Sync,
+ >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
{
SchedulerGrpcClient::new(InterceptedService::new(inner,
interceptor))
}
@@ -1271,8 +1160,7 @@ pub mod scheduler_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1296,8 +1184,7 @@ pub mod scheduler_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1328,8 +1215,7 @@ pub mod scheduler_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1358,8 +1244,7 @@ pub mod scheduler_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1388,8 +1273,7 @@ pub mod scheduler_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1415,8 +1299,7 @@ pub mod scheduler_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1442,8 +1325,7 @@ pub mod scheduler_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1469,8 +1351,7 @@ pub mod scheduler_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1496,8 +1377,7 @@ pub mod scheduler_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1523,8 +1403,7 @@ pub mod scheduler_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1551,8 +1430,7 @@ pub mod scheduler_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1578,8 +1456,7 @@ pub mod scheduler_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1603,8 +1480,7 @@ pub mod scheduler_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1623,7 +1499,13 @@ pub mod scheduler_grpc_client {
}
/// Generated client implementations.
pub mod executor_grpc_client {
- #![allow(unused_variables, dead_code, missing_docs,
clippy::let_unit_value)]
+ #![allow(
+ unused_variables,
+ dead_code,
+ missing_docs,
+ clippy::wildcard_imports,
+ clippy::let_unit_value,
+ )]
use tonic::codegen::*;
use tonic::codegen::http::Uri;
#[derive(Debug, Clone)]
@@ -1645,8 +1527,8 @@ pub mod executor_grpc_client {
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
- T::ResponseBody: Body<Data = Bytes> + Send + 'static,
- <T::ResponseBody as Body>::Error: Into<StdError> + Send,
+ T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
+ <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
{
pub fn new(inner: T) -> Self {
let inner = tonic::client::Grpc::new(inner);
@@ -1671,7 +1553,7 @@ pub mod executor_grpc_client {
>,
<T as tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
- >>::Error: Into<StdError> + Send + Sync,
+ >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
{
ExecutorGrpcClient::new(InterceptedService::new(inner,
interceptor))
}
@@ -1717,8 +1599,7 @@ pub mod executor_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1742,8 +1623,7 @@ pub mod executor_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1769,8 +1649,7 @@ pub mod executor_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1796,8 +1675,7 @@ pub mod executor_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1823,8 +1701,7 @@ pub mod executor_grpc_client {
.ready()
.await
.map_err(|e| {
- tonic::Status::new(
- tonic::Code::Unknown,
+ tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
@@ -1843,11 +1720,17 @@ pub mod executor_grpc_client {
}
/// Generated server implementations.
pub mod scheduler_grpc_server {
- #![allow(unused_variables, dead_code, missing_docs,
clippy::let_unit_value)]
+ #![allow(
+ unused_variables,
+ dead_code,
+ missing_docs,
+ clippy::wildcard_imports,
+ clippy::let_unit_value,
+ )]
use tonic::codegen::*;
/// Generated trait containing gRPC methods that should be implemented for
use with SchedulerGrpcServer.
#[async_trait]
- pub trait SchedulerGrpc: Send + Sync + 'static {
+ pub trait SchedulerGrpc: std::marker::Send + std::marker::Sync + 'static {
/// Executors must poll the scheduler for heartbeat and to receive
tasks
async fn poll_work(
&self,
@@ -1936,20 +1819,18 @@ pub mod scheduler_grpc_server {
>;
}
#[derive(Debug)]
- pub struct SchedulerGrpcServer<T: SchedulerGrpc> {
- inner: _Inner<T>,
+ pub struct SchedulerGrpcServer<T> {
+ inner: Arc<T>,
accept_compression_encodings: EnabledCompressionEncodings,
send_compression_encodings: EnabledCompressionEncodings,
max_decoding_message_size: Option<usize>,
max_encoding_message_size: Option<usize>,
}
- struct _Inner<T>(Arc<T>);
- impl<T: SchedulerGrpc> SchedulerGrpcServer<T> {
+ impl<T> SchedulerGrpcServer<T> {
pub fn new(inner: T) -> Self {
Self::from_arc(Arc::new(inner))
}
pub fn from_arc(inner: Arc<T>) -> Self {
- let inner = _Inner(inner);
Self {
inner,
accept_compression_encodings: Default::default(),
@@ -1999,8 +1880,8 @@ pub mod scheduler_grpc_server {
impl<T, B> tonic::codegen::Service<http::Request<B>> for
SchedulerGrpcServer<T>
where
T: SchedulerGrpc,
- B: Body + Send + 'static,
- B::Error: Into<StdError> + Send + 'static,
+ B: Body + std::marker::Send + 'static,
+ B::Error: Into<StdError> + std::marker::Send + 'static,
{
type Response = http::Response<tonic::body::BoxBody>;
type Error = std::convert::Infallible;
@@ -2012,7 +1893,6 @@ pub mod scheduler_grpc_server {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
- let inner = self.inner.clone();
match req.uri().path() {
"/ballista.protobuf.SchedulerGrpc/PollWork" => {
#[allow(non_camel_case_types)]
@@ -2043,7 +1923,6 @@ pub mod scheduler_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = PollWorkSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2090,7 +1969,6 @@ pub mod scheduler_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = RegisterExecutorSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2140,7 +2018,6 @@ pub mod scheduler_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = HeartBeatFromExecutorSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2187,7 +2064,6 @@ pub mod scheduler_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = UpdateTaskStatusSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2234,7 +2110,6 @@ pub mod scheduler_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = GetFileMetadataSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2280,7 +2155,6 @@ pub mod scheduler_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = CreateSessionSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2326,7 +2200,6 @@ pub mod scheduler_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = UpdateSessionSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2372,7 +2245,6 @@ pub mod scheduler_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = RemoveSessionSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2418,7 +2290,6 @@ pub mod scheduler_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = ExecuteQuerySvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2464,7 +2335,6 @@ pub mod scheduler_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = GetJobStatusSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2511,7 +2381,6 @@ pub mod scheduler_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = ExecutorStoppedSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2557,7 +2426,6 @@ pub mod scheduler_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = CancelJobSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2603,7 +2471,6 @@ pub mod scheduler_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = CleanJobDataSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2622,20 +2489,25 @@ pub mod scheduler_grpc_server {
}
_ => {
Box::pin(async move {
- Ok(
- http::Response::builder()
- .status(200)
- .header("grpc-status", "12")
- .header("content-type", "application/grpc")
- .body(empty_body())
- .unwrap(),
- )
+ let mut response = http::Response::new(empty_body());
+ let headers = response.headers_mut();
+ headers
+ .insert(
+ tonic::Status::GRPC_STATUS,
+ (tonic::Code::Unimplemented as i32).into(),
+ );
+ headers
+ .insert(
+ http::header::CONTENT_TYPE,
+ tonic::metadata::GRPC_CONTENT_TYPE,
+ );
+ Ok(response)
})
}
}
}
}
- impl<T: SchedulerGrpc> Clone for SchedulerGrpcServer<T> {
+ impl<T> Clone for SchedulerGrpcServer<T> {
fn clone(&self) -> Self {
let inner = self.inner.clone();
Self {
@@ -2647,27 +2519,25 @@ pub mod scheduler_grpc_server {
}
}
}
- impl<T: SchedulerGrpc> Clone for _Inner<T> {
- fn clone(&self) -> Self {
- Self(Arc::clone(&self.0))
- }
- }
- impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "{:?}", self.0)
- }
- }
- impl<T: SchedulerGrpc> tonic::server::NamedService for
SchedulerGrpcServer<T> {
- const NAME: &'static str = "ballista.protobuf.SchedulerGrpc";
+ /// Generated gRPC service name
+ pub const SERVICE_NAME: &str = "ballista.protobuf.SchedulerGrpc";
+ impl<T> tonic::server::NamedService for SchedulerGrpcServer<T> {
+ const NAME: &'static str = SERVICE_NAME;
}
}
/// Generated server implementations.
pub mod executor_grpc_server {
- #![allow(unused_variables, dead_code, missing_docs,
clippy::let_unit_value)]
+ #![allow(
+ unused_variables,
+ dead_code,
+ missing_docs,
+ clippy::wildcard_imports,
+ clippy::let_unit_value,
+ )]
use tonic::codegen::*;
/// Generated trait containing gRPC methods that should be implemented for
use with ExecutorGrpcServer.
#[async_trait]
- pub trait ExecutorGrpc: Send + Sync + 'static {
+ pub trait ExecutorGrpc: std::marker::Send + std::marker::Sync + 'static {
async fn launch_task(
&self,
request: tonic::Request<super::LaunchTaskParams>,
@@ -2705,20 +2575,18 @@ pub mod executor_grpc_server {
>;
}
#[derive(Debug)]
- pub struct ExecutorGrpcServer<T: ExecutorGrpc> {
- inner: _Inner<T>,
+ pub struct ExecutorGrpcServer<T> {
+ inner: Arc<T>,
accept_compression_encodings: EnabledCompressionEncodings,
send_compression_encodings: EnabledCompressionEncodings,
max_decoding_message_size: Option<usize>,
max_encoding_message_size: Option<usize>,
}
- struct _Inner<T>(Arc<T>);
- impl<T: ExecutorGrpc> ExecutorGrpcServer<T> {
+ impl<T> ExecutorGrpcServer<T> {
pub fn new(inner: T) -> Self {
Self::from_arc(Arc::new(inner))
}
pub fn from_arc(inner: Arc<T>) -> Self {
- let inner = _Inner(inner);
Self {
inner,
accept_compression_encodings: Default::default(),
@@ -2768,8 +2636,8 @@ pub mod executor_grpc_server {
impl<T, B> tonic::codegen::Service<http::Request<B>> for
ExecutorGrpcServer<T>
where
T: ExecutorGrpc,
- B: Body + Send + 'static,
- B::Error: Into<StdError> + Send + 'static,
+ B: Body + std::marker::Send + 'static,
+ B::Error: Into<StdError> + std::marker::Send + 'static,
{
type Response = http::Response<tonic::body::BoxBody>;
type Error = std::convert::Infallible;
@@ -2781,7 +2649,6 @@ pub mod executor_grpc_server {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
- let inner = self.inner.clone();
match req.uri().path() {
"/ballista.protobuf.ExecutorGrpc/LaunchTask" => {
#[allow(non_camel_case_types)]
@@ -2812,7 +2679,6 @@ pub mod executor_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = LaunchTaskSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2859,7 +2725,6 @@ pub mod executor_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = LaunchMultiTaskSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2905,7 +2770,6 @@ pub mod executor_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = StopExecutorSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2951,7 +2815,6 @@ pub mod executor_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = CancelTasksSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -2997,7 +2860,6 @@ pub mod executor_grpc_server {
let max_encoding_message_size =
self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
- let inner = inner.0;
let method = RemoveJobDataSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
@@ -3016,20 +2878,25 @@ pub mod executor_grpc_server {
}
_ => {
Box::pin(async move {
- Ok(
- http::Response::builder()
- .status(200)
- .header("grpc-status", "12")
- .header("content-type", "application/grpc")
- .body(empty_body())
- .unwrap(),
- )
+ let mut response = http::Response::new(empty_body());
+ let headers = response.headers_mut();
+ headers
+ .insert(
+ tonic::Status::GRPC_STATUS,
+ (tonic::Code::Unimplemented as i32).into(),
+ );
+ headers
+ .insert(
+ http::header::CONTENT_TYPE,
+ tonic::metadata::GRPC_CONTENT_TYPE,
+ );
+ Ok(response)
})
}
}
}
}
- impl<T: ExecutorGrpc> Clone for ExecutorGrpcServer<T> {
+ impl<T> Clone for ExecutorGrpcServer<T> {
fn clone(&self) -> Self {
let inner = self.inner.clone();
Self {
@@ -3041,17 +2908,9 @@ pub mod executor_grpc_server {
}
}
}
- impl<T: ExecutorGrpc> Clone for _Inner<T> {
- fn clone(&self) -> Self {
- Self(Arc::clone(&self.0))
- }
- }
- impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "{:?}", self.0)
- }
- }
- impl<T: ExecutorGrpc> tonic::server::NamedService for
ExecutorGrpcServer<T> {
- const NAME: &'static str = "ballista.protobuf.ExecutorGrpc";
+ /// Generated gRPC service name
+ pub const SERVICE_NAME: &str = "ballista.protobuf.ExecutorGrpc";
+ impl<T> tonic::server::NamedService for ExecutorGrpcServer<T> {
+ const NAME: &'static str = SERVICE_NAME;
}
}
diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs
index 2bb555d1..fce4a399 100644
--- a/ballista/core/src/serde/mod.rs
+++ b/ballista/core/src/serde/mod.rs
@@ -309,7 +309,7 @@ impl PhysicalExtensionCodec for
BallistaPhysicalExtensionCodec {
Some(datafusion_proto::protobuf::PhysicalHashRepartition {
hash_expr: exprs
.iter()
-
.map(|expr|datafusion_proto::physical_plan::to_proto::serialize_physical_expr(expr.clone(),
&default_codec))
+
.map(|expr|datafusion_proto::physical_plan::to_proto::serialize_physical_expr(&expr.clone(),
&default_codec))
.collect::<Result<Vec<_>, DataFusionError>>()?,
partition_count: *partition_count as u64,
})
diff --git a/ballista/core/src/serde/scheduler/to_proto.rs
b/ballista/core/src/serde/scheduler/to_proto.rs
index f6a878fa..29b00dd7 100644
--- a/ballista/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/core/src/serde/scheduler/to_proto.rs
@@ -106,7 +106,7 @@ pub fn hash_partitioning_to_proto(
Ok(Some(datafusion_protobuf::PhysicalHashRepartition {
hash_expr: exprs
.iter()
-
.map(|expr|datafusion_proto::physical_plan::to_proto::serialize_physical_expr(expr.clone(),
&default_codec))
+
.map(|expr|datafusion_proto::physical_plan::to_proto::serialize_physical_expr(&expr.clone(),
&default_codec))
.collect::<Result<Vec<_>, DataFusionError>>()?,
partition_count: *partition_count as u64,
}))
diff --git a/ballista/executor/src/flight_service.rs
b/ballista/executor/src/flight_service.rs
index 43387f09..a96a752c 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -38,7 +38,7 @@ use arrow_flight::{
use datafusion::arrow::{error::ArrowError, record_batch::RecordBatch};
use futures::{Stream, StreamExt, TryStreamExt};
use log::{debug, info};
-use std::io::{Read, Seek};
+use std::io::{BufReader, Read, Seek};
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::error::SendError;
use tokio::{sync::mpsc::Sender, task};
@@ -95,6 +95,7 @@ impl FlightService for BallistaFlightService {
))
})
.map_err(|e| from_ballista_err(&e))?;
+ let file = BufReader::new(file);
let reader =
StreamReader::try_new(file, None).map_err(|e|
from_arrow_err(&e))?;
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index e6c4b3ad..1367f17b 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -43,7 +43,7 @@ prometheus-metrics = ["prometheus", "once_cell"]
anyhow = "1"
arrow-flight = { workspace = true }
async-trait = { workspace = true }
-axum = "0.6.20"
+axum = "0.7.7"
ballista-core = { path = "../core", version = "0.12.0", features = ["s3"] }
base64 = { version = "0.22" }
clap = { workspace = true }
@@ -53,7 +53,7 @@ datafusion = { workspace = true }
datafusion-proto = { workspace = true }
futures = { workspace = true }
graphviz-rust = "0.9.0"
-http = "0.2.9"
+http = "1.1"
log = { workspace = true }
object_store = { workspace = true }
once_cell = { version = "1.16.0", optional = true }
diff --git a/ballista/scheduler/build.rs b/ballista/scheduler/build.rs
index d0c8c270..5a3e00cc 100644
--- a/ballista/scheduler/build.rs
+++ b/ballista/scheduler/build.rs
@@ -24,6 +24,6 @@ fn main() -> Result<(), String> {
println!("cargo:rerun-if-changed=proto/keda.proto");
tonic_build::configure()
- .compile(&["proto/keda.proto"], &["proto"])
+ .compile_protos(&["proto/keda.proto"], &["proto"])
.map_err(|e| format!("protobuf compilation failed: {e}"))
}
diff --git a/ballista/scheduler/src/flight_sql.rs
b/ballista/scheduler/src/flight_sql.rs
index c6297d78..2187db06 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -64,7 +64,9 @@ use datafusion::arrow;
use datafusion::arrow::array::{ArrayRef, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::error::ArrowError;
-use datafusion::arrow::ipc::writer::{IpcDataGenerator, IpcWriteOptions};
+use datafusion::arrow::ipc::writer::{
+ DictionaryTracker, IpcDataGenerator, IpcWriteOptions,
+};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::DFSchemaRef;
use datafusion::logical_expr::LogicalPlan;
@@ -368,7 +370,15 @@ impl FlightSqlServiceImpl {
let options = IpcWriteOptions::default();
let pair = SchemaAsIpc::new(&arrow_schema, &options);
let data_gen = IpcDataGenerator::default();
- let encoded_data = data_gen.schema_to_bytes(pair.0, pair.1);
+ let mut dictionary_tracker =
DictionaryTracker::new_with_preserve_dict_id(
+ false,
+ pair.1.preserve_dict_id(),
+ );
+ let encoded_data = data_gen.schema_to_bytes_with_dictionary_tracker(
+ pair.0,
+ &mut dictionary_tracker,
+ pair.1,
+ );
let mut schema_bytes = vec![];
arrow::ipc::writer::write_message(&mut schema_bytes, encoded_data,
pair.1)
.map_err(|e| Status::internal(format!("Error encoding schema:
{e}")))?;
diff --git a/ballista/scheduler/src/scheduler_process.rs
b/ballista/scheduler/src/scheduler_process.rs
index 140dc141..5d1671a0 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -80,15 +80,15 @@ pub async fn start_server(
FlightSqlServiceImpl::new(scheduler_server.clone()),
));
- let tonic = tonic_builder.into_service().into_router();
+ let tonic = tonic_builder.into_service().into_axum_router();
let axum = get_routes(Arc::new(scheduler_server));
let merged = axum
.merge(tonic)
.into_make_service_with_connect_info::<SocketAddr>();
- axum::Server::bind(&addr)
- .serve(merged)
+ let listener = tokio::net::TcpListener::bind(&addr)
.await
- .map_err(Error::from)
+ .map_err(Error::from)?;
+ axum::serve(listener, merged).await.map_err(Error::from)
}
diff --git a/ballista/scheduler/src/state/session_manager.rs
b/ballista/scheduler/src/state/session_manager.rs
index 4cf9d83f..8880dfd1 100644
--- a/ballista/scheduler/src/state/session_manager.rs
+++ b/ballista/scheduler/src/state/session_manager.rs
@@ -66,7 +66,7 @@ pub fn create_datafusion_context(
session_builder: SessionBuilder,
) -> Arc<SessionContext> {
let config =
-
SessionConfig::from_string_hash_map(ballista_config.settings().clone()).unwrap();
+
SessionConfig::from_string_hash_map(&ballista_config.settings().clone()).unwrap();
let config = config
.with_target_partitions(ballista_config.default_shuffle_partitions())
.with_batch_size(ballista_config.default_batch_size())
diff --git a/ballista/scheduler/src/test_utils.rs
b/ballista/scheduler/src/test_utils.rs
index 5e5dee12..27bc0ec8 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -950,7 +950,7 @@ pub async fn test_join_plan(partition: usize) ->
ExecutionGraph {
.build()
.unwrap();
- let sort_expr = Expr::Sort(SortExpr::new(Box::new(col("id")), false,
false));
+ let sort_expr = SortExpr::new(col("id"), false, false);
let logical_plan = left_plan
.join(right_plan, JoinType::Inner, (vec!["id"], vec!["id"]), None)
diff --git a/python/Cargo.toml b/python/Cargo.toml
index eb662cb1..758d162f 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -33,11 +33,11 @@ publish = false
async-trait = "0.1.77"
ballista = { path = "../ballista/client", version = "0.12.0" }
ballista-core = { path = "../ballista/core", version = "0.12.0" }
-datafusion = "41.0.0"
-datafusion-proto = "41.0.0"
-datafusion-python = "41.0.0"
+datafusion = { workspace = true }
+datafusion-proto = { workspace = true }
+datafusion-python = { workspace = true }
-pyo3 = { version = "0.21", features = ["extension-module", "abi3",
"abi3-py38"] }
+pyo3 = { version = "0.22", features = ["extension-module", "abi3",
"abi3-py38"] }
pyo3-log = "0.11.0"
tokio = { version = "1.35", features = ["macros", "rt", "rt-multi-thread",
"sync"] }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]