This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new a7c527f3 Update datafusion requirement from 14.0.0 to 15.0.0 (#552)
a7c527f3 is described below
commit a7c527f36a3f70a3a4a53de631535b67e708b53c
Author: yahoNanJing <[email protected]>
AuthorDate: Fri Dec 9 10:13:14 2022 +0800
Update datafusion requirement from 14.0.0 to 15.0.0 (#552)
* Update datafusion requirement from 14.0.0 to 15.0.0
* Fix unit tests (UT)
* Fix Python
* Fix Python
* Fix Python
Co-authored-by: yangzhong <[email protected]>
---
ballista-cli/Cargo.toml | 4 +-
ballista/client/Cargo.toml | 6 +-
ballista/client/src/context.rs | 19 +-
ballista/core/Cargo.toml | 8 +-
ballista/core/proto/ballista.proto | 1 +
ballista/core/proto/datafusion.proto | 25 +-
ballista/core/src/serde/generated/ballista.rs | 991 +++++++++++----------
ballista/core/src/serde/mod.rs | 4 +-
.../core/src/serde/physical_plan/from_proto.rs | 2 +
ballista/core/src/serde/physical_plan/mod.rs | 8 +-
ballista/core/src/serde/physical_plan/to_proto.rs | 6 +-
ballista/executor/Cargo.toml | 8 +-
ballista/scheduler/Cargo.toml | 6 +-
ballista/scheduler/src/display.rs | 2 +-
ballista/scheduler/src/scheduler_server/grpc.rs | 6 +-
ballista/scheduler/src/state/execution_graph.rs | 10 +-
.../src/state/execution_graph/execution_stage.rs | 6 +-
.../scheduler/src/state/execution_graph_dot.rs | 129 ++-
ballista/scheduler/src/test_utils.rs | 2 +-
benchmarks/Cargo.toml | 4 +-
benchmarks/src/bin/tpch.rs | 4 +-
examples/Cargo.toml | 2 +-
python/Cargo.toml | 2 +-
python/src/context.rs | 16 +-
python/src/dataset.rs | 4 +-
python/src/datatype.rs | 39 +
python/src/lib.rs | 2 +
python/src/udaf.rs | 4 +
28 files changed, 781 insertions(+), 539 deletions(-)
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index ea3f05e4..3f1c500f 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -33,8 +33,8 @@ ballista = { path = "../ballista/client", version = "0.10.0",
features = [
"standalone",
] }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "14.0.0"
-datafusion-cli = "14.0.0"
+datafusion = "15.0.0"
+datafusion-cli = "15.0.0"
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index a429ca2d..cc870c87 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -31,12 +31,12 @@ rust-version = "1.63"
ballista-core = { path = "../core", version = "0.10.0" }
ballista-executor = { path = "../executor", version = "0.10.0", optional =
true }
ballista-scheduler = { path = "../scheduler", version = "0.10.0", optional =
true }
-datafusion = "14.0.0"
-datafusion-proto = "14.0.0"
+datafusion = "15.0.0"
+datafusion-proto = "15.0.0"
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
-sqlparser = "0.26"
+sqlparser = "0.27"
tempfile = "3"
tokio = "1.0"
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index 9c1b9e67..51a6c3bc 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -17,6 +17,7 @@
//! Distributed execution context.
+use datafusion::arrow::datatypes::SchemaRef;
use log::info;
use parking_lot::Mutex;
use sqlparser::ast::Statement;
@@ -375,6 +376,16 @@ impl BallistaContext {
..
}) => {
let table_exists = ctx.table_exist(name.as_str())?;
+ let schema: SchemaRef =
Arc::new(schema.as_ref().to_owned().into());
+ let table_partition_cols = table_partition_cols
+ .iter()
+ .map(|col| {
+ schema
+ .field_with_name(col)
+ .map(|f| (f.name().to_owned(),
f.data_type().to_owned()))
+ .map_err(DataFusionError::ArrowError)
+ })
+ .collect::<Result<Vec<_>>>()?;
match (if_not_exists, table_exists) {
(_, false) => match file_type.to_lowercase().as_str() {
@@ -383,9 +394,8 @@ impl BallistaContext {
.has_header(*has_header)
.delimiter(*delimiter as u8)
.table_partition_cols(table_partition_cols.to_vec());
- let csv_schema = schema.as_ref().to_owned().into();
if !schema.fields().is_empty() {
- options = options.schema(&csv_schema);
+ options = options.schema(&schema);
}
self.register_csv(name, location, options).await?;
Ok(Arc::new(DataFrame::new(ctx.state.clone(),
&plan)))
@@ -395,7 +405,7 @@ impl BallistaContext {
name,
location,
ParquetReadOptions::default()
-
.table_partition_cols(table_partition_cols.to_vec()),
+
.table_partition_cols(table_partition_cols),
)
.await?;
Ok(Arc::new(DataFrame::new(ctx.state.clone(),
&plan)))
@@ -405,7 +415,7 @@ impl BallistaContext {
name,
location,
AvroReadOptions::default()
-
.table_partition_cols(table_partition_cols.to_vec()),
+
.table_partition_cols(table_partition_cols),
)
.await?;
Ok(Arc::new(DataFrame::new(ctx.state.clone(),
&plan)))
@@ -582,6 +592,7 @@ mod tests {
table_partition_cols: x.table_partition_cols.clone(),
collect_stat: x.collect_stat,
target_partitions: x.target_partitions,
+ file_sort_order: None,
};
let table_paths = listing_table
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 20570a45..1166c99e 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -45,13 +45,13 @@ simd = ["datafusion/simd"]
[dependencies]
ahash = { version = "0.8", default-features = false }
-arrow-flight = { version = "26.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "28.0.0", features = ["flight-sql-experimental"] }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "14.0.0"
+datafusion = "15.0.0"
datafusion-objectstore-hdfs = { version = "0.1.1", default-features = false,
optional = true }
-datafusion-proto = "14.0.0"
+datafusion-proto = "15.0.0"
futures = "0.3"
hashbrown = "0.13"
@@ -67,7 +67,7 @@ prost = "0.11"
prost-types = "0.11"
rand = "0.8"
serde = { version = "1", features = ["derive"] }
-sqlparser = "0.25"
+sqlparser = "0.27"
sys-info = "0.9.0"
tokio = "1.0"
tokio-stream = { version = "0.1", features = ["net"] }
diff --git a/ballista/core/proto/ballista.proto
b/ballista/core/proto/ballista.proto
index 5113a6a7..76d3521f 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -267,6 +267,7 @@ message AvroScanExecNode {
enum PartitionMode {
COLLECT_LEFT = 0;
PARTITIONED = 1;
+ AUTO = 2;
}
message HashJoinExecNode {
diff --git a/ballista/core/proto/datafusion.proto
b/ballista/core/proto/datafusion.proto
index c8ea70de..4152802d 100644
--- a/ballista/core/proto/datafusion.proto
+++ b/ballista/core/proto/datafusion.proto
@@ -109,6 +109,7 @@ message ListingTableScanNode {
ParquetFormat parquet = 11;
AvroFormat avro = 12;
}
+ repeated datafusion.LogicalExprNode file_sort_order = 13;
}
message ViewTableScanNode {
@@ -176,6 +177,7 @@ message CreateExternalTableNode {
string delimiter = 8;
string definition = 9;
string file_compression_type = 10;
+ map<string, string> options = 11;
}
message CreateCatalogSchemaNode {
@@ -409,8 +411,10 @@ message AliasNode {
}
message BinaryExprNode {
- LogicalExprNode l = 1;
- LogicalExprNode r = 2;
+ // Represents the operands from the left inner most expression
+ // to the right outer most expression where each of them are chained
+ // with the operator 'op'.
+ repeated LogicalExprNode operands = 1;
string op = 3;
}
@@ -739,6 +743,20 @@ message ScalarListValue{
repeated ScalarValue values = 2;
}
+message ScalarTime32Value {
+ oneof value {
+ int32 time32_second_value = 1;
+ int32 time32_millisecond_value = 2;
+ };
+}
+
+message ScalarTime64Value {
+ oneof value {
+ int64 time64_microsecond_value = 1;
+ int64 time64_nanosecond_value = 2;
+ };
+}
+
message ScalarTimestampValue {
oneof value {
int64 time_microsecond_value = 1;
@@ -797,6 +815,7 @@ message ScalarValue{
double float64_value = 13;
// Literal Date32 value always has a unit of day
int32 date_32_value = 14;
+ ScalarTime32Value time32_value = 15;
ScalarListValue list_value = 17;
//WAS: ScalarType null_list_value = 18;
@@ -808,7 +827,7 @@ message ScalarValue{
ScalarDictionaryValue dictionary_value = 27;
bytes binary_value = 28;
bytes large_binary_value = 29;
- int64 time64_value = 30;
+ ScalarTime64Value time64_value = 30;
IntervalMonthDayNanoValue interval_month_day_nano = 31;
StructValue struct_value = 32;
ScalarFixedSizeBinary fixed_size_binary_value = 34;
diff --git a/ballista/core/src/serde/generated/ballista.rs
b/ballista/core/src/serde/generated/ballista.rs
index 7f9685e8..d11c6ea5 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -1,111 +1,113 @@
-//
/////////////////////////////////////////////////////////////////////////////////////////////////
-// Ballista Logical Plan
-//
/////////////////////////////////////////////////////////////////////////////////////////////////
-
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Statistics {
- #[prost(int64, tag="1")]
+ #[prost(int64, tag = "1")]
pub num_rows: i64,
- #[prost(int64, tag="2")]
+ #[prost(int64, tag = "2")]
pub total_byte_size: i64,
- #[prost(message, repeated, tag="3")]
+ #[prost(message, repeated, tag = "3")]
pub column_stats: ::prost::alloc::vec::Vec<ColumnStats>,
- #[prost(bool, tag="4")]
+ #[prost(bool, tag = "4")]
pub is_exact: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FileRange {
- #[prost(int64, tag="1")]
+ #[prost(int64, tag = "1")]
pub start: i64,
- #[prost(int64, tag="2")]
+ #[prost(int64, tag = "2")]
pub end: i64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PartitionedFile {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub path: ::prost::alloc::string::String,
- #[prost(uint64, tag="2")]
+ #[prost(uint64, tag = "2")]
pub size: u64,
- #[prost(uint64, tag="3")]
+ #[prost(uint64, tag = "3")]
pub last_modified_ns: u64,
- #[prost(message, repeated, tag="4")]
- pub partition_values:
::prost::alloc::vec::Vec<::datafusion_proto::protobuf::ScalarValue>,
- #[prost(message, optional, tag="5")]
+ #[prost(message, repeated, tag = "4")]
+ pub partition_values: ::prost::alloc::vec::Vec<
+ ::datafusion_proto::protobuf::ScalarValue,
+ >,
+ #[prost(message, optional, tag = "5")]
pub range: ::core::option::Option<FileRange>,
}
-//
/////////////////////////////////////////////////////////////////////////////////////////////////
-// Ballista Physical Plan
-//
/////////////////////////////////////////////////////////////////////////////////////////////////
-
/// PhysicalPlanNode is a nested type
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalPlanNode {
- #[prost(oneof="physical_plan_node::PhysicalPlanType", tags="1, 2, 3, 4, 6,
7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24")]
+ #[prost(
+ oneof = "physical_plan_node::PhysicalPlanType",
+ tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
19, 20, 21, 22, 23, 24"
+ )]
pub physical_plan_type:
::core::option::Option<physical_plan_node::PhysicalPlanType>,
}
/// Nested message and enum types in `PhysicalPlanNode`.
pub mod physical_plan_node {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum PhysicalPlanType {
- #[prost(message, tag="1")]
+ #[prost(message, tag = "1")]
ParquetScan(super::ParquetScanExecNode),
- #[prost(message, tag="2")]
+ #[prost(message, tag = "2")]
CsvScan(super::CsvScanExecNode),
- #[prost(message, tag="3")]
+ #[prost(message, tag = "3")]
Empty(super::EmptyExecNode),
- #[prost(message, tag="4")]
+ #[prost(message, tag = "4")]
Projection(::prost::alloc::boxed::Box<super::ProjectionExecNode>),
- #[prost(message, tag="6")]
+ #[prost(message, tag = "6")]
GlobalLimit(::prost::alloc::boxed::Box<super::GlobalLimitExecNode>),
- #[prost(message, tag="7")]
+ #[prost(message, tag = "7")]
LocalLimit(::prost::alloc::boxed::Box<super::LocalLimitExecNode>),
- #[prost(message, tag="8")]
+ #[prost(message, tag = "8")]
Aggregate(::prost::alloc::boxed::Box<super::AggregateExecNode>),
- #[prost(message, tag="9")]
+ #[prost(message, tag = "9")]
HashJoin(::prost::alloc::boxed::Box<super::HashJoinExecNode>),
- #[prost(message, tag="10")]
+ #[prost(message, tag = "10")]
ShuffleReader(super::ShuffleReaderExecNode),
- #[prost(message, tag="11")]
+ #[prost(message, tag = "11")]
Sort(::prost::alloc::boxed::Box<super::SortExecNode>),
- #[prost(message, tag="12")]
+ #[prost(message, tag = "12")]
CoalesceBatches(::prost::alloc::boxed::Box<super::CoalesceBatchesExecNode>),
- #[prost(message, tag="13")]
+ #[prost(message, tag = "13")]
Filter(::prost::alloc::boxed::Box<super::FilterExecNode>),
- #[prost(message, tag="14")]
+ #[prost(message, tag = "14")]
Merge(::prost::alloc::boxed::Box<super::CoalescePartitionsExecNode>),
- #[prost(message, tag="15")]
+ #[prost(message, tag = "15")]
Unresolved(super::UnresolvedShuffleExecNode),
- #[prost(message, tag="16")]
+ #[prost(message, tag = "16")]
Repartition(::prost::alloc::boxed::Box<super::RepartitionExecNode>),
- #[prost(message, tag="17")]
+ #[prost(message, tag = "17")]
Window(::prost::alloc::boxed::Box<super::WindowAggExecNode>),
- #[prost(message, tag="18")]
+ #[prost(message, tag = "18")]
ShuffleWriter(::prost::alloc::boxed::Box<super::ShuffleWriterExecNode>),
- #[prost(message, tag="19")]
+ #[prost(message, tag = "19")]
CrossJoin(::prost::alloc::boxed::Box<super::CrossJoinExecNode>),
- #[prost(message, tag="20")]
+ #[prost(message, tag = "20")]
AvroScan(super::AvroScanExecNode),
- #[prost(message, tag="21")]
+ #[prost(message, tag = "21")]
Extension(super::PhysicalExtensionNode),
- #[prost(message, tag="22")]
+ #[prost(message, tag = "22")]
Union(super::UnionExecNode),
- #[prost(message, tag="23")]
+ #[prost(message, tag = "23")]
Explain(super::ExplainExecNode),
- #[prost(message, tag="24")]
-
SortPreservingMerge(::prost::alloc::boxed::Box<super::SortPreservingMergeExecNode>),
+ #[prost(message, tag = "24")]
+ SortPreservingMerge(
+ ::prost::alloc::boxed::Box<super::SortPreservingMergeExecNode>,
+ ),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalExtensionNode {
- #[prost(bytes="vec", tag="1")]
+ #[prost(bytes = "vec", tag = "1")]
pub node: ::prost::alloc::vec::Vec<u8>,
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub inputs: ::prost::alloc::vec::Vec<PhysicalPlanNode>,
}
/// physical expressions
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalExprNode {
- #[prost(oneof="physical_expr_node::ExprType", tags="1, 2, 3, 4, 5, 6, 7,
8, 9, 10, 11, 12, 13, 14, 15, 16, 17")]
+ #[prost(
+ oneof = "physical_expr_node::ExprType",
+ tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17"
+ )]
pub expr_type: ::core::option::Option<physical_expr_node::ExprType>,
}
/// Nested message and enum types in `PhysicalExprNode`.
@@ -113,467 +115,481 @@ pub mod physical_expr_node {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum ExprType {
/// column references
- #[prost(message, tag="1")]
+ #[prost(message, tag = "1")]
Column(super::PhysicalColumn),
- #[prost(message, tag="2")]
+ #[prost(message, tag = "2")]
Literal(::datafusion_proto::protobuf::ScalarValue),
/// binary expressions
- #[prost(message, tag="3")]
+ #[prost(message, tag = "3")]
BinaryExpr(::prost::alloc::boxed::Box<super::PhysicalBinaryExprNode>),
/// aggregate expressions
- #[prost(message, tag="4")]
+ #[prost(message, tag = "4")]
AggregateExpr(super::PhysicalAggregateExprNode),
/// null checks
- #[prost(message, tag="5")]
+ #[prost(message, tag = "5")]
IsNullExpr(::prost::alloc::boxed::Box<super::PhysicalIsNull>),
- #[prost(message, tag="6")]
+ #[prost(message, tag = "6")]
IsNotNullExpr(::prost::alloc::boxed::Box<super::PhysicalIsNotNull>),
- #[prost(message, tag="7")]
+ #[prost(message, tag = "7")]
NotExpr(::prost::alloc::boxed::Box<super::PhysicalNot>),
- #[prost(message, tag="8")]
+ #[prost(message, tag = "8")]
Case(::prost::alloc::boxed::Box<super::PhysicalCaseNode>),
- #[prost(message, tag="9")]
+ #[prost(message, tag = "9")]
Cast(::prost::alloc::boxed::Box<super::PhysicalCastNode>),
- #[prost(message, tag="10")]
+ #[prost(message, tag = "10")]
Sort(::prost::alloc::boxed::Box<super::PhysicalSortExprNode>),
- #[prost(message, tag="11")]
+ #[prost(message, tag = "11")]
Negative(::prost::alloc::boxed::Box<super::PhysicalNegativeNode>),
- #[prost(message, tag="12")]
+ #[prost(message, tag = "12")]
InList(::prost::alloc::boxed::Box<super::PhysicalInListNode>),
- #[prost(message, tag="13")]
+ #[prost(message, tag = "13")]
ScalarFunction(super::PhysicalScalarFunctionNode),
- #[prost(message, tag="14")]
+ #[prost(message, tag = "14")]
TryCast(::prost::alloc::boxed::Box<super::PhysicalTryCastNode>),
/// window expressions
- #[prost(message, tag="15")]
+ #[prost(message, tag = "15")]
WindowExpr(::prost::alloc::boxed::Box<super::PhysicalWindowExprNode>),
- #[prost(message, tag="16")]
+ #[prost(message, tag = "16")]
ScalarUdf(super::PhysicalScalarUdfNode),
- #[prost(message, tag="17")]
-
DateTimeIntervalExpr(::prost::alloc::boxed::Box<super::PhysicalDateTimeIntervalExprNode>),
+ #[prost(message, tag = "17")]
+ DateTimeIntervalExpr(
+
::prost::alloc::boxed::Box<super::PhysicalDateTimeIntervalExprNode>,
+ ),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalScalarUdfNode {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub name: ::prost::alloc::string::String,
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub args: ::prost::alloc::vec::Vec<PhysicalExprNode>,
- #[prost(message, optional, tag="4")]
+ #[prost(message, optional, tag = "4")]
pub return_type:
::core::option::Option<::datafusion_proto::protobuf::ArrowType>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalAggregateExprNode {
- #[prost(enumeration="::datafusion_proto::protobuf::AggregateFunction",
tag="1")]
+ #[prost(enumeration = "::datafusion_proto::protobuf::AggregateFunction",
tag = "1")]
pub aggr_function: i32,
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
- #[prost(bool, tag="3")]
+ #[prost(bool, tag = "3")]
pub distinct: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalWindowExprNode {
- #[prost(message, optional, boxed, tag="4")]
+ #[prost(message, optional, boxed, tag = "4")]
pub expr:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
- #[prost(oneof="physical_window_expr_node::WindowFunction", tags="1, 2")]
- pub window_function:
::core::option::Option<physical_window_expr_node::WindowFunction>,
+ #[prost(oneof = "physical_window_expr_node::WindowFunction", tags = "1,
2")]
+ pub window_function: ::core::option::Option<
+ physical_window_expr_node::WindowFunction,
+ >,
}
/// Nested message and enum types in `PhysicalWindowExprNode`.
pub mod physical_window_expr_node {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum WindowFunction {
- #[prost(enumeration="::datafusion_proto::protobuf::AggregateFunction",
tag="1")]
+ #[prost(
+ enumeration = "::datafusion_proto::protobuf::AggregateFunction",
+ tag = "1"
+ )]
AggrFunction(i32),
/// udaf = 3
-
#[prost(enumeration="::datafusion_proto::protobuf::BuiltInWindowFunction",
tag="2")]
+ #[prost(
+ enumeration =
"::datafusion_proto::protobuf::BuiltInWindowFunction",
+ tag = "2"
+ )]
BuiltInFunction(i32),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalIsNull {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub expr:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalIsNotNull {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub expr:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalNot {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub expr:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalAliasNode {
- #[prost(message, optional, tag="1")]
+ #[prost(message, optional, tag = "1")]
pub expr: ::core::option::Option<PhysicalExprNode>,
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
pub alias: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalBinaryExprNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub l:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
- #[prost(message, optional, boxed, tag="2")]
+ #[prost(message, optional, boxed, tag = "2")]
pub r:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
- #[prost(string, tag="3")]
+ #[prost(string, tag = "3")]
pub op: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalDateTimeIntervalExprNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub l:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
- #[prost(message, optional, boxed, tag="2")]
+ #[prost(message, optional, boxed, tag = "2")]
pub r:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
- #[prost(string, tag="3")]
+ #[prost(string, tag = "3")]
pub op: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalSortExprNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub expr:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
- #[prost(bool, tag="2")]
+ #[prost(bool, tag = "2")]
pub asc: bool,
- #[prost(bool, tag="3")]
+ #[prost(bool, tag = "3")]
pub nulls_first: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalWhenThen {
- #[prost(message, optional, tag="1")]
+ #[prost(message, optional, tag = "1")]
pub when_expr: ::core::option::Option<PhysicalExprNode>,
- #[prost(message, optional, tag="2")]
+ #[prost(message, optional, tag = "2")]
pub then_expr: ::core::option::Option<PhysicalExprNode>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalInListNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub expr:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub list: ::prost::alloc::vec::Vec<PhysicalExprNode>,
- #[prost(bool, tag="3")]
+ #[prost(bool, tag = "3")]
pub negated: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalCaseNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub expr:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub when_then_expr: ::prost::alloc::vec::Vec<PhysicalWhenThen>,
- #[prost(message, optional, boxed, tag="3")]
+ #[prost(message, optional, boxed, tag = "3")]
pub else_expr:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalScalarFunctionNode {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub name: ::prost::alloc::string::String,
- #[prost(enumeration="::datafusion_proto::protobuf::ScalarFunction",
tag="2")]
+ #[prost(enumeration = "::datafusion_proto::protobuf::ScalarFunction", tag
= "2")]
pub fun: i32,
- #[prost(message, repeated, tag="3")]
+ #[prost(message, repeated, tag = "3")]
pub args: ::prost::alloc::vec::Vec<PhysicalExprNode>,
- #[prost(message, optional, tag="4")]
+ #[prost(message, optional, tag = "4")]
pub return_type:
::core::option::Option<::datafusion_proto::protobuf::ArrowType>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalTryCastNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub expr:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
- #[prost(message, optional, tag="2")]
+ #[prost(message, optional, tag = "2")]
pub arrow_type:
::core::option::Option<::datafusion_proto::protobuf::ArrowType>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalCastNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub expr:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
- #[prost(message, optional, tag="2")]
+ #[prost(message, optional, tag = "2")]
pub arrow_type:
::core::option::Option<::datafusion_proto::protobuf::ArrowType>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalNegativeNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub expr:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UnresolvedShuffleExecNode {
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
pub stage_id: u32,
- #[prost(message, optional, tag="2")]
+ #[prost(message, optional, tag = "2")]
pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
- #[prost(uint32, tag="3")]
+ #[prost(uint32, tag = "3")]
pub input_partition_count: u32,
- #[prost(uint32, tag="4")]
+ #[prost(uint32, tag = "4")]
pub output_partition_count: u32,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FilterExecNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub input:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
- #[prost(message, optional, tag="2")]
+ #[prost(message, optional, tag = "2")]
pub expr: ::core::option::Option<PhysicalExprNode>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FileGroup {
- #[prost(message, repeated, tag="1")]
+ #[prost(message, repeated, tag = "1")]
pub files: ::prost::alloc::vec::Vec<PartitionedFile>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ScanLimit {
/// wrap into a message to make it optional
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
pub limit: u32,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FileScanExecConf {
- #[prost(message, repeated, tag="1")]
+ #[prost(message, repeated, tag = "1")]
pub file_groups: ::prost::alloc::vec::Vec<FileGroup>,
- #[prost(message, optional, tag="2")]
+ #[prost(message, optional, tag = "2")]
pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
- #[prost(uint32, repeated, tag="4")]
+ #[prost(uint32, repeated, tag = "4")]
pub projection: ::prost::alloc::vec::Vec<u32>,
- #[prost(message, optional, tag="5")]
+ #[prost(message, optional, tag = "5")]
pub limit: ::core::option::Option<ScanLimit>,
- #[prost(message, optional, tag="6")]
+ #[prost(message, optional, tag = "6")]
pub statistics: ::core::option::Option<Statistics>,
- #[prost(string, repeated, tag="7")]
+ #[prost(string, repeated, tag = "7")]
pub table_partition_cols:
::prost::alloc::vec::Vec<::prost::alloc::string::String>,
- #[prost(string, tag="8")]
+ #[prost(string, tag = "8")]
pub object_store_url: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ParquetScanExecNode {
- #[prost(message, optional, tag="1")]
+ #[prost(message, optional, tag = "1")]
pub base_conf: ::core::option::Option<FileScanExecConf>,
- #[prost(message, optional, tag="2")]
- pub pruning_predicate:
::core::option::Option<::datafusion_proto::protobuf::LogicalExprNode>,
+ #[prost(message, optional, tag = "2")]
+ pub pruning_predicate: ::core::option::Option<
+ ::datafusion_proto::protobuf::LogicalExprNode,
+ >,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CsvScanExecNode {
- #[prost(message, optional, tag="1")]
+ #[prost(message, optional, tag = "1")]
pub base_conf: ::core::option::Option<FileScanExecConf>,
- #[prost(bool, tag="2")]
+ #[prost(bool, tag = "2")]
pub has_header: bool,
- #[prost(string, tag="3")]
+ #[prost(string, tag = "3")]
pub delimiter: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct AvroScanExecNode {
- #[prost(message, optional, tag="1")]
+ #[prost(message, optional, tag = "1")]
pub base_conf: ::core::option::Option<FileScanExecConf>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HashJoinExecNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub left:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
- #[prost(message, optional, boxed, tag="2")]
+ #[prost(message, optional, boxed, tag = "2")]
pub right:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
- #[prost(message, repeated, tag="3")]
+ #[prost(message, repeated, tag = "3")]
pub on: ::prost::alloc::vec::Vec<JoinOn>,
- #[prost(enumeration="::datafusion_proto::protobuf::JoinType", tag="4")]
+ #[prost(enumeration = "::datafusion_proto::protobuf::JoinType", tag = "4")]
pub join_type: i32,
- #[prost(enumeration="PartitionMode", tag="6")]
+ #[prost(enumeration = "PartitionMode", tag = "6")]
pub partition_mode: i32,
- #[prost(bool, tag="7")]
+ #[prost(bool, tag = "7")]
pub null_equals_null: bool,
- #[prost(message, optional, tag="8")]
+ #[prost(message, optional, tag = "8")]
pub filter: ::core::option::Option<JoinFilter>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UnionExecNode {
- #[prost(message, repeated, tag="1")]
+ #[prost(message, repeated, tag = "1")]
pub inputs: ::prost::alloc::vec::Vec<PhysicalPlanNode>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExplainExecNode {
- #[prost(message, optional, tag="1")]
+ #[prost(message, optional, tag = "1")]
pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
- #[prost(message, repeated, tag="2")]
- pub stringified_plans:
::prost::alloc::vec::Vec<::datafusion_proto::protobuf::StringifiedPlan>,
- #[prost(bool, tag="3")]
+ #[prost(message, repeated, tag = "2")]
+ pub stringified_plans: ::prost::alloc::vec::Vec<
+ ::datafusion_proto::protobuf::StringifiedPlan,
+ >,
+ #[prost(bool, tag = "3")]
pub verbose: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CrossJoinExecNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub left:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
- #[prost(message, optional, boxed, tag="2")]
+ #[prost(message, optional, boxed, tag = "2")]
pub right:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalColumn {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub name: ::prost::alloc::string::String,
- #[prost(uint32, tag="2")]
+ #[prost(uint32, tag = "2")]
pub index: u32,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct JoinOn {
- #[prost(message, optional, tag="1")]
+ #[prost(message, optional, tag = "1")]
pub left: ::core::option::Option<PhysicalColumn>,
- #[prost(message, optional, tag="2")]
+ #[prost(message, optional, tag = "2")]
pub right: ::core::option::Option<PhysicalColumn>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EmptyExecNode {
- #[prost(bool, tag="1")]
+ #[prost(bool, tag = "1")]
pub produce_one_row: bool,
- #[prost(message, optional, tag="2")]
+ #[prost(message, optional, tag = "2")]
pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProjectionExecNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub input:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
- #[prost(string, repeated, tag="3")]
+ #[prost(string, repeated, tag = "3")]
pub expr_name: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct WindowAggExecNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub input:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub window_expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
- #[prost(string, repeated, tag="3")]
+ #[prost(string, repeated, tag = "3")]
pub window_expr_name:
::prost::alloc::vec::Vec<::prost::alloc::string::String>,
- #[prost(message, optional, tag="4")]
+ #[prost(message, optional, tag = "4")]
pub input_schema:
::core::option::Option<::datafusion_proto::protobuf::Schema>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct AggregateExecNode {
- #[prost(message, repeated, tag="1")]
+ #[prost(message, repeated, tag = "1")]
pub group_expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub aggr_expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
- #[prost(enumeration="AggregateMode", tag="3")]
+ #[prost(enumeration = "AggregateMode", tag = "3")]
pub mode: i32,
- #[prost(message, optional, boxed, tag="4")]
+ #[prost(message, optional, boxed, tag = "4")]
pub input:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
- #[prost(string, repeated, tag="5")]
+ #[prost(string, repeated, tag = "5")]
pub group_expr_name:
::prost::alloc::vec::Vec<::prost::alloc::string::String>,
- #[prost(string, repeated, tag="6")]
+ #[prost(string, repeated, tag = "6")]
pub aggr_expr_name:
::prost::alloc::vec::Vec<::prost::alloc::string::String>,
/// we need the input schema to the partial aggregate to pass to the final
aggregate
- #[prost(message, optional, tag="7")]
+ #[prost(message, optional, tag = "7")]
pub input_schema:
::core::option::Option<::datafusion_proto::protobuf::Schema>,
- #[prost(message, repeated, tag="8")]
+ #[prost(message, repeated, tag = "8")]
pub null_expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
- #[prost(bool, repeated, tag="9")]
+ #[prost(bool, repeated, tag = "9")]
pub groups: ::prost::alloc::vec::Vec<bool>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ShuffleWriterExecNode {
/// TODO it seems redundant to provide job and stage id here since we also
have them
/// in the TaskDefinition that wraps this plan
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
- #[prost(uint32, tag="2")]
+ #[prost(uint32, tag = "2")]
pub stage_id: u32,
- #[prost(message, optional, boxed, tag="3")]
+ #[prost(message, optional, boxed, tag = "3")]
pub input:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
- #[prost(message, optional, tag="4")]
+ #[prost(message, optional, tag = "4")]
pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ShuffleReaderExecNode {
- #[prost(message, repeated, tag="1")]
+ #[prost(message, repeated, tag = "1")]
pub partition: ::prost::alloc::vec::Vec<ShuffleReaderPartition>,
- #[prost(message, optional, tag="2")]
+ #[prost(message, optional, tag = "2")]
pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ShuffleReaderPartition {
/// each partition of a shuffle read can read data from multiple locations
- #[prost(message, repeated, tag="1")]
+ #[prost(message, repeated, tag = "1")]
pub location: ::prost::alloc::vec::Vec<PartitionLocation>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GlobalLimitExecNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub input:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
/// The number of rows to skip before fetch
- #[prost(uint32, tag="2")]
+ #[prost(uint32, tag = "2")]
pub skip: u32,
/// Maximum number of rows to fetch; negative means no limit
- #[prost(int64, tag="3")]
+ #[prost(int64, tag = "3")]
pub fetch: i64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LocalLimitExecNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub input:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
- #[prost(uint32, tag="2")]
+ #[prost(uint32, tag = "2")]
pub fetch: u32,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SortExecNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub input:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
/// Maximum number of highest/lowest rows to fetch; negative means no limit
- #[prost(int64, tag="3")]
+ #[prost(int64, tag = "3")]
pub fetch: i64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SortPreservingMergeExecNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub input:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CoalesceBatchesExecNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub input:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
- #[prost(uint32, tag="2")]
+ #[prost(uint32, tag = "2")]
pub target_batch_size: u32,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CoalescePartitionsExecNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub input:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalHashRepartition {
- #[prost(message, repeated, tag="1")]
+ #[prost(message, repeated, tag = "1")]
pub hash_expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
- #[prost(uint64, tag="2")]
+ #[prost(uint64, tag = "2")]
pub partition_count: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RepartitionExecNode {
- #[prost(message, optional, boxed, tag="1")]
+ #[prost(message, optional, boxed, tag = "1")]
pub input:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
- #[prost(oneof="repartition_exec_node::PartitionMethod", tags="2, 3, 4")]
+ #[prost(oneof = "repartition_exec_node::PartitionMethod", tags = "2, 3,
4")]
pub partition_method:
::core::option::Option<repartition_exec_node::PartitionMethod>,
}
/// Nested message and enum types in `RepartitionExecNode`.
pub mod repartition_exec_node {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum PartitionMethod {
- #[prost(uint64, tag="2")]
+ #[prost(uint64, tag = "2")]
RoundRobin(u64),
- #[prost(message, tag="3")]
+ #[prost(message, tag = "3")]
Hash(super::PhysicalHashRepartition),
- #[prost(uint64, tag="4")]
+ #[prost(uint64, tag = "4")]
Unknown(u64),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct JoinFilter {
- #[prost(message, optional, tag="1")]
+ #[prost(message, optional, tag = "1")]
pub expression: ::core::option::Option<PhysicalExprNode>,
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub column_indices: ::prost::alloc::vec::Vec<ColumnIndex>,
- #[prost(message, optional, tag="3")]
+ #[prost(message, optional, tag = "3")]
pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ColumnIndex {
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
pub index: u32,
- #[prost(enumeration="JoinSide", tag="2")]
+ #[prost(enumeration = "JoinSide", tag = "2")]
pub side: i32,
}
///
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -581,202 +597,206 @@ pub struct ColumnIndex {
///
/////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutionGraph {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
pub session_id: ::prost::alloc::string::String,
- #[prost(message, optional, tag="3")]
+ #[prost(message, optional, tag = "3")]
pub status: ::core::option::Option<JobStatus>,
- #[prost(message, repeated, tag="4")]
+ #[prost(message, repeated, tag = "4")]
pub stages: ::prost::alloc::vec::Vec<ExecutionGraphStage>,
- #[prost(uint64, tag="5")]
+ #[prost(uint64, tag = "5")]
pub output_partitions: u64,
- #[prost(message, repeated, tag="6")]
+ #[prost(message, repeated, tag = "6")]
pub output_locations: ::prost::alloc::vec::Vec<PartitionLocation>,
- #[prost(string, tag="7")]
+ #[prost(string, tag = "7")]
pub scheduler_id: ::prost::alloc::string::String,
- #[prost(uint32, tag="8")]
+ #[prost(uint32, tag = "8")]
pub task_id_gen: u32,
- #[prost(message, repeated, tag="9")]
+ #[prost(message, repeated, tag = "9")]
pub failed_attempts: ::prost::alloc::vec::Vec<StageAttempts>,
- #[prost(string, tag="10")]
+ #[prost(string, tag = "10")]
pub job_name: ::prost::alloc::string::String,
- #[prost(uint64, tag="11")]
+ #[prost(uint64, tag = "11")]
pub start_time: u64,
- #[prost(uint64, tag="12")]
+ #[prost(uint64, tag = "12")]
pub end_time: u64,
- #[prost(uint64, tag="13")]
+ #[prost(uint64, tag = "13")]
pub queued_at: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StageAttempts {
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
pub stage_id: u32,
- #[prost(uint32, repeated, tag="2")]
+ #[prost(uint32, repeated, tag = "2")]
pub stage_attempt_num: ::prost::alloc::vec::Vec<u32>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutionGraphStage {
- #[prost(oneof="execution_graph_stage::StageType", tags="1, 2, 3, 4")]
+ #[prost(oneof = "execution_graph_stage::StageType", tags = "1, 2, 3, 4")]
pub stage_type: ::core::option::Option<execution_graph_stage::StageType>,
}
/// Nested message and enum types in `ExecutionGraphStage`.
pub mod execution_graph_stage {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum StageType {
- #[prost(message, tag="1")]
+ #[prost(message, tag = "1")]
UnresolvedStage(super::UnResolvedStage),
- #[prost(message, tag="2")]
+ #[prost(message, tag = "2")]
ResolvedStage(super::ResolvedStage),
- #[prost(message, tag="3")]
+ #[prost(message, tag = "3")]
SuccessfulStage(super::SuccessfulStage),
- #[prost(message, tag="4")]
+ #[prost(message, tag = "4")]
FailedStage(super::FailedStage),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UnResolvedStage {
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
pub stage_id: u32,
- #[prost(message, optional, tag="2")]
+ #[prost(message, optional, tag = "2")]
pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
- #[prost(uint32, repeated, tag="3")]
+ #[prost(uint32, repeated, tag = "3")]
pub output_links: ::prost::alloc::vec::Vec<u32>,
- #[prost(message, repeated, tag="4")]
+ #[prost(message, repeated, tag = "4")]
pub inputs: ::prost::alloc::vec::Vec<GraphStageInput>,
- #[prost(bytes="vec", tag="5")]
+ #[prost(bytes = "vec", tag = "5")]
pub plan: ::prost::alloc::vec::Vec<u8>,
- #[prost(uint32, tag="6")]
+ #[prost(uint32, tag = "6")]
pub stage_attempt_num: u32,
- #[prost(string, repeated, tag="7")]
- pub last_attempt_failure_reasons:
::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+ #[prost(string, repeated, tag = "7")]
+ pub last_attempt_failure_reasons: ::prost::alloc::vec::Vec<
+ ::prost::alloc::string::String,
+ >,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ResolvedStage {
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
pub stage_id: u32,
- #[prost(uint32, tag="2")]
+ #[prost(uint32, tag = "2")]
pub partitions: u32,
- #[prost(message, optional, tag="3")]
+ #[prost(message, optional, tag = "3")]
pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
- #[prost(uint32, repeated, tag="4")]
+ #[prost(uint32, repeated, tag = "4")]
pub output_links: ::prost::alloc::vec::Vec<u32>,
- #[prost(message, repeated, tag="5")]
+ #[prost(message, repeated, tag = "5")]
pub inputs: ::prost::alloc::vec::Vec<GraphStageInput>,
- #[prost(bytes="vec", tag="6")]
+ #[prost(bytes = "vec", tag = "6")]
pub plan: ::prost::alloc::vec::Vec<u8>,
- #[prost(uint32, tag="7")]
+ #[prost(uint32, tag = "7")]
pub stage_attempt_num: u32,
- #[prost(string, repeated, tag="8")]
- pub last_attempt_failure_reasons:
::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+ #[prost(string, repeated, tag = "8")]
+ pub last_attempt_failure_reasons: ::prost::alloc::vec::Vec<
+ ::prost::alloc::string::String,
+ >,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SuccessfulStage {
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
pub stage_id: u32,
- #[prost(uint32, tag="2")]
+ #[prost(uint32, tag = "2")]
pub partitions: u32,
- #[prost(message, optional, tag="3")]
+ #[prost(message, optional, tag = "3")]
pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
- #[prost(uint32, repeated, tag="4")]
+ #[prost(uint32, repeated, tag = "4")]
pub output_links: ::prost::alloc::vec::Vec<u32>,
- #[prost(message, repeated, tag="5")]
+ #[prost(message, repeated, tag = "5")]
pub inputs: ::prost::alloc::vec::Vec<GraphStageInput>,
- #[prost(bytes="vec", tag="6")]
+ #[prost(bytes = "vec", tag = "6")]
pub plan: ::prost::alloc::vec::Vec<u8>,
- #[prost(message, repeated, tag="7")]
+ #[prost(message, repeated, tag = "7")]
pub task_infos: ::prost::alloc::vec::Vec<TaskInfo>,
- #[prost(message, repeated, tag="8")]
+ #[prost(message, repeated, tag = "8")]
pub stage_metrics: ::prost::alloc::vec::Vec<OperatorMetricsSet>,
- #[prost(uint32, tag="9")]
+ #[prost(uint32, tag = "9")]
pub stage_attempt_num: u32,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FailedStage {
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
pub stage_id: u32,
- #[prost(uint32, tag="2")]
+ #[prost(uint32, tag = "2")]
pub partitions: u32,
- #[prost(message, optional, tag="3")]
+ #[prost(message, optional, tag = "3")]
pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
- #[prost(uint32, repeated, tag="4")]
+ #[prost(uint32, repeated, tag = "4")]
pub output_links: ::prost::alloc::vec::Vec<u32>,
- #[prost(bytes="vec", tag="5")]
+ #[prost(bytes = "vec", tag = "5")]
pub plan: ::prost::alloc::vec::Vec<u8>,
- #[prost(message, repeated, tag="6")]
+ #[prost(message, repeated, tag = "6")]
pub task_infos: ::prost::alloc::vec::Vec<TaskInfo>,
- #[prost(message, repeated, tag="7")]
+ #[prost(message, repeated, tag = "7")]
pub stage_metrics: ::prost::alloc::vec::Vec<OperatorMetricsSet>,
- #[prost(string, tag="8")]
+ #[prost(string, tag = "8")]
pub error_message: ::prost::alloc::string::String,
- #[prost(uint32, tag="9")]
+ #[prost(uint32, tag = "9")]
pub stage_attempt_num: u32,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskInfo {
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
pub task_id: u32,
- #[prost(uint32, tag="2")]
+ #[prost(uint32, tag = "2")]
pub partition_id: u32,
/// Scheduler schedule time
- #[prost(uint64, tag="3")]
+ #[prost(uint64, tag = "3")]
pub scheduled_time: u64,
/// Scheduler launch time
- #[prost(uint64, tag="4")]
+ #[prost(uint64, tag = "4")]
pub launch_time: u64,
/// The time the Executor start to run the task
- #[prost(uint64, tag="5")]
+ #[prost(uint64, tag = "5")]
pub start_exec_time: u64,
/// The time the Executor finish the task
- #[prost(uint64, tag="6")]
+ #[prost(uint64, tag = "6")]
pub end_exec_time: u64,
/// Scheduler side finish time
- #[prost(uint64, tag="7")]
+ #[prost(uint64, tag = "7")]
pub finish_time: u64,
- #[prost(oneof="task_info::Status", tags="8, 9, 10")]
+ #[prost(oneof = "task_info::Status", tags = "8, 9, 10")]
pub status: ::core::option::Option<task_info::Status>,
}
/// Nested message and enum types in `TaskInfo`.
pub mod task_info {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Status {
- #[prost(message, tag="8")]
+ #[prost(message, tag = "8")]
Running(super::RunningTask),
- #[prost(message, tag="9")]
+ #[prost(message, tag = "9")]
Failed(super::FailedTask),
- #[prost(message, tag="10")]
+ #[prost(message, tag = "10")]
Successful(super::SuccessfulTask),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GraphStageInput {
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
pub stage_id: u32,
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub partition_locations: ::prost::alloc::vec::Vec<TaskInputPartitions>,
- #[prost(bool, tag="3")]
+ #[prost(bool, tag = "3")]
pub complete: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskInputPartitions {
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
pub partition: u32,
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct KeyValuePair {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub key: ::prost::alloc::string::String,
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
pub value: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Action {
/// configuration settings
- #[prost(message, repeated, tag="100")]
+ #[prost(message, repeated, tag = "100")]
pub settings: ::prost::alloc::vec::Vec<KeyValuePair>,
- #[prost(oneof="action::ActionType", tags="3")]
+ #[prost(oneof = "action::ActionType", tags = "3")]
pub action_type: ::core::option::Option<action::ActionType>,
}
/// Nested message and enum types in `Action`.
@@ -784,183 +804,183 @@ pub mod action {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum ActionType {
/// Fetch a partition from an executor
- #[prost(message, tag="3")]
+ #[prost(message, tag = "3")]
FetchPartition(super::FetchPartition),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutePartition {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
- #[prost(uint32, tag="2")]
+ #[prost(uint32, tag = "2")]
pub stage_id: u32,
- #[prost(uint32, repeated, tag="3")]
+ #[prost(uint32, repeated, tag = "3")]
pub partition_id: ::prost::alloc::vec::Vec<u32>,
- #[prost(message, optional, tag="4")]
+ #[prost(message, optional, tag = "4")]
pub plan: ::core::option::Option<PhysicalPlanNode>,
/// The task could need to read partitions from other executors
- #[prost(message, repeated, tag="5")]
+ #[prost(message, repeated, tag = "5")]
pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
/// Output partition for shuffle writer
- #[prost(message, optional, tag="6")]
+ #[prost(message, optional, tag = "6")]
pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FetchPartition {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
- #[prost(uint32, tag="2")]
+ #[prost(uint32, tag = "2")]
pub stage_id: u32,
- #[prost(uint32, tag="3")]
+ #[prost(uint32, tag = "3")]
pub partition_id: u32,
- #[prost(string, tag="4")]
+ #[prost(string, tag = "4")]
pub path: ::prost::alloc::string::String,
- #[prost(string, tag="5")]
+ #[prost(string, tag = "5")]
pub host: ::prost::alloc::string::String,
- #[prost(uint32, tag="6")]
+ #[prost(uint32, tag = "6")]
pub port: u32,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PartitionLocation {
/// partition_id of the map stage who produces the shuffle.
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
pub map_partition_id: u32,
/// partition_id of the shuffle, a composition of(job_id + map_stage_id +
partition_id).
- #[prost(message, optional, tag="2")]
+ #[prost(message, optional, tag = "2")]
pub partition_id: ::core::option::Option<PartitionId>,
- #[prost(message, optional, tag="3")]
+ #[prost(message, optional, tag = "3")]
pub executor_meta: ::core::option::Option<ExecutorMetadata>,
- #[prost(message, optional, tag="4")]
+ #[prost(message, optional, tag = "4")]
pub partition_stats: ::core::option::Option<PartitionStats>,
- #[prost(string, tag="5")]
+ #[prost(string, tag = "5")]
pub path: ::prost::alloc::string::String,
}
/// Unique identifier for a materialized partition of data
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PartitionId {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
- #[prost(uint32, tag="2")]
+ #[prost(uint32, tag = "2")]
pub stage_id: u32,
- #[prost(uint32, tag="4")]
+ #[prost(uint32, tag = "4")]
pub partition_id: u32,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskId {
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
pub task_id: u32,
- #[prost(uint32, tag="2")]
+ #[prost(uint32, tag = "2")]
pub task_attempt_num: u32,
- #[prost(uint32, tag="3")]
+ #[prost(uint32, tag = "3")]
pub partition_id: u32,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PartitionStats {
- #[prost(int64, tag="1")]
+ #[prost(int64, tag = "1")]
pub num_rows: i64,
- #[prost(int64, tag="2")]
+ #[prost(int64, tag = "2")]
pub num_batches: i64,
- #[prost(int64, tag="3")]
+ #[prost(int64, tag = "3")]
pub num_bytes: i64,
- #[prost(message, repeated, tag="4")]
+ #[prost(message, repeated, tag = "4")]
pub column_stats: ::prost::alloc::vec::Vec<ColumnStats>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ColumnStats {
- #[prost(message, optional, tag="1")]
+ #[prost(message, optional, tag = "1")]
pub min_value:
::core::option::Option<::datafusion_proto::protobuf::ScalarValue>,
- #[prost(message, optional, tag="2")]
+ #[prost(message, optional, tag = "2")]
pub max_value:
::core::option::Option<::datafusion_proto::protobuf::ScalarValue>,
- #[prost(uint32, tag="3")]
+ #[prost(uint32, tag = "3")]
pub null_count: u32,
- #[prost(uint32, tag="4")]
+ #[prost(uint32, tag = "4")]
pub distinct_count: u32,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct OperatorMetricsSet {
- #[prost(message, repeated, tag="1")]
+ #[prost(message, repeated, tag = "1")]
pub metrics: ::prost::alloc::vec::Vec<OperatorMetric>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct NamedCount {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub name: ::prost::alloc::string::String,
- #[prost(uint64, tag="2")]
+ #[prost(uint64, tag = "2")]
pub value: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct NamedGauge {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub name: ::prost::alloc::string::String,
- #[prost(uint64, tag="2")]
+ #[prost(uint64, tag = "2")]
pub value: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct NamedTime {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub name: ::prost::alloc::string::String,
- #[prost(uint64, tag="2")]
+ #[prost(uint64, tag = "2")]
pub value: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct OperatorMetric {
- #[prost(oneof="operator_metric::Metric", tags="1, 2, 3, 4, 5, 6, 7, 8, 9,
10")]
+ #[prost(oneof = "operator_metric::Metric", tags = "1, 2, 3, 4, 5, 6, 7, 8,
9, 10")]
pub metric: ::core::option::Option<operator_metric::Metric>,
}
/// Nested message and enum types in `OperatorMetric`.
pub mod operator_metric {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Metric {
- #[prost(uint64, tag="1")]
+ #[prost(uint64, tag = "1")]
OutputRows(u64),
- #[prost(uint64, tag="2")]
+ #[prost(uint64, tag = "2")]
ElapseTime(u64),
- #[prost(uint64, tag="3")]
+ #[prost(uint64, tag = "3")]
SpillCount(u64),
- #[prost(uint64, tag="4")]
+ #[prost(uint64, tag = "4")]
SpilledBytes(u64),
- #[prost(uint64, tag="5")]
+ #[prost(uint64, tag = "5")]
CurrentMemoryUsage(u64),
- #[prost(message, tag="6")]
+ #[prost(message, tag = "6")]
Count(super::NamedCount),
- #[prost(message, tag="7")]
+ #[prost(message, tag = "7")]
Gauge(super::NamedGauge),
- #[prost(message, tag="8")]
+ #[prost(message, tag = "8")]
Time(super::NamedTime),
- #[prost(int64, tag="9")]
+ #[prost(int64, tag = "9")]
StartTimestamp(i64),
- #[prost(int64, tag="10")]
+ #[prost(int64, tag = "10")]
EndTimestamp(i64),
}
}
/// Used by scheduler
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorMetadata {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub id: ::prost::alloc::string::String,
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
pub host: ::prost::alloc::string::String,
- #[prost(uint32, tag="3")]
+ #[prost(uint32, tag = "3")]
pub port: u32,
- #[prost(uint32, tag="4")]
+ #[prost(uint32, tag = "4")]
pub grpc_port: u32,
- #[prost(message, optional, tag="5")]
+ #[prost(message, optional, tag = "5")]
pub specification: ::core::option::Option<ExecutorSpecification>,
}
/// Used by grpc
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorRegistration {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub id: ::prost::alloc::string::String,
- #[prost(uint32, tag="3")]
+ #[prost(uint32, tag = "3")]
pub port: u32,
- #[prost(uint32, tag="4")]
+ #[prost(uint32, tag = "4")]
pub grpc_port: u32,
- #[prost(message, optional, tag="5")]
+ #[prost(message, optional, tag = "5")]
pub specification: ::core::option::Option<ExecutorSpecification>,
/// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14
(see <https://github.com/tokio-rs/prost/issues/430> and
<https://github.com/tokio-rs/prost/pull/455>)
/// this syntax is ugly but is binary compatible with the "optional"
keyword (see
<https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3>)
- #[prost(oneof="executor_registration::OptionalHost", tags="2")]
+ #[prost(oneof = "executor_registration::OptionalHost", tags = "2")]
pub optional_host:
::core::option::Option<executor_registration::OptionalHost>,
}
/// Nested message and enum types in `ExecutorRegistration`.
@@ -969,26 +989,26 @@ pub mod executor_registration {
/// this syntax is ugly but is binary compatible with the "optional"
keyword (see
<https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3>)
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum OptionalHost {
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
Host(::prost::alloc::string::String),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorHeartbeat {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub executor_id: ::prost::alloc::string::String,
/// Unix epoch-based timestamp in seconds
- #[prost(uint64, tag="2")]
+ #[prost(uint64, tag = "2")]
pub timestamp: u64,
- #[prost(message, repeated, tag="3")]
+ #[prost(message, repeated, tag = "3")]
pub metrics: ::prost::alloc::vec::Vec<ExecutorMetric>,
- #[prost(message, optional, tag="4")]
+ #[prost(message, optional, tag = "4")]
pub status: ::core::option::Option<ExecutorStatus>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorMetric {
/// TODO add more metrics
- #[prost(oneof="executor_metric::Metric", tags="1")]
+ #[prost(oneof = "executor_metric::Metric", tags = "1")]
pub metric: ::core::option::Option<executor_metric::Metric>,
}
/// Nested message and enum types in `ExecutorMetric`.
@@ -996,36 +1016,36 @@ pub mod executor_metric {
/// TODO add more metrics
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Metric {
- #[prost(uint64, tag="1")]
+ #[prost(uint64, tag = "1")]
AvailableMemory(u64),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorStatus {
- #[prost(oneof="executor_status::Status", tags="1, 2, 3")]
+ #[prost(oneof = "executor_status::Status", tags = "1, 2, 3")]
pub status: ::core::option::Option<executor_status::Status>,
}
/// Nested message and enum types in `ExecutorStatus`.
pub mod executor_status {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Status {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
Active(::prost::alloc::string::String),
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
Dead(::prost::alloc::string::String),
- #[prost(string, tag="3")]
+ #[prost(string, tag = "3")]
Unknown(::prost::alloc::string::String),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorSpecification {
- #[prost(message, repeated, tag="1")]
+ #[prost(message, repeated, tag = "1")]
pub resources: ::prost::alloc::vec::Vec<ExecutorResource>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorResource {
/// TODO add more resources
- #[prost(oneof="executor_resource::Resource", tags="1")]
+ #[prost(oneof = "executor_resource::Resource", tags = "1")]
pub resource: ::core::option::Option<executor_resource::Resource>,
}
/// Nested message and enum types in `ExecutorResource`.
@@ -1033,452 +1053,443 @@ pub mod executor_resource {
/// TODO add more resources
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Resource {
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
TaskSlots(u32),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorData {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub executor_id: ::prost::alloc::string::String,
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub resources: ::prost::alloc::vec::Vec<ExecutorResourcePair>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorResourcePair {
- #[prost(message, optional, tag="1")]
+ #[prost(message, optional, tag = "1")]
pub total: ::core::option::Option<ExecutorResource>,
- #[prost(message, optional, tag="2")]
+ #[prost(message, optional, tag = "2")]
pub available: ::core::option::Option<ExecutorResource>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RunningTask {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub executor_id: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FailedTask {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub error: ::prost::alloc::string::String,
- #[prost(bool, tag="2")]
+ #[prost(bool, tag = "2")]
pub retryable: bool,
/// Whether this task failure should be counted to the maximum number of
times the task is allowed to retry
- #[prost(bool, tag="3")]
+ #[prost(bool, tag = "3")]
pub count_to_failures: bool,
- #[prost(oneof="failed_task::FailedReason", tags="4, 5, 6, 7, 8, 9")]
+ #[prost(oneof = "failed_task::FailedReason", tags = "4, 5, 6, 7, 8, 9")]
pub failed_reason: ::core::option::Option<failed_task::FailedReason>,
}
/// Nested message and enum types in `FailedTask`.
pub mod failed_task {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum FailedReason {
- #[prost(message, tag="4")]
+ #[prost(message, tag = "4")]
ExecutionError(super::ExecutionError),
- #[prost(message, tag="5")]
+ #[prost(message, tag = "5")]
FetchPartitionError(super::FetchPartitionError),
- #[prost(message, tag="6")]
+ #[prost(message, tag = "6")]
IoError(super::IoError),
- #[prost(message, tag="7")]
+ #[prost(message, tag = "7")]
ExecutorLost(super::ExecutorLost),
/// A successful task's result is lost due to executor lost
- #[prost(message, tag="8")]
+ #[prost(message, tag = "8")]
ResultLost(super::ResultLost),
- #[prost(message, tag="9")]
+ #[prost(message, tag = "9")]
TaskKilled(super::TaskKilled),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SuccessfulTask {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub executor_id: ::prost::alloc::string::String,
/// TODO tasks are currently always shuffle writes but this will not
always be the case
/// so we might want to think about some refactoring of the task
definitions
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub partitions: ::prost::alloc::vec::Vec<ShuffleWritePartition>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct ExecutionError {
-}
+pub struct ExecutionError {}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FetchPartitionError {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub executor_id: ::prost::alloc::string::String,
- #[prost(uint32, tag="2")]
+ #[prost(uint32, tag = "2")]
pub map_stage_id: u32,
- #[prost(uint32, tag="3")]
+ #[prost(uint32, tag = "3")]
pub map_partition_id: u32,
}
#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct IoError {
-}
+pub struct IoError {}
#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct ExecutorLost {
-}
+pub struct ExecutorLost {}
#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct ResultLost {
-}
+pub struct ResultLost {}
#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct TaskKilled {
-}
+pub struct TaskKilled {}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ShuffleWritePartition {
- #[prost(uint64, tag="1")]
+ #[prost(uint64, tag = "1")]
pub partition_id: u64,
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
pub path: ::prost::alloc::string::String,
- #[prost(uint64, tag="3")]
+ #[prost(uint64, tag = "3")]
pub num_batches: u64,
- #[prost(uint64, tag="4")]
+ #[prost(uint64, tag = "4")]
pub num_rows: u64,
- #[prost(uint64, tag="5")]
+ #[prost(uint64, tag = "5")]
pub num_bytes: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskStatus {
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
pub task_id: u32,
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
pub job_id: ::prost::alloc::string::String,
- #[prost(uint32, tag="3")]
+ #[prost(uint32, tag = "3")]
pub stage_id: u32,
- #[prost(uint32, tag="4")]
+ #[prost(uint32, tag = "4")]
pub stage_attempt_num: u32,
- #[prost(uint32, tag="5")]
+ #[prost(uint32, tag = "5")]
pub partition_id: u32,
- #[prost(uint64, tag="6")]
+ #[prost(uint64, tag = "6")]
pub launch_time: u64,
- #[prost(uint64, tag="7")]
+ #[prost(uint64, tag = "7")]
pub start_exec_time: u64,
- #[prost(uint64, tag="8")]
+ #[prost(uint64, tag = "8")]
pub end_exec_time: u64,
- #[prost(message, repeated, tag="12")]
+ #[prost(message, repeated, tag = "12")]
pub metrics: ::prost::alloc::vec::Vec<OperatorMetricsSet>,
- #[prost(oneof="task_status::Status", tags="9, 10, 11")]
+ #[prost(oneof = "task_status::Status", tags = "9, 10, 11")]
pub status: ::core::option::Option<task_status::Status>,
}
/// Nested message and enum types in `TaskStatus`.
pub mod task_status {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Status {
- #[prost(message, tag="9")]
+ #[prost(message, tag = "9")]
Running(super::RunningTask),
- #[prost(message, tag="10")]
+ #[prost(message, tag = "10")]
Failed(super::FailedTask),
- #[prost(message, tag="11")]
+ #[prost(message, tag = "11")]
Successful(super::SuccessfulTask),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PollWorkParams {
- #[prost(message, optional, tag="1")]
+ #[prost(message, optional, tag = "1")]
pub metadata: ::core::option::Option<ExecutorRegistration>,
- #[prost(uint32, tag="2")]
+ #[prost(uint32, tag = "2")]
pub num_free_slots: u32,
/// All tasks must be reported until they reach the failed or completed
state
- #[prost(message, repeated, tag="3")]
+ #[prost(message, repeated, tag = "3")]
pub task_status: ::prost::alloc::vec::Vec<TaskStatus>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskDefinition {
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
pub task_id: u32,
- #[prost(uint32, tag="2")]
+ #[prost(uint32, tag = "2")]
pub task_attempt_num: u32,
- #[prost(string, tag="3")]
+ #[prost(string, tag = "3")]
pub job_id: ::prost::alloc::string::String,
- #[prost(uint32, tag="4")]
+ #[prost(uint32, tag = "4")]
pub stage_id: u32,
- #[prost(uint32, tag="5")]
+ #[prost(uint32, tag = "5")]
pub stage_attempt_num: u32,
- #[prost(uint32, tag="6")]
+ #[prost(uint32, tag = "6")]
pub partition_id: u32,
- #[prost(bytes="vec", tag="7")]
+ #[prost(bytes = "vec", tag = "7")]
pub plan: ::prost::alloc::vec::Vec<u8>,
/// Output partition for shuffle writer
- #[prost(message, optional, tag="8")]
+ #[prost(message, optional, tag = "8")]
pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
- #[prost(string, tag="9")]
+ #[prost(string, tag = "9")]
pub session_id: ::prost::alloc::string::String,
- #[prost(uint64, tag="10")]
+ #[prost(uint64, tag = "10")]
pub launch_time: u64,
- #[prost(message, repeated, tag="11")]
+ #[prost(message, repeated, tag = "11")]
pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
}
/// A set of tasks in the same stage
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MultiTaskDefinition {
- #[prost(message, repeated, tag="1")]
+ #[prost(message, repeated, tag = "1")]
pub task_ids: ::prost::alloc::vec::Vec<TaskId>,
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
pub job_id: ::prost::alloc::string::String,
- #[prost(uint32, tag="3")]
+ #[prost(uint32, tag = "3")]
pub stage_id: u32,
- #[prost(uint32, tag="4")]
+ #[prost(uint32, tag = "4")]
pub stage_attempt_num: u32,
- #[prost(bytes="vec", tag="5")]
+ #[prost(bytes = "vec", tag = "5")]
pub plan: ::prost::alloc::vec::Vec<u8>,
/// Output partition for shuffle writer
- #[prost(message, optional, tag="6")]
+ #[prost(message, optional, tag = "6")]
pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
- #[prost(string, tag="7")]
+ #[prost(string, tag = "7")]
pub session_id: ::prost::alloc::string::String,
- #[prost(uint64, tag="8")]
+ #[prost(uint64, tag = "8")]
pub launch_time: u64,
- #[prost(message, repeated, tag="9")]
+ #[prost(message, repeated, tag = "9")]
pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SessionSettings {
- #[prost(message, repeated, tag="1")]
+ #[prost(message, repeated, tag = "1")]
pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct JobSessionConfig {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub session_id: ::prost::alloc::string::String,
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PollWorkResult {
- #[prost(message, repeated, tag="1")]
+ #[prost(message, repeated, tag = "1")]
pub tasks: ::prost::alloc::vec::Vec<TaskDefinition>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RegisterExecutorParams {
- #[prost(message, optional, tag="1")]
+ #[prost(message, optional, tag = "1")]
pub metadata: ::core::option::Option<ExecutorRegistration>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RegisterExecutorResult {
- #[prost(bool, tag="1")]
+ #[prost(bool, tag = "1")]
pub success: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HeartBeatParams {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub executor_id: ::prost::alloc::string::String,
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub metrics: ::prost::alloc::vec::Vec<ExecutorMetric>,
- #[prost(message, optional, tag="3")]
+ #[prost(message, optional, tag = "3")]
pub status: ::core::option::Option<ExecutorStatus>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HeartBeatResult {
/// TODO it's from Spark for BlockManager
- #[prost(bool, tag="1")]
+ #[prost(bool, tag = "1")]
pub reregister: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StopExecutorParams {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub executor_id: ::prost::alloc::string::String,
/// stop reason
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
pub reason: ::prost::alloc::string::String,
/// force to stop the executor immediately
- #[prost(bool, tag="3")]
+ #[prost(bool, tag = "3")]
pub force: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct StopExecutorResult {
-}
+pub struct StopExecutorResult {}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorStoppedParams {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub executor_id: ::prost::alloc::string::String,
/// stop reason
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
pub reason: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct ExecutorStoppedResult {
-}
+pub struct ExecutorStoppedResult {}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UpdateTaskStatusParams {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub executor_id: ::prost::alloc::string::String,
/// All tasks must be reported until they reach the failed or completed
state
- #[prost(message, repeated, tag="2")]
+ #[prost(message, repeated, tag = "2")]
pub task_status: ::prost::alloc::vec::Vec<TaskStatus>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UpdateTaskStatusResult {
- #[prost(bool, tag="1")]
+ #[prost(bool, tag = "1")]
pub success: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecuteQueryParams {
- #[prost(message, repeated, tag="4")]
+ #[prost(message, repeated, tag = "4")]
pub settings: ::prost::alloc::vec::Vec<KeyValuePair>,
- #[prost(oneof="execute_query_params::Query", tags="1, 2")]
+ #[prost(oneof = "execute_query_params::Query", tags = "1, 2")]
pub query: ::core::option::Option<execute_query_params::Query>,
- #[prost(oneof="execute_query_params::OptionalSessionId", tags="3")]
- pub optional_session_id:
::core::option::Option<execute_query_params::OptionalSessionId>,
+ #[prost(oneof = "execute_query_params::OptionalSessionId", tags = "3")]
+ pub optional_session_id: ::core::option::Option<
+ execute_query_params::OptionalSessionId,
+ >,
}
/// Nested message and enum types in `ExecuteQueryParams`.
pub mod execute_query_params {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Query {
- #[prost(bytes, tag="1")]
+ #[prost(bytes, tag = "1")]
LogicalPlan(::prost::alloc::vec::Vec<u8>),
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
Sql(::prost::alloc::string::String),
}
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum OptionalSessionId {
- #[prost(string, tag="3")]
+ #[prost(string, tag = "3")]
SessionId(::prost::alloc::string::String),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecuteSqlParams {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub sql: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecuteQueryResult {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
pub session_id: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetJobStatusParams {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SuccessfulJob {
- #[prost(message, repeated, tag="1")]
+ #[prost(message, repeated, tag = "1")]
pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct QueuedJob {
-}
+pub struct QueuedJob {}
/// TODO: add progress report
#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct RunningJob {
-}
+pub struct RunningJob {}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FailedJob {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub error: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct JobStatus {
- #[prost(oneof="job_status::Status", tags="1, 2, 3, 4")]
+ #[prost(oneof = "job_status::Status", tags = "1, 2, 3, 4")]
pub status: ::core::option::Option<job_status::Status>,
}
/// Nested message and enum types in `JobStatus`.
pub mod job_status {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Status {
- #[prost(message, tag="1")]
+ #[prost(message, tag = "1")]
Queued(super::QueuedJob),
- #[prost(message, tag="2")]
+ #[prost(message, tag = "2")]
Running(super::RunningJob),
- #[prost(message, tag="3")]
+ #[prost(message, tag = "3")]
Failed(super::FailedJob),
- #[prost(message, tag="4")]
+ #[prost(message, tag = "4")]
Successful(super::SuccessfulJob),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetJobStatusResult {
- #[prost(message, optional, tag="1")]
+ #[prost(message, optional, tag = "1")]
pub status: ::core::option::Option<JobStatus>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetFileMetadataParams {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub path: ::prost::alloc::string::String,
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
pub file_type: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetFileMetadataResult {
- #[prost(message, optional, tag="1")]
+ #[prost(message, optional, tag = "1")]
pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FilePartitionMetadata {
- #[prost(string, repeated, tag="1")]
+ #[prost(string, repeated, tag = "1")]
pub filename: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CancelJobParams {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CancelJobResult {
- #[prost(bool, tag="1")]
+ #[prost(bool, tag = "1")]
pub cancelled: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CleanJobDataParams {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct CleanJobDataResult {
-}
+pub struct CleanJobDataResult {}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LaunchTaskParams {
/// Allow to launch a task set to an executor at once
- #[prost(message, repeated, tag="1")]
+ #[prost(message, repeated, tag = "1")]
pub tasks: ::prost::alloc::vec::Vec<TaskDefinition>,
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
pub scheduler_id: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LaunchMultiTaskParams {
/// Allow to launch a task set to an executor at once
- #[prost(message, repeated, tag="1")]
+ #[prost(message, repeated, tag = "1")]
pub multi_tasks: ::prost::alloc::vec::Vec<MultiTaskDefinition>,
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
pub scheduler_id: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LaunchTaskResult {
/// TODO when part of the task set are scheduled successfully
- #[prost(bool, tag="1")]
+ #[prost(bool, tag = "1")]
pub success: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LaunchMultiTaskResult {
/// TODO when part of the task set are scheduled successfully
- #[prost(bool, tag="1")]
+ #[prost(bool, tag = "1")]
pub success: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CancelTasksParams {
- #[prost(message, repeated, tag="1")]
+ #[prost(message, repeated, tag = "1")]
pub task_infos: ::prost::alloc::vec::Vec<RunningTaskInfo>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CancelTasksResult {
- #[prost(bool, tag="1")]
+ #[prost(bool, tag = "1")]
pub cancelled: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RemoveJobDataParams {
- #[prost(string, tag="1")]
+ #[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct RemoveJobDataResult {
-}
+pub struct RemoveJobDataResult {}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RunningTaskInfo {
- #[prost(uint32, tag="1")]
+ #[prost(uint32, tag = "1")]
pub task_id: u32,
- #[prost(string, tag="2")]
+ #[prost(string, tag = "2")]
pub job_id: ::prost::alloc::string::String,
- #[prost(uint32, tag="3")]
+ #[prost(uint32, tag = "3")]
pub stage_id: u32,
- #[prost(uint32, tag="4")]
+ #[prost(uint32, tag = "4")]
pub partition_id: u32,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord,
::prost::Enumeration)]
@@ -1486,6 +1497,7 @@ pub struct RunningTaskInfo {
pub enum PartitionMode {
CollectLeft = 0,
Partitioned = 1,
+ Auto = 2,
}
impl PartitionMode {
/// String value of the enum field names used in the ProtoBuf definition.
@@ -1496,6 +1508,7 @@ impl PartitionMode {
match self {
PartitionMode::CollectLeft => "COLLECT_LEFT",
PartitionMode::Partitioned => "PARTITIONED",
+ PartitionMode::Auto => "AUTO",
}
}
}
@@ -1972,7 +1985,7 @@ pub mod executor_grpc_client {
pub mod scheduler_grpc_server {
#![allow(unused_variables, dead_code, missing_docs,
clippy::let_unit_value)]
use tonic::codegen::*;
- ///Generated trait containing gRPC methods that should be implemented for
use with SchedulerGrpcServer.
+ /// Generated trait containing gRPC methods that should be implemented for
use with SchedulerGrpcServer.
#[async_trait]
pub trait SchedulerGrpc: Send + Sync + 'static {
/// Executors must poll the scheduler for heartbeat and to receive
tasks
@@ -2518,7 +2531,7 @@ pub mod scheduler_grpc_server {
pub mod executor_grpc_server {
#![allow(unused_variables, dead_code, missing_docs,
clippy::let_unit_value)]
use tonic::codegen::*;
- ///Generated trait containing gRPC methods that should be implemented for
use with ExecutorGrpcServer.
+ /// Generated trait containing gRPC methods that should be implemented for
use with ExecutorGrpcServer.
#[async_trait]
pub trait ExecutorGrpc: Send + Sync + 'static {
async fn launch_task(
diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs
index 04043822..b1b2ab38 100644
--- a/ballista/core/src/serde/mod.rs
+++ b/ballista/core/src/serde/mod.rs
@@ -392,8 +392,8 @@ mod tests {
None
}
- fn required_child_distribution(&self) -> Distribution {
- Distribution::SinglePartition
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ vec![Distribution::SinglePartition]
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/ballista/core/src/serde/physical_plan/from_proto.rs
b/ballista/core/src/serde/physical_plan/from_proto.rs
index 693037bd..eb951c76 100644
--- a/ballista/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/core/src/serde/physical_plan/from_proto.rs
@@ -411,6 +411,8 @@ impl TryInto<FileScanConfig> for
&protobuf::FileScanExecConf {
projection,
limit: self.limit.as_ref().map(|sl| sl.limit as usize),
table_partition_cols: vec![],
+ // TODO add ordering info to the ballista proto file
+ output_ordering: None,
})
}
}
diff --git a/ballista/core/src/serde/physical_plan/mod.rs
b/ballista/core/src/serde/physical_plan/mod.rs
index 5860f570..cf7e5497 100644
--- a/ballista/core/src/serde/physical_plan/mod.rs
+++ b/ballista/core/src/serde/physical_plan/mod.rs
@@ -331,6 +331,8 @@ impl AsExecutionPlan for PhysicalPlanNode {
physical_window_expr,
input,
Arc::new((&input_schema).try_into()?),
+ vec![],
+ None,
)?))
}
PhysicalPlanType::Aggregate(hash_agg) => {
@@ -528,6 +530,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
let partition_mode = match partition_mode {
protobuf::PartitionMode::CollectLeft =>
PartitionMode::CollectLeft,
protobuf::PartitionMode::Partitioned =>
PartitionMode::Partitioned,
+ protobuf::PartitionMode::Auto => PartitionMode::Auto,
};
Ok(Arc::new(HashJoinExec::try_new(
left,
@@ -792,7 +795,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
input: Some(Box::new(input)),
skip: limit.skip() as u32,
fetch: match limit.fetch() {
- Some(n) => *n as i64,
+ Some(n) => n as i64,
_ => -1, // no limit
},
},
@@ -867,6 +870,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
let partition_mode = match exec.partition_mode() {
PartitionMode::CollectLeft =>
protobuf::PartitionMode::CollectLeft,
PartitionMode::Partitioned =>
protobuf::PartitionMode::Partitioned,
+ PartitionMode::Auto => protobuf::PartitionMode::Auto,
};
Ok(protobuf::PhysicalPlanNode {
@@ -1272,6 +1276,7 @@ fn decode_scan_config(
projection,
limit: proto.limit.as_ref().map(|sl| sl.limit as usize),
table_partition_cols: vec![],
+ output_ordering: None,
})
}
@@ -1599,6 +1604,7 @@ mod roundtrip_tests {
projection: None,
limit: None,
table_partition_cols: vec![],
+ output_ordering: None,
};
let predicate =
datafusion::prelude::col("col").eq(datafusion::prelude::lit("1"));
diff --git a/ballista/core/src/serde/physical_plan/to_proto.rs
b/ballista/core/src/serde/physical_plan/to_proto.rs
index fbf8e5e7..c303ba45 100644
--- a/ballista/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/core/src/serde/physical_plan/to_proto.rs
@@ -457,7 +457,11 @@ impl TryFrom<&FileScanConfig> for
protobuf::FileScanExecConf {
.map(|n| *n as u32)
.collect(),
schema: Some(conf.file_schema.as_ref().try_into()?),
- table_partition_cols: conf.table_partition_cols.to_vec(),
+ table_partition_cols: conf
+ .table_partition_cols
+ .iter()
+ .map(|col| col.0.to_owned())
+ .collect(),
object_store_url: conf.object_store_url.to_string(),
})
}
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index bde421a8..d43a95b2 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -35,15 +35,15 @@ default = ["mimalloc"]
[dependencies]
anyhow = "1"
-arrow = { version = "26.0.0" }
-arrow-flight = { version = "26.0.0" }
+arrow = { version = "28.0.0" }
+arrow-flight = { version = "28.0.0" }
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.10.0" }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
dashmap = "5.4.0"
-datafusion = "14.0.0"
-datafusion-proto = "14.0.0"
+datafusion = "15.0.0"
+datafusion-proto = "15.0.0"
futures = "0.3"
hyper = "0.14.4"
log = "0.4"
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 69242f53..885dd2ff 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -39,7 +39,7 @@ sled = ["sled_package", "tokio-stream"]
[dependencies]
anyhow = "1"
-arrow-flight = { version = "26.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "28.0.0", features = ["flight-sql-experimental"] }
async-recursion = "1.0.0"
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.10.0" }
@@ -47,8 +47,8 @@ base64 = { version = "0.13", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
dashmap = "5.4.0"
-datafusion = "14.0.0"
-datafusion-proto = "14.0.0"
+datafusion = "15.0.0"
+datafusion-proto = "15.0.0"
etcd-client = { version = "0.10", optional = true }
flatbuffers = { version = "22.9.29" }
futures = "0.3"
diff --git a/ballista/scheduler/src/display.rs
b/ballista/scheduler/src/display.rs
index 7df23c22..6f1de120 100644
--- a/ballista/scheduler/src/display.rs
+++ b/ballista/scheduler/src/display.rs
@@ -131,7 +131,7 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b>
{
plan.fmt_as(self.t, self.f)?;
if let Some(metrics) = self.metrics.get(self.metric_index) {
let metrics = metrics
- .aggregate_by_partition()
+ .aggregate_by_name()
.sorted_for_display()
.timestamps_removed();
write!(self.f, ", metrics=[{}]", metrics)?;
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs
b/ballista/scheduler/src/scheduler_server/grpc.rs
index e61eb93c..979b65f7 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -44,6 +44,7 @@ use std::ops::Deref;
use std::sync::Arc;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use datafusion::prelude::SessionConfig;
use std::time::{SystemTime, UNIX_EPOCH};
use tonic::{Request, Response, Status};
@@ -292,9 +293,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
// TODO shouldn't this take a ListingOption object as input?
let GetFileMetadataParams { path, file_type } = request.into_inner();
-
+ // Here, we use the default config, since we don't know the session id
+ let config = SessionConfig::default().config_options();
let file_format: Arc<dyn FileFormat> = match file_type.as_str() {
- "parquet" => Ok(Arc::new(ParquetFormat::default())),
+ "parquet" => Ok(Arc::new(ParquetFormat::new(config))),
// TODO implement for CSV
_ => Err(tonic::Status::unimplemented(
"get_file_metadata unsupported file type",
diff --git a/ballista/scheduler/src/state/execution_graph.rs
b/ballista/scheduler/src/state/execution_graph.rs
index 51a6232f..6f9b6545 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -1691,13 +1691,15 @@ mod test {
let executor2 = mock_executor("executor-id2".to_string());
let mut join_graph = test_join_plan(4).await;
- assert_eq!(join_graph.stage_count(), 5);
+ // With the improvement of
https://github.com/apache/arrow-datafusion/pull/4122,
+ // unnecessary RepartitionExec can be removed
+ assert_eq!(join_graph.stage_count(), 4);
assert_eq!(join_graph.available_tasks(), 0);
// Call revive to move the two leaf Resolved stages to Running
join_graph.revive();
- assert_eq!(join_graph.stage_count(), 5);
+ assert_eq!(join_graph.stage_count(), 4);
assert_eq!(join_graph.available_tasks(), 2);
// Complete the first stage
@@ -1742,13 +1744,13 @@ mod test {
let executor2 = mock_executor("executor-id2".to_string());
let mut join_graph = test_join_plan(4).await;
- assert_eq!(join_graph.stage_count(), 5);
+ assert_eq!(join_graph.stage_count(), 4);
assert_eq!(join_graph.available_tasks(), 0);
// Call revive to move the two leaf Resolved stages to Running
join_graph.revive();
- assert_eq!(join_graph.stage_count(), 5);
+ assert_eq!(join_graph.stage_count(), 4);
assert_eq!(join_graph.available_tasks(), 2);
// Complete the first stage
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index c46c0a91..b187d8e9 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -22,7 +22,7 @@ use std::iter::FromIterator;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
-use
datafusion::physical_optimizer::hash_build_probe_order::HashBuildProbeOrder;
+use datafusion::physical_optimizer::join_selection::JoinSelection;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
@@ -363,7 +363,7 @@ impl UnresolvedStage {
)?;
// Optimize join order based on new resolved statistics
- let optimize_join = HashBuildProbeOrder::new();
+ let optimize_join = JoinSelection::new();
let plan = optimize_join.optimize(plan, &SessionConfig::new())?;
Ok(ResolvedStage::new(
@@ -829,7 +829,7 @@ impl RunningStage {
let new_metric = Arc::new(Metric::new(metric_value,
Some(partition)));
first.push(new_metric);
}
- first.aggregate_by_partition()
+ first.aggregate_by_name()
}
pub(super) fn task_failure_number(&self, partition_id: usize) -> usize {
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs
b/ballista/scheduler/src/state/execution_graph_dot.rs
index 708b5077..6e65612f 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -473,10 +473,10 @@ filter_expr="]
subgraph cluster4 {
label = "Stage 5 [Unresolved]";
stage_5_0 [shape=box, label="ShuffleWriter [48 partitions]"]
- stage_5_0_0 [shape=box, label="Projection: a@0, a@1, a@2"]
+ stage_5_0_0 [shape=box, label="Projection: a@0, b@1, a@2, b@3,
a@4, b@5"]
stage_5_0_0_0 [shape=box, label="CoalesceBatches
[batchSize=4096]"]
stage_5_0_0_0_0 [shape=box, label="HashJoin
-join_expr=a@1 = a@0
+join_expr=b@3 = b@1
filter_expr="]
stage_5_0_0_0_0_0 [shape=box, label="CoalesceBatches
[batchSize=4096]"]
stage_5_0_0_0_0_0_0 [shape=box, label="UnresolvedShuffleExec
[stage_id=3]"]
@@ -528,7 +528,132 @@ filter_expr="]
Ok(())
}
+ #[tokio::test]
+ async fn dot_optimized() -> Result<()> {
+ let graph = test_graph_optimized().await?;
+ let dot = ExecutionGraphDot::generate(Arc::new(graph))
+ .map_err(|e| BallistaError::Internal(format!("{:?}", e)))?;
+
+ let expected = r#"digraph G {
+ subgraph cluster0 {
+ label = "Stage 1 [Resolved]";
+ stage_1_0 [shape=box, label="ShuffleWriter [0 partitions]"]
+ stage_1_0_0 [shape=box, label="MemoryExec"]
+ stage_1_0_0 -> stage_1_0
+ }
+ subgraph cluster1 {
+ label = "Stage 2 [Resolved]";
+ stage_2_0 [shape=box, label="ShuffleWriter [0 partitions]"]
+ stage_2_0_0 [shape=box, label="MemoryExec"]
+ stage_2_0_0 -> stage_2_0
+ }
+ subgraph cluster2 {
+ label = "Stage 3 [Resolved]";
+ stage_3_0 [shape=box, label="ShuffleWriter [0 partitions]"]
+ stage_3_0_0 [shape=box, label="MemoryExec"]
+ stage_3_0_0 -> stage_3_0
+ }
+ subgraph cluster3 {
+ label = "Stage 4 [Unresolved]";
+ stage_4_0 [shape=box, label="ShuffleWriter [48 partitions]"]
+ stage_4_0_0 [shape=box, label="Projection: a@0, a@1, a@2"]
+ stage_4_0_0_0 [shape=box, label="CoalesceBatches
[batchSize=4096]"]
+ stage_4_0_0_0_0 [shape=box, label="HashJoin
+join_expr=a@1 = a@0
+filter_expr="]
+ stage_4_0_0_0_0_0 [shape=box, label="CoalesceBatches
[batchSize=4096]"]
+ stage_4_0_0_0_0_0_0 [shape=box, label="HashJoin
+join_expr=a@0 = a@0
+filter_expr="]
+ stage_4_0_0_0_0_0_0_0 [shape=box, label="CoalesceBatches
[batchSize=4096]"]
+ stage_4_0_0_0_0_0_0_0_0 [shape=box,
label="UnresolvedShuffleExec [stage_id=1]"]
+ stage_4_0_0_0_0_0_0_0_0 -> stage_4_0_0_0_0_0_0_0
+ stage_4_0_0_0_0_0_0_0 -> stage_4_0_0_0_0_0_0
+ stage_4_0_0_0_0_0_0_1 [shape=box, label="CoalesceBatches
[batchSize=4096]"]
+ stage_4_0_0_0_0_0_0_1_0 [shape=box,
label="UnresolvedShuffleExec [stage_id=2]"]
+ stage_4_0_0_0_0_0_0_1_0 -> stage_4_0_0_0_0_0_0_1
+ stage_4_0_0_0_0_0_0_1 -> stage_4_0_0_0_0_0_0
+ stage_4_0_0_0_0_0_0 -> stage_4_0_0_0_0_0
+ stage_4_0_0_0_0_0 -> stage_4_0_0_0_0
+ stage_4_0_0_0_0_1 [shape=box, label="CoalesceBatches
[batchSize=4096]"]
+ stage_4_0_0_0_0_1_0 [shape=box, label="UnresolvedShuffleExec
[stage_id=3]"]
+ stage_4_0_0_0_0_1_0 -> stage_4_0_0_0_0_1
+ stage_4_0_0_0_0_1 -> stage_4_0_0_0_0
+ stage_4_0_0_0_0 -> stage_4_0_0_0
+ stage_4_0_0_0 -> stage_4_0_0
+ stage_4_0_0 -> stage_4_0
+ }
+ stage_1_0 -> stage_4_0_0_0_0_0_0_0_0
+ stage_2_0 -> stage_4_0_0_0_0_0_0_1_0
+ stage_3_0 -> stage_4_0_0_0_0_1_0
+}
+"#;
+ assert_eq!(expected, &dot);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn query_stage_optimized() -> Result<()> {
+ let graph = test_graph_optimized().await?;
+ let dot = ExecutionGraphDot::generate_for_query_stage(Arc::new(graph),
4)
+ .map_err(|e| BallistaError::Internal(format!("{:?}", e)))?;
+
+ let expected = r#"digraph G {
+ stage_4_0 [shape=box, label="ShuffleWriter [48 partitions]"]
+ stage_4_0_0 [shape=box, label="Projection: a@0, a@1, a@2"]
+ stage_4_0_0_0 [shape=box, label="CoalesceBatches
[batchSize=4096]"]
+ stage_4_0_0_0_0 [shape=box, label="HashJoin
+join_expr=a@1 = a@0
+filter_expr="]
+ stage_4_0_0_0_0_0 [shape=box, label="CoalesceBatches
[batchSize=4096]"]
+ stage_4_0_0_0_0_0_0 [shape=box, label="HashJoin
+join_expr=a@0 = a@0
+filter_expr="]
+ stage_4_0_0_0_0_0_0_0 [shape=box, label="CoalesceBatches
[batchSize=4096]"]
+ stage_4_0_0_0_0_0_0_0_0 [shape=box,
label="UnresolvedShuffleExec [stage_id=1]"]
+ stage_4_0_0_0_0_0_0_0_0 -> stage_4_0_0_0_0_0_0_0
+ stage_4_0_0_0_0_0_0_0 -> stage_4_0_0_0_0_0_0
+ stage_4_0_0_0_0_0_0_1 [shape=box, label="CoalesceBatches
[batchSize=4096]"]
+ stage_4_0_0_0_0_0_0_1_0 [shape=box,
label="UnresolvedShuffleExec [stage_id=2]"]
+ stage_4_0_0_0_0_0_0_1_0 -> stage_4_0_0_0_0_0_0_1
+ stage_4_0_0_0_0_0_0_1 -> stage_4_0_0_0_0_0_0
+ stage_4_0_0_0_0_0_0 -> stage_4_0_0_0_0_0
+ stage_4_0_0_0_0_0 -> stage_4_0_0_0_0
+ stage_4_0_0_0_0_1 [shape=box, label="CoalesceBatches
[batchSize=4096]"]
+ stage_4_0_0_0_0_1_0 [shape=box, label="UnresolvedShuffleExec
[stage_id=3]"]
+ stage_4_0_0_0_0_1_0 -> stage_4_0_0_0_0_1
+ stage_4_0_0_0_0_1 -> stage_4_0_0_0_0
+ stage_4_0_0_0_0 -> stage_4_0_0_0
+ stage_4_0_0_0 -> stage_4_0_0
+ stage_4_0_0 -> stage_4_0
+}
+"#;
+ assert_eq!(expected, &dot);
+ Ok(())
+ }
+
async fn test_graph() -> Result<ExecutionGraph> {
+ let ctx =
+
SessionContext::with_config(SessionConfig::new().with_target_partitions(48));
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::UInt32, false),
+ Field::new("b", DataType::UInt32, false),
+ ]));
+ let table = Arc::new(MemTable::try_new(schema.clone(), vec![])?);
+ ctx.register_table("foo", table.clone())?;
+ ctx.register_table("bar", table.clone())?;
+ ctx.register_table("baz", table)?;
+ let df = ctx
+ .sql("SELECT * FROM foo JOIN bar ON foo.a = bar.a JOIN baz on
bar.b = baz.b")
+ .await?;
+ let plan = df.to_logical_plan()?;
+ let plan = ctx.create_physical_plan(&plan).await?;
+ ExecutionGraph::new("scheduler_id", "job_id", "job_name",
"session_id", plan, 0)
+ }
+
+ // With the improvement of
https://github.com/apache/arrow-datafusion/pull/4122,
+ // Redundant RepartitionExec can be removed so that the stage number will
be reduced
+ async fn test_graph_optimized() -> Result<ExecutionGraph> {
let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(48));
let schema =
diff --git a/ballista/scheduler/src/test_utils.rs
b/ballista/scheduler/src/test_utils.rs
index b4526fd1..ae386382 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -81,7 +81,7 @@ impl TableProvider for ExplodingTableProvider {
async fn scan(
&self,
_ctx: &SessionState,
- _projection: &Option<Vec<usize>>,
+ _projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 5752359a..8754f192 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -34,8 +34,8 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = "14.0.0"
-datafusion-proto = "14.0.0"
+datafusion = "15.0.0"
+datafusion-proto = "15.0.0"
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index fd18a48f..10ff91ad 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -816,7 +816,8 @@ async fn get_table(
}
"parquet" => {
let path = format!("{}/{}", path, table);
- let format =
ParquetFormat::default().with_enable_pruning(true);
+ let format = ParquetFormat::new(ctx.config_options())
+ .with_enable_pruning(Some(true));
(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
@@ -832,6 +833,7 @@ async fn get_table(
target_partitions,
collect_stat: true,
table_partition_cols: vec![],
+ file_sort_order: None,
};
let url = ListingTableUrl::parse(path)?;
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 7d24401c..06aa536d 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
[dependencies]
ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = "14.0.0"
+datafusion = "15.0.0"
futures = "0.3"
num_cpus = "1.13.0"
prost = "0.11"
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 2f02f175..5879d744 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -36,7 +36,7 @@ default = ["mimalloc"]
[dependencies]
async-trait = "0.1"
ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = { version = "14.0.0", features = ["pyarrow"] }
+datafusion = { version = "15.0.0", features = ["pyarrow"] }
futures = "0.3"
mimalloc = { version = "*", optional = true, default-features = false }
pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3",
"abi3-py37"] }
diff --git a/python/src/context.rs b/python/src/context.rs
index 38a3cb9b..26c5661a 100644
--- a/python/src/context.rs
+++ b/python/src/context.rs
@@ -23,7 +23,7 @@ use uuid::Uuid;
use pyo3::exceptions::{PyKeyError, PyValueError};
use pyo3::prelude::*;
-use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::datatypes::{DataType, Schema};
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::datasource::TableProvider;
@@ -34,6 +34,7 @@ use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
use crate::catalog::{PyCatalog, PyTable};
use crate::dataframe::PyDataFrame;
use crate::dataset::Dataset;
+use crate::datatype::PyDataType;
use crate::errors::DataFusionError;
use crate::udf::PyScalarUDF;
use crate::utils::wait_for_future;
@@ -159,13 +160,13 @@ impl PySessionContext {
&mut self,
name: &str,
path: &str,
- table_partition_cols: Vec<String>,
+ table_partition_cols: Vec<(String, PyDataType)>,
parquet_pruning: bool,
file_extension: &str,
py: Python,
) -> PyResult<()> {
let mut options = ParquetReadOptions::default()
- .table_partition_cols(table_partition_cols)
+
.table_partition_cols(convert_table_partition_cols(table_partition_cols))
.parquet_pruning(parquet_pruning);
options.file_extension = file_extension;
let result = self.ctx.register_parquet(name, path, options);
@@ -255,3 +256,12 @@ impl PySessionContext {
Ok(PyDataFrame::new(self.ctx.read_empty()?))
}
}
+
+fn convert_table_partition_cols(
+ table_partition_cols: Vec<(String, PyDataType)>,
+) -> Vec<(String, DataType)> {
+ table_partition_cols
+ .iter()
+ .map(|(name, t)| (name.clone(), t.data_type.clone()))
+ .collect()
+}
\ No newline at end of file
diff --git a/python/src/dataset.rs b/python/src/dataset.rs
index d34d974f..f0b2b10e 100644
--- a/python/src/dataset.rs
+++ b/python/src/dataset.rs
@@ -98,7 +98,7 @@ impl TableProvider for Dataset {
async fn scan(
&self,
_ctx: &SessionState,
- projection: &Option<Vec<usize>>,
+ projection: Option<&Vec<usize>>,
filters: &[Expr],
// limit can be used to reduce the amount scanned
// from the datasource as a performance optimization.
@@ -111,7 +111,7 @@ impl TableProvider for Dataset {
DatasetExec::new(
py,
self.dataset.as_ref(py),
- projection.clone(),
+ projection.cloned(),
filters,
)
.map_err(|err| DataFusionError::External(Box::new(err)))?,
diff --git a/python/src/datatype.rs b/python/src/datatype.rs
new file mode 100644
index 00000000..07f6cd23
--- /dev/null
+++ b/python/src/datatype.rs
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Copied from https://github.com/apache/arrow-datafusion-python/pull/103
+
+use datafusion::arrow::datatypes::DataType;
+use pyo3::pyclass;
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
+#[pyclass(name = "PyDataType", module = "datafusion", subclass)]
+pub struct PyDataType {
+ pub(crate) data_type: DataType,
+}
+
+impl From<PyDataType> for DataType {
+ fn from(data_type: PyDataType) -> DataType {
+ data_type.data_type
+ }
+}
+
+impl From<DataType> for PyDataType {
+ fn from(data_type: DataType) -> PyDataType {
+ PyDataType { data_type }
+ }
+}
\ No newline at end of file
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 01158d55..106978a7 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -29,6 +29,8 @@ mod context;
mod dataframe;
mod dataset;
mod dataset_exec;
+#[allow(clippy::borrow_deref_ref)]
+mod datatype;
pub mod errors;
#[allow(clippy::borrow_deref_ref)]
mod expression;
diff --git a/python/src/udaf.rs b/python/src/udaf.rs
index f2973476..42c388ed 100644
--- a/python/src/udaf.rs
+++ b/python/src/udaf.rs
@@ -95,6 +95,10 @@ impl Accumulator for RustAccumulator {
Ok(())
})
}
+
+ fn size(&self) -> usize {
+ std::mem::size_of_val(self)
+ }
}
pub fn to_rust_accumulator(accum: PyObject) ->
AccumulatorFunctionImplementation {